阅读服务¶
ReadingService根据不同的使用案例处理图形的就地修改。DataPipe
特征¶
动态分片¶
动态分片是通过 and 根据对应的多进程和分布式 worker 的信息对流水线进行分片来实现的。而且, TorchData 提供了两种类型的用户,让用户可以定义管道中的分片位置。MultiProcessingReadingServiceDistributedReadingServiceDataPipe
- sharding_filter(- ShardingFilter):当管道可复制时,每个分布式/多进程工作程序都会从自己的图形副本加载数据,同时跳过不属于放置点的相应工作程序的样本。- DataPipe- sharding_filter
- sharding_round_robin_dispatch(- ShardingRoundRobinDispatcher):当管道中有任何分支时,该分支(即 之前的所有 DataPipes )将被视为不可复制的分支(在多处理的上下文中)。将创建一个 dispatching 进程,用于从不可复制的分支加载数据并将数据分发到后续的工作进程。- sharding_round_robin_dispatch- DataPipe- sharding_round_robin_dispatch
以下是管道中具有两种分片策略的示例。
![digraph 示例 {
 子图 cluster_replicable {
 label=“可复制”
 a -> b -> c -> d -> l;
 颜色=蓝色;
 }
子图 cluster_non_replicable {
 style=filled;
 颜色=浅灰色;
 节点 [style=filled,color=white];
 label=“不可复制”
 e -> f -> g -> k;
 h -> i -> j -> k;
 }
k -> l -> fullsync -> 结束;
a [label=“DP1”];
 b [label=“shuffle”];
 c [标签=“sharding_filter”, color=blue];
 d [label=“DP4”];
 e [label=“DP2”];
 f [label=“shuffle”];
 g [标签=“sharding_round_robin_dispatch”, style=“填充,圆整”, 颜色=红色, 填充颜色=白色];
 h [label=“DP3”];
 i [label=“shuffle”];
 j [label=“sharding_round_robin_dispatch”, style=“filled,rounded”, color=red, fillcolor=white];
 k [label=“DP5 (最低共同祖先)”];
 l [label=“DP6”];
 完全同步;
 end [形状=框];
}](https://pytorch.org/data/0.8/_images/graphviz-a4b77ae8d32185927d8707cd0b25fc9226103ca2.png)
当 multiprocessing 发生时,图形变为:
![digraph 示例 {
 子图 cluster_worker_0 {
 label=“工人 0”
 a0 -> b0 -> c0 -> d0 -> l0;
 m0 -> l0;
 颜色=蓝色;
 }
子图 cluster_worker_1 {
 label=“工作线程 1”
 a1 -> b1 -> c1 -> d1 -> l1;
 m1 -> l1;
 颜色=蓝色;
 }
子图 cluster_non_replicable {
 style=filled;
 颜色=浅灰色;
 节点 [style=filled,color=white];
 label=“不可复制”
 e -> f -> g -> k;
 h -> i -> j -> k;
 k -> round_robin_demux;
 }
round_robin_demux -> m0;
 round_robin_demux -> m1;
 l0 -> n;
 l1 -> n;
 n -> fullsync -> 结束;
a0 [label=“DP1”];
 b0 [label=“shuffle”];
 c0 [label=“sharding_filter”, color=blue];
 d0 [label=“DP4”];
 a1 [label=“DP1”];
 b1 [label=“shuffle”];
 c1 [label=“sharding_filter”, color=blue];
 d1 [label=“DP4”];
 e [label=“DP2”];
 f [label=“shuffle”];
 g [标签=“sharding_round_robin_dispatch”, style=“填充,圆整”, 颜色=红色, 填充颜色=白色];
 h [label=“DP3”];
 i [label=“shuffle”];
 j [label=“sharding_round_robin_dispatch”, style=“filled,rounded”, color=red, fillcolor=white];
 k [label=“DP5 (最低共同祖先)”];
 完全同步;
 l0 [label=“DP6”];
 l1 [label=“DP6”];
 m0 [label=“客户”]
 m1 [label=“客户”]
 n [label=“客户”]
 end [形状=框];
}](https://pytorch.org/data/0.8/_images/graphviz-8581116405d37f067d4dfa1c6bb711728c59d81e.png)
Client图中是 A,它发送请求并从 Multiprocessing 队列接收响应。DataPipe
决定论¶
在 中,a 成为随机性的单一来源,每个来源都可以通过 a 访问它并为随机运算生成相应的随机种子。DataLoader2SeedGeneratorReadingServiceinitialize_iteration()DataPipe
为了确保 Dataset 分片在多处理进程和分布式节点上是互斥的并且集体详尽,这将有助于MultiProcessingReadingServiceDistributedReadingServiceDataLoader2以同步 或 之前的任何随机运算的随机状态。对于分片后的其余作,每个作都会根据分布式 rank 和 worker process id 生成唯一的随机状态,以便执行不同的随机转换。DataPipesharding_filtersharding_round_robin_dispatchDataPipeReadingService
图形模式¶
这也允许数据预处理管道更轻松地从研究过渡到生产。创建图形并使用 验证图形后,配置并连接到生产服务/基础设施的不同方法,例如可以提供给DataPipeReadingServicesReadingServiceAIStoreDataLoader2作为直接替代品。它可能会搜索图形,并找到可以委托给生产服务/基础设施的作,然后相应地修改图形以实现更高性能的执行。ReadingServiceDataPipe
扩展 ReadingService¶
以下是自定义的接口。ReadingService
- 类 torchdata.dataloader2.ReadingServiceInterface¶
- 的接口。请基于此接口类扩展自定义。 - ReadingService- ReadingService- ReadingService 在被调用之前必须是可挑选的。这是因为它的副本将是 created by 以避免同一 ReadingService 对象被 multiple ,并且其内部状态将可由它们中的每一个修改。 - initialize- DataLoader2- DataLoader2- 由于此约束,某些初始化步骤可能需要在方法中进行,而不是在 ReadingService 类中进行。 - initialize- __init__- finalize() 无¶
- ReadingService清理内部状态并完全关闭服务。 在 的 和 中调用 。- DataLoader2- shutdown- __del__
 - finalize_iteration() 无¶
- ReadingService在 epoch 完成后结束服务。调用时间 的迭代器已耗尽。- DataLoader2
 - 抽象 initialize(datapipe: Union[IterDataPipe, MapDataPipe]) Union[IterDataPipe]、 MapDataPipe]¶
- ReadingService获取一个图形,根据自定义需求将其调整为新图形。 首次创建 iterator 时调用一次。在调用该方法之前, 对象必须是可腌制的。- DataPipe- DataPipe- DataLoader2- ReadingService- 参数
- datapipe – 原始图形。 - DataPipe
- 结果
- 改编的图表或新图表。 - DataPipe
 
 - initialize_iteration(seed_generator: SeedGenerator, iter_reset_fn: 可选[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]] = 无) 可选[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]]¶
