ReadingService¶
ReadingService handles in-place modification of DataPipe graph based on different use cases.
Features¶
Dynamic Sharding¶
Dynamic sharding is achieved by MultiProcessingReadingService and DistributedReadingService to shard the pipeline based on the information of corresponding multiprocessing and distributed workers. And, TorchData offers two types of DataPipe letting users define the sharding place within the pipeline.
sharding_filter(ShardingFilter): When the pipeline is replicable, each distributed/multiprocessing worker loads data from its own replica of theDataPipegraph, while skipping samples that do not belong to the corresponding worker at the point wheresharding_filteris placed.sharding_round_robin_dispatch(ShardingRoundRobinDispatcher): When there is anysharding_round_robin_dispatchDataPipein the pipeline, that branch (i.e. all DataPipes prior tosharding_round_robin_dispatch) will be treated as a non-replicable branch (in the context of multiprocessing). A single dispatching process will be created to load data from the non-replicable branch and distribute data to the subsequent worker processes.
The following is an example of having two types of sharding strategies in the pipeline.
![digraph Example {
subgraph cluster_replicable {
label="Replicable"
a -> b -> c -> d -> l;
color=blue;
}
subgraph cluster_non_replicable {
style=filled;
color=lightgrey;
node [style=filled,color=white];
label="Non-Replicable"
e -> f -> g -> k;
h -> i -> j -> k;
}
k -> l -> fullsync -> end;
a [label="DP1"];
b [label="shuffle"];
c [label="sharding_filter", color=blue];
d [label="DP4"];
e [label="DP2"];
f [label="shuffle"];
g [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
h [label="DP3"];
i [label="shuffle"];
j [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
k [label="DP5 (Lowest common ancestor)"];
l [label="DP6"];
fullsync;
end [shape=box];
}](https://pytorch.org/data/0.6/_images/graphviz-a4b77ae8d32185927d8707cd0b25fc9226103ca2.png)
When multiprocessing takes place, the graph becomes:
![digraph Example {
subgraph cluster_worker_0 {
label="Worker 0"
a0 -> b0 -> c0 -> d0 -> l0;
m0 -> l0;
color=blue;
}
subgraph cluster_worker_1 {
label="Worker 1"
a1 -> b1 -> c1 -> d1 -> l1;
m1 -> l1;
color=blue;
}
subgraph cluster_non_replicable {
style=filled;
color=lightgrey;
node [style=filled,color=white];
label="Non-Replicable"
e -> f -> g -> k;
h -> i -> j -> k;
k -> round_robin_demux;
}
round_robin_demux -> m0;
round_robin_demux -> m1;
l0 -> n;
l1 -> n;
n -> fullsync -> end;
a0 [label="DP1"];
b0 [label="shuffle"];
c0 [label="sharding_filter", color=blue];
d0 [label="DP4"];
a1 [label="DP1"];
b1 [label="shuffle"];
c1 [label="sharding_filter", color=blue];
d1 [label="DP4"];
e [label="DP2"];
f [label="shuffle"];
g [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
h [label="DP3"];
i [label="shuffle"];
j [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
k [label="DP5 (Lowest common ancestor)"];
fullsync;
l0 [label="DP6"];
l1 [label="DP6"];
m0 [label="Client"]
m1 [label="Client"]
n [label="Client"]
end [shape=box];
}](https://pytorch.org/data/0.6/_images/graphviz-8581116405d37f067d4dfa1c6bb711728c59d81e.png)
Client in the graph is a DataPipe that sends a request and receives a response from multiprocessing queues.
Determinism¶
In DataLoader2, a SeedGenerator becomes a single source of randomness and each ReadingService would access it via initialize_iteration() and generate corresponding random seeds for random DataPipe operations.
In order to make sure that the Dataset shards are mutually exclusive and collectively exhaustive on multiprocessing processes and distributed nodes, MultiProcessingReadingService and DistributedReadingService would help DataLoader2 to synchronize random states for any random DataPipe operation prior to sharding_filter or sharding_round_robin_dispatch. For the remaining DataPipe operations after sharding, unique random states are generated based on the distributed rank and worker process id by each ReadingService, in order to perform different random transformations.
Graph Mode¶
This also allows easier transition of data-preprocessing pipeline from research to production. After the DataPipe graph is created and validated with the ReadingServices, a different ReadingService that configures and connects to the production service/infrastructure such as AIStore can be provided to DataLoader2 as a drop-in replacement. The ReadingService could potentially search the graph, and find DataPipe operations that can be delegated to the production service/infrastructure, then modify the graph correspondingly to achieve higher-performant execution.
Extend ReadingService¶
The followings are interfaces for custom ReadingService.
- class torchdata.dataloader2.ReadingServiceInterface¶
Interface for
ReadingService. Please extend customReadingServicebased on this interface class.ReadingService must be picklable prior to
initializebeing called. This is because a copy of it will be created byDataLoader2to avoid the situation where the same ReadingService object is used by multipleDataLoader2, and its internal state will be modifiable by each of them.As a result of this constraint, certain initialization steps may need to take place within the
initializemethod rather than__init__of the ReadingService class.- finalize() None¶
ReadingServicecleans up internal states and fully shuts down the service. Called inDataLoader2’sshutdownand__del__.
- finalize_iteration() None¶
ReadingServiceends service after an epoch is finished. Called when the iterator ofDataLoader2is depleted.
- abstract initialize(datapipe: Union[IterDataPipe, MapDataPipe]) Union[IterDataPipe, MapDataPipe]¶
ReadingServicetakes aDataPipegraph, adapts it into a newDataPipegraph based on the custom need. Called once in creatingDataLoader2iterator at first time. Prior to calling this method, theReadingServiceobject must be picklable.- Parameters:
datapipe – Original
DataPipegraph.- Returns:
An adapted or a new
DataPipegraph.
- initialize_iteration(seed_generator: SeedGenerator, iter_reset_fn: Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]] = None) Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]]¶
ReadingServicespins up service for an epoch. Called at the beginning of every time gettingDataLoader2iterator.- Parameters:
seed_generator – SeedGenerator object created and managed by DataLoader2. As the single source of randomness, it will govern the determinism for all of random operations with the graph of DataPipes.
iter_reset_fn – Optional reset function from the prior
ReadingServciewhenSequentialReadingServicechains multipleReadingServices
- Returns:
A new
iter_reset_fnto be used by subseqeuentReadingService
Example
MultiProcessingReadingService starts setting worker seeds per process and prefetching items from the graph.
The checkpoint/snapshotting feature is a work in progress. Here is the preliminary interface (small changes are likely):
- class torchdata.dataloader2.CheckpointableReadingServiceInterface¶
Extend
ReadingServiceInterfacewith two additional methods to save/restore the state of the data-processing graph.- abstract checkpoint() bytes¶
ReadingServiceserializes the internal states. Called inDataLoader2.state_dict.
- abstract restore(datapipe: Union[IterDataPipe, MapDataPipe], serialized_state: bytes) Union[IterDataPipe, MapDataPipe]¶
ReadingServiceadaptsDataPipegraph based on the serialized state. Called once in creatingDataLoader2iterator at first time. Counterpart ofinitialize, which adaptDataPipegraph from scratch.- Parameters:
datapipe – original
DataPipegraph before adapted byReadingServiceserialized_state – The serialized state of internal state used to restore the state of the adapted
DataPipegraph.
- Returns:
Adapted
DataPipegenerated from the serialized state.
Graph Functions¶
And, graph utility functions are provided in torchdata.dataloader.graph to help users to do DataPipe graph rewrite for custom ReadingService:
Traverse the DataPipes and their attributes to extract the DataPipe graph. |
|
Given the graph of DataPipe generated by |
|
Given the graph of DataPipe generated by |
|
Given the graph of DataPipe generated by |
|
Given the graph of DataPipe generated by |