DataLoader2
A lightweight DataLoader2 is introduced to move the overloaded data-manipulation functionality of torch.utils.data.DataLoader into DataPipe operations. In addition, certain features, such as snapshotting and switching backend services to perform high-performance operations, can only be achieved with DataLoader2.
DataLoader2
- class torchdata.dataloader2.DataLoader2(datapipe: Union[IterDataPipe, MapDataPipe], datapipe_adapter_fn: Optional[Union[Iterable[Adapter], Adapter]] = None, reading_service: Optional[ReadingServiceInterface] = None)
- DataLoader2 is used to optimize and execute the given DataPipe graph based on ReadingService and Adapter functions, with support for:
- Dynamic sharding for multi-process and distributed data loading
- Multiple backend ReadingServices
- In-place modification of the DataPipe graph, such as shuffle control, memory pinning, etc.
- Snapshotting the state of the data-preprocessing pipeline (WIP)
 - Parameters:
- datapipe (IterDataPipe or MapDataPipe) – DataPipe from which to load the data. A deepcopy of it is made during initialization, so the input can be reused in a different DataLoader2 without sharing state.
- datapipe_adapter_fn (Iterable[Adapter] or Adapter, optional) – Adapter function(s) that will be applied to the DataPipe (default: None).
- reading_service (ReadingServiceInterface, optional) – Defines how DataLoader2 should execute operations over the DataPipe, e.g. multiprocessing or distributed (default: None). A deepcopy of it is made during initialization, so the input can be reused in a different DataLoader2 without sharing state.
 
 - __iter__() → DataLoader2Iterator[T_co]
- Return a singleton iterator from the DataPipe graph adapted by the ReadingService. The DataPipe will be restored if a serialized state was provided to construct the DataLoader2. In addition, initialize_iteration and finalize_iteration are invoked at the beginning and end of the iteration, respectively.
 - classmethod from_state(state: Dict[str, Any], reading_service: CheckpointableReadingServiceInterface) → DataLoader2[T_co]
- Create a new DataLoader2 with the DataPipe graph and ReadingService restored from the serialized state.
 - load_state_dict(state: Dict[str, Any]) → None
- For an existing DataLoader2, load the serialized state to restore the DataPipe graph and reset the internal state of the ReadingService.
 - shutdown() → None
- Shut down the ReadingService and clean up the iterator.
 - state_dict() → Dict[str, Any]
- Return a dictionary representing the state of the data-processing pipeline, with keys:
- serialized_datapipe: Serialized DataPipe before ReadingService adaptation.
- reading_service_state: The state of the ReadingService and the adapted DataPipe.
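The order in which __iter__ and shutdown drive the ReadingService hooks can be modeled with a small pure-Python sketch. ToyReadingService and ToyDataLoader2 are hypothetical stand-ins, not torchdata classes: they operate on plain iterables, reuse only the documented hook names, and ignore details of the real DataLoader2 such as the singleton iterator, deepcopying, and state restoration.

```python
from typing import Iterable, Iterator, List, Optional


class ToyReadingService:
    """Hypothetical reading service that only records which hooks were called."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def initialize(self, datapipe: Iterable[int]) -> Iterable[int]:
        self.calls.append("initialize")
        return datapipe  # a real service would return an adapted graph here

    def initialize_iteration(self) -> None:
        self.calls.append("initialize_iteration")

    def finalize_iteration(self) -> None:
        self.calls.append("finalize_iteration")

    def finalize(self) -> None:
        self.calls.append("finalize")


class ToyDataLoader2:
    """Hypothetical loader that calls the hooks in the documented order."""

    def __init__(self, datapipe: Iterable[int], reading_service: ToyReadingService) -> None:
        self.datapipe = datapipe
        self.reading_service = reading_service
        self._adapted: Optional[Iterable[int]] = None

    def __iter__(self) -> Iterator[int]:
        # Generator body: runs when iteration actually starts (first next()).
        if self._adapted is None:
            # The graph is adapted only once, for the first iterator.
            self._adapted = self.reading_service.initialize(self.datapipe)
        self.reading_service.initialize_iteration()  # start of every epoch
        yield from self._adapted
        self.reading_service.finalize_iteration()  # runs once the data is depleted

    def shutdown(self) -> None:
        self.reading_service.finalize()


rs = ToyReadingService()
loader = ToyDataLoader2([1, 2, 3], rs)
for _epoch in range(2):
    print(list(loader))  # [1, 2, 3] each epoch
loader.shutdown()
print(rs.calls)
# ['initialize', 'initialize_iteration', 'finalize_iteration',
#  'initialize_iteration', 'finalize_iteration', 'finalize']
```

Note that initialize runs only for the first epoch, while the per-epoch hooks bracket every pass over the data.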
 
 
Note:
DataLoader2 doesn’t support torch.utils.data.Dataset or torch.utils.data.IterableDataset. Please wrap each of them with the corresponding DataPipe below:
- torchdata.datapipes.map.SequenceWrapper: torch.utils.data.Dataset
- torchdata.datapipes.iter.IterableWrapper: torch.utils.data.IterableDataset
ReadingService
ReadingService specifies the execution backend for the data-processing graph. There are three types of ReadingService in TorchData.
Each ReadingService takes the DataPipe graph and modifies it to achieve features such as dynamic sharding, sharing random seeds, and snapshotting for multi-process and distributed data loading.
This also eases the transition of a data-preprocessing pipeline from research to production. After the DataPipe graph has been created and validated with the ReadingServices, a different ReadingService that configures and connects to a production service or infrastructure, such as AIStore, can be provided to DataLoader2 as a drop-in replacement. Such a ReadingService could potentially search the graph, find DataPipe operations that can be delegated to the production service or infrastructure, and then modify the graph accordingly to achieve higher-performance execution.
The following are the interfaces for implementing a custom ReadingService.
- class torchdata.dataloader2.ReadingServiceInterface
- Interface for ReadingService. Please extend this interface class to implement a custom ReadingService.
 - finalize() → None
- ReadingService cleans up its internal states and fully shuts down the service. Called in DataLoader2’s shutdown and __del__.
 - finalize_iteration() → None
- ReadingService ends the service after an epoch is finished. Called when the iterator of DataLoader2 is depleted.
 - abstract initialize(datapipe: Union[IterDataPipe, MapDataPipe]) → Union[IterDataPipe, MapDataPipe]
- ReadingService takes a DataPipe graph and adapts it into a new DataPipe graph based on custom needs. Called once, when the DataLoader2 iterator is created for the first time.
- Parameters:
- datapipe – Original DataPipe graph.
- Returns:
- An adapted or a new DataPipe graph.
 
 - initialize_iteration() → None
- ReadingService spins up the service for an epoch. Called at the beginning of the iteration, every time a DataLoader2 iterator is obtained.
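As a sketch of the initialize contract (take a graph, return an adapted graph), the hypothetical ShardingReadingServiceSketch below gives each of world_size ranks a disjoint slice of the input. It works on plain Python iterables rather than DataPipes and does not subclass ReadingServiceInterface, so it runs without torchdata; treat it as an illustration of the shape of the interface, not of how TorchData's own ReadingServices shard data.

```python
from itertools import islice
from typing import Iterable, Iterator


class _ShardedView:
    """Re-iterable view yielding every world_size-th item, starting at rank."""

    def __init__(self, source: Iterable[int], rank: int, world_size: int) -> None:
        self.source = source
        self.rank = rank
        self.world_size = world_size

    def __iter__(self) -> Iterator[int]:
        # A fresh islice per epoch, so a re-iterable source can be read again.
        return islice(self.source, self.rank, None, self.world_size)


class ShardingReadingServiceSketch:
    """Hypothetical service following the ReadingServiceInterface method names."""

    def __init__(self, rank: int, world_size: int) -> None:
        if not 0 <= rank < world_size:
            raise ValueError("rank must satisfy 0 <= rank < world_size")
        self.rank = rank
        self.world_size = world_size
        self.epoch = 0

    def initialize(self, datapipe: Iterable[int]) -> Iterable[int]:
        # Called once: wrap the original graph in a sharding step.
        return _ShardedView(datapipe, self.rank, self.world_size)

    def initialize_iteration(self) -> None:
        self.epoch += 1  # e.g. a place to reseed per-epoch randomness

    def finalize_iteration(self) -> None:
        pass  # nothing to tear down between epochs in this sketch

    def finalize(self) -> None:
        pass  # no workers or connections to shut down in this sketch


shards = [list(ShardingReadingServiceSketch(r, 3).initialize(range(10))) for r in range(3)]
print(shards)  # [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]
```

Because initialize returns a re-iterable wrapper instead of a one-shot iterator, the same adapted graph can be read again in every epoch.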
 
The checkpoint/snapshotting feature is a work in progress. Here is the preliminary interface (small changes are likely):
- class torchdata.dataloader2.CheckpointableReadingServiceInterface
- Extends ReadingServiceInterface with two additional methods to save and restore the state of the data-processing graph.
 - abstract checkpoint() → bytes
- ReadingService serializes its internal states. Called in DataLoader2.state_dict.
 - abstract restore(datapipe: Union[IterDataPipe, MapDataPipe], serialized_state: bytes) → Union[IterDataPipe, MapDataPipe]
- ReadingService adapts the DataPipe graph based on the serialized state. Called once, when the DataLoader2 iterator is created for the first time. Counterpart of initialize, which adapts the DataPipe graph from scratch.
- Parameters:
- datapipe – Original DataPipe graph before it is adapted by ReadingService.
- serialized_state – The serialized internal state used to restore the state of the adapted DataPipe graph.
- Returns:
- Adapted DataPipe generated from the serialized state.
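The checkpoint/restore pairing can be illustrated under simple assumptions: the only state is the number of items already yielded in the current epoch, and it is serialized to bytes with pickle. CheckpointableServiceSketch and _CountingView are hypothetical names; a real CheckpointableReadingServiceInterface implementation would serialize whatever state its adapted DataPipe graph needs.

```python
import pickle
from itertools import islice
from typing import Iterable, Iterator


class CheckpointableServiceSketch:
    """Hypothetical service whose only state is the number of items yielded."""

    def __init__(self) -> None:
        self.consumed = 0

    def initialize(self, datapipe: Iterable[int]) -> Iterable[int]:
        # Fresh start: nothing to skip.
        return _CountingView(datapipe, self, skip=0)

    def checkpoint(self) -> bytes:
        # Serialize the internal state; pickle is an arbitrary choice here.
        return pickle.dumps({"consumed": self.consumed})

    def restore(self, datapipe: Iterable[int], serialized_state: bytes) -> Iterable[int]:
        # Counterpart of initialize: rebuild the view from the saved state.
        state = pickle.loads(serialized_state)
        self.consumed = state["consumed"]
        return _CountingView(datapipe, self, skip=state["consumed"])


class _CountingView:
    """Skips already-consumed items on its first pass and counts what it yields."""

    def __init__(self, source: Iterable[int], service: CheckpointableServiceSketch, skip: int) -> None:
        self.source = source
        self.service = service
        self.skip = skip

    def __iter__(self) -> Iterator[int]:
        skip, self.skip = self.skip, 0  # only the resumed epoch is offset
        self.service.consumed = skip
        for item in islice(self.source, skip, None):
            self.service.consumed += 1
            yield item


data = list(range(6))
svc = CheckpointableServiceSketch()
it = iter(svc.initialize(data))
print(next(it), next(it))  # 0 1
blob = svc.checkpoint()

resumed = CheckpointableServiceSketch()
print(list(resumed.restore(data, blob)))  # [2, 3, 4, 5]
```

The restored service receives the original, unadapted graph plus the serialized bytes, mirroring the restore(datapipe, serialized_state) signature.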
 
 
Graph utility functions are also provided in torchdata.dataloader2.graph to help users define their own ReadingService and modify the graph:
- Traverse the DataPipes and their attributes to extract the DataPipe graph.
- Given the graph of DataPipe generated by …
- Given the graph of DataPipe generated by …
- Given the graph of DataPipe generated by …
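The idea of traversing a pipeline's attributes to extract its graph, and then searching that graph, can be sketched with toy stages that reference their upstream stage through an attribute. traverse_sketch and find_by_type_sketch are hypothetical helpers, not the graph utility functions listed above; they assume an acyclic graph whose edges are plain attributes and represent the result as a nested dict keyed by id().

```python
from typing import Any, Dict, List, Optional, Tuple, Type

# Nested mapping: id(stage) -> (stage, graph of its upstream stages)
Graph = Dict[int, Tuple[Any, "Graph"]]


class ToyStage:
    """Hypothetical pipeline stage; `source` points at the upstream stage."""

    def __init__(self, source: Optional["ToyStage"] = None) -> None:
        self.source = source


class ToyLoader(ToyStage):
    pass


class ToyShuffler(ToyStage):
    pass


class ToyBatcher(ToyStage):
    pass


def traverse_sketch(stage: ToyStage) -> Graph:
    # Inspect every attribute and recurse into those that are stages.
    # Assumes the graph has no cycles.
    upstream: Graph = {}
    for value in vars(stage).values():
        if isinstance(value, ToyStage):
            upstream.update(traverse_sketch(value))
    return {id(stage): (stage, upstream)}


def find_by_type_sketch(graph: Graph, stage_type: Type[ToyStage]) -> List[ToyStage]:
    # Depth-first search over the nested mapping for instances of stage_type.
    found: List[ToyStage] = []
    for stage, upstream in graph.values():
        if isinstance(stage, stage_type):
            found.append(stage)
        found.extend(find_by_type_sketch(upstream, stage_type))
    return found


shuffler = ToyShuffler(ToyLoader())
pipeline = ToyBatcher(shuffler)
graph = traverse_sketch(pipeline)
print(find_by_type_sketch(graph, ToyShuffler) == [shuffler])  # True
```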
Adapter
Adapter is used to configure, modify, and extend the DataPipe graph in DataLoader2. It allows in-place modification or replacement of the pre-assembled DataPipe graph provided by PyTorch domains. For example, Shuffle(False) can be provided to DataLoader2, which would disable any shuffle operations in the DataPipe graph.
- class torchdata.dataloader2.adapter.Adapter
- Adapter base class that follows the Python Callable protocol.
 - abstract __call__(datapipe: Union[IterDataPipe, MapDataPipe]) → Union[IterDataPipe, MapDataPipe]
- Callable function that either modifies the DataPipe graph in place or returns a new DataPipe graph.
- Parameters:
- datapipe – DataPipe that needs to be adapted.
- Returns:
- Adapted DataPipe or new DataPipe.
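The Adapter contract (a callable that receives a DataPipe graph and returns it, either modified in place or replaced) can be sketched in plain Python. ToyStage, ToyShuffler, AdapterSketch, ShuffleSketch and apply_adapters are hypothetical; ShuffleSketch only imitates the idea of passing Shuffle(False) to disable shuffling, and apply_adapters assumes that multiple adapters are applied one after another, in order.

```python
from abc import ABC, abstractmethod
from typing import Iterable, Optional


class ToyStage:
    """Hypothetical pipeline stage linked to its upstream stage via `source`."""

    def __init__(self, source: Optional["ToyStage"] = None) -> None:
        self.source = source


class ToyShuffler(ToyStage):
    def __init__(self, source: Optional[ToyStage] = None) -> None:
        super().__init__(source)
        self.shuffle_enabled = True


class AdapterSketch(ABC):
    """Mirrors the documented contract: a callable from a graph to a graph."""

    @abstractmethod
    def __call__(self, datapipe: ToyStage) -> ToyStage:
        ...


class ShuffleSketch(AdapterSketch):
    def __init__(self, enable: bool = True) -> None:
        self.enable = enable

    def __call__(self, datapipe: ToyStage) -> ToyStage:
        # In-place modification: walk upstream and toggle every shuffler found.
        stage: Optional[ToyStage] = datapipe
        while stage is not None:
            if isinstance(stage, ToyShuffler):
                stage.shuffle_enabled = self.enable
            stage = stage.source
        return datapipe  # same graph object, modified in place


def apply_adapters(datapipe: ToyStage, adapters: Iterable[AdapterSketch]) -> ToyStage:
    # Assumption: adapters run in order, each receiving the previous result.
    for adapter in adapters:
        datapipe = adapter(datapipe)
    return datapipe


pipeline = ToyStage(ToyShuffler(ToyStage()))
adapted = apply_adapters(pipeline, [ShuffleSketch(False)])
print(adapted is pipeline, pipeline.source.shuffle_enabled)  # True False
```

Returning the graph from __call__, even after an in-place change, lets in-place and replacing adapters be chained the same way.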
 
 
Here is the list of Adapters provided by TorchData in torchdata.dataloader2.adapter:
- Shuffle: DataPipes adapter that allows control over all existing Shuffler DataPipes in the graph.
- CacheTimeout: DataPipes adapter that allows control over the timeouts of all existing EndOnDiskCacheHolder DataPipes in the graph.
We will provide more Adapters to cover additional data-processing options:
- PinMemory: Attach a DataPipe at the end of the data-processing graph that converts output data to torch.Tensor in pinned memory.
- FullSync: Attach a DataPipe to make sure the data-processing graph is synchronized between distributed processes to prevent hanging.
- ShardingPolicy: Modify the sharding policy if sharding_filter is present in the DataPipe graph.
- PrefetchPolicy, InvalidateCache, etc.
If you have feature requests for Adapters you’d like to see provided, please open a GitHub issue. For specific needs, DataLoader2 also accepts any custom Adapter as long as it inherits from the Adapter class.