- ReadingService为一个 epoch 启动 service。开始时被调用 每次获取 iterator 的- DataLoader2- 参数
- seed_generator – 由 DataLoader2 创建和管理的 SeedGenerator 对象。作为单个 source of randomness 的 source 的 Randomness 的 intent 函数,它将控制所有随机运算的确定性 与 DataPipes 的图形。 
- iter_reset_fn – 当链接多个时,可选的重置功能 - ReadingServcie- SequentialReadingService- ReadingServices
 
- 结果
- subseqeuent 将使用的 New - iter_reset_fn- ReadingService
 - 例 - MultiProcessingReadingService 开始为每个进程设置 worker 种子并预取 图表中的项。 
 
检查点/快照功能正在开发中。下面是初步界面(可能会有小的更改):
- 类 torchdata.dataloader2.CheckpointableReadingServiceInterface¶
- 使用两种附加方法进行扩展,以保存/恢复数据处理图的状态。 - ReadingServiceInterface- 抽象 checkpoint() 字节¶
- ReadingService序列化内部状态。已调用 。- DataLoader2.state_dict
 - 抽象还原(datapipe: Union[IterDataPipe, MapDataPipe], serialized_state: bytes) Union[IterDataPipe, MapDataPipe]¶
- ReadingService根据序列化状态调整 Graph。 首次创建 iterator 时调用一次。 的对应项 ,它从头开始调整 graph。- DataPipe- DataLoader2- initialize- DataPipe- 参数
- DataPipe – 改编者 - DataPipe- ReadingService
- serialized_state – 用于恢复状态的内部状态的序列化状态 适应的图形。 - DataPipe
 
- 结果
- 改编自 serialized 状态。 - DataPipe
 
 
图形函数¶
并且,中提供了图形工具函数来帮助用户对自定义进行图形重写:torchdata.dataloader.graphDataPipeReadingService
| 遍历 DataPipes 及其属性以提取 DataPipe 图形。 | |
| 给定 function 生成的 DataPipe 图,返回提供的 DataPipe 类型的 DataPipe 实例。 | |
| 给定函数生成的 DataPipe 图,返回所有 DataPipe 实例的列表,不重复。 | |
| 给定 function 生成的 DataPipe 图和需要移除的 DataPipe,返回新的 DataPipe 图。 | |
| 给定 function 生成的 DataPipe 图和需要替换的 DataPipe 以及新的 DataPipe,返回新的 DataPipe 图。 |