Distributed¶
For distributed training, TorchX relies on the scheduler’s gang scheduling
capabilities to schedule n copies of nodes. Once launched, the application
is expected to be written in a way that leverages this topology, for instance,
with PyTorch’s
DDP.
You can express a variety of node topologies with TorchX by specifying multiple
torchx.specs.Role in your component’s AppDef. Each role maps to
a homogeneous group of nodes that performs a “role” (function) in the overall
training. Scheduling-wise, TorchX launches each role as a sub-gang.
A DDP-style training job has a single role: trainers. Whereas a training job that uses parameter servers will have two roles: parameter server, trainer. You can specify different entrypoint (executable), num replicas, resource requirements, and more for each role.
DDP Builtin¶
DDP-style trainers are common and easy to templetize since they are homogeneous
single role AppDefs, so there is a builtin: dist.ddp. Assuming your DDP
training script is called main.py, launch it as:
# locally, 1 node x 4 workers
$ torchx run -s local_cwd dist.ddp -j 1x4 --script main.py
# locally, 2 node x 4 workers (8 total)
$ torchx run -s local_cwd dist.ddp -j 2x4 --script main.py
# remote (optionally pass --rdzv_port to use a different master port than the default 29500)
$ torchx run -s kubernetes -cfg queue=default dist.ddp \
    -j 2x4 \
    --script main.py
# remote -- elastic/autoscaling with 2 minimum and max 5 nodes with 8
# workers each
$ torchx run -s kubernetes dist.ddp -j 2:5x8 --script main.py
Note that the only difference compared to the local launch is the scheduler (-s).
The dist.ddp builtin uses torchelastic (more specifically torch.distributed.run)
under the hood. Read more about torchelastic here.
Components APIs¶
- torchx.components.dist.ddp(*script_args: str, script: Optional[str] = None, m: Optional[str] = None, image: str = 'ghcr.io/pytorch/torchx:0.3.0', name: Optional[str] = None, h: Optional[str] = None, cpu: int = 2, gpu: int = 0, memMB: int = 1024, j: str = '1x2', env: Optional[Dict[str, str]] = None, max_retries: int = 0, rdzv_port: int = 29500, mounts: Optional[List[str]] = None, debug: bool = False) AppDef[source]¶
- Distributed data parallel style application (one role, multi-replica). Uses torch.distributed.run to launch and coordinate PyTorch worker processes. Defaults to using - c10drendezvous backend on rendezvous_endpoint- $rank_0_host:$rdzv_port. Note that- rdzv_portparameter is ignored when running on single node, and instead we use port 0 which instructs torchelastic to chose a free random port on the host.- Note: (cpu, gpu, memMB) parameters are mutually exclusive with h(named resource) where
- htakes precedence if specified for setting resource requirements. See registering named resources.
 - Parameters
- script_args – arguments to the main module 
- script – script or binary to run within the image 
- m – the python module path to run 
- image – image (e.g. docker) 
- name – job name override (uses the script name if not specified) 
- cpu – number of cpus per replica 
- gpu – number of gpus per replica 
- memMB – cpu memory in MB per replica 
- h – a registered named resource (if specified takes precedence over cpu, gpu, memMB) 
- j – [{min_nnodes}:]{nnodes}x{nproc_per_node}, for gpu hosts, nproc_per_node must not exceed num gpus 
- env – environment varibles to be passed to the run (e.g. ENV1=v1,ENV2=v2,ENV3=v3) 
- max_retries – the number of scheduler retries allowed 
- rdzv_port – the port on rank0’s host to use for hosting the c10d store used for rendezvous. Only takes effect when running multi-node. When running single node, this parameter is ignored and a random free port is chosen. 
- mounts – mounts to mount into the worker environment/container (ex. type=<bind/volume>,src=/host,dst=/job[,readonly]). See scheduler documentation for more info. 
- debug – whether to run with preset debug flags enabled 
 
 
- Note: (cpu, gpu, memMB) parameters are mutually exclusive with