Quickstart¶
This is a self contained guide on how to write a simple app and start launching distributed jobs on local and remote clusters.
Installation¶
First thing we need to do is to install the TorchX python package which includes the CLI and the library.
# install torchx with all dependencies
$ pip install torchx[dev]
See the README for more information on installation.
[1]:
%%sh
torchx --help
usage: torchx [-h] [--log_level LOG_LEVEL] [--version]
              {describe,log,run,builtins,runopts,status,configure} ...
torchx CLI
optional arguments:
  -h, --help            show this help message and exit
  --log_level LOG_LEVEL
                        Python logging log level
  --version             show program's version number and exit
sub-commands:
  Use the following commands to run operations, e.g.: torchx run ${JOB_NAME}
  {describe,log,run,builtins,runopts,status,configure}
Hello World¶
Lets start off with writing a simple “Hello World” python app. This is just a normal python program and can contain anything you’d like.
Note
This example uses Jupyter Notebook %%writefile to create local files for example purposes. Under normal usage you would have these as standalone files.
[2]:
%%writefile my_app.py
import sys
print(f"Hello, {sys.argv[1]}!")
Overwriting my_app.py
Launching¶
We can execute our app via torchx run. The local_cwd scheduler executes the app relative to the current directory.
For this we’ll use the utils.python component:
[3]:
%%sh
torchx run --scheduler local_cwd utils.python --help
usage: torchx run <run args...> python  [--help] [-m M] [-c C]
                                        [--script SCRIPT] [--image IMAGE]
                                        [--name NAME] [--cpu CPU] [--gpu GPU]
                                        [--memMB MEMMB] [-h H]
                                        [--num_replicas NUM_REPLICAS]
                                        ...
Runs ``python`` with the specified module, command or script on the specified
...
positional arguments:
  args                  arguments passed to the program in sys.argv[1:]
                        (ignored with `--c`) (required)
optional arguments:
  --help                show this help message and exit
  -m M, --m M           run library module as a script (default: None)
  -c C, --c C           program passed as string (may error if scheduler has a
                        length limit on args) (default: None)
  --script SCRIPT       .py script to run (default: None)
  --image IMAGE         image to run on (default:
                        ghcr.io/pytorch/torchx:0.1.2)
  --name NAME           name of the job (default: torchx_utils_python)
  --cpu CPU             number of cpus per replica (default: 1)
  --gpu GPU             number of gpus per replica (default: 0)
  --memMB MEMMB         cpu memory in MB per replica (default: 1024)
  -h H, --h H           a registered named resource (if specified takes
                        precedence over cpu, gpu, memMB) (default: None)
  --num_replicas NUM_REPLICAS
                        number of copies to run (each on its own container)
                        (default: 1)
The component takes in the script name and any extra arguments will be passed to the script itself.
[4]:
%%sh
torchx run --scheduler local_cwd utils.python --script my_app.py "your name"
torchx 2022-03-28 19:32:18 INFO     Log files located in: /tmp/torchx_gui2r2de/torchx/torchx_utils_python-r070dnf6366g0c/python/0
torchx 2022-03-28 19:32:19 INFO     Waiting for the app to finish...
python/0 Hello, your name!
torchx 2022-03-28 19:32:20 INFO     Job finished: SUCCEEDED
local_cwd://torchx/torchx_utils_python-r070dnf6366g0c
We can run the exact same app via the local_docker scheduler. This scheduler will package up the local workspace as a layer on top of the specified image. This provides a very similar environment to the container based remote schedulers.
Note
This requires Docker installed and won’t work in environments such as Google Colab. See the Docker install instructions: https://docs.docker.com/get-docker/
[5]:
%%sh
torchx run --scheduler local_docker utils.python --script my_app.py "your name"
torchx 2022-03-28 19:32:21 INFO     Building workspace: file:///home/tristanr/Developer/torchx-0.1.2/docs/source for role[0]: python, image: ghcr.io/pytorch/torchx:0.1.2
torchx 2022-03-28 19:32:28 INFO     Done building workspace
torchx 2022-03-28 19:32:28 INFO     New image: sha256:ef203e20ad16f5871c1d6d5c713e89f681c0bb2b575e7b2fc2340b70b5560ac1 built from workspace
torchx 2022-03-28 19:32:29 INFO     Waiting for the app to finish...
python/0 Hello, your name!
torchx 2022-03-28 19:32:30 INFO     Job finished: SUCCEEDED
local_docker://torchx/torchx_utils_python-qvt3gk41l1bb0
TorchX defaults to using the ghcr.io/pytorch/torchx Docker container image which contains the PyTorch libraries, TorchX and related dependencies.
Distributed¶
TorchX’s dist.ddp component uses TorchElastic to manage the workers. This means you can launch multi-worker and multi-host jobs out of the box on all of the schedulers we support.
[6]:
%%sh
torchx run --scheduler local_docker dist.ddp --help
usage: torchx run <run args...> ddp  [--help] [--script SCRIPT] [-m M]
                                     [--image IMAGE] [--name NAME] [-h H]
                                     [--cpu CPU] [--gpu GPU] [--memMB MEMMB]
                                     [-j J] [--env ENV]
                                     [--max_retries MAX_RETRIES]
                                     [--rdzv_port RDZV_PORT] [--mounts MOUNTS]
                                     ...
Distributed data parallel style application (one role, multi-replica). ...
positional arguments:
  script_args           arguments to the main module (required)
optional arguments:
  --help                show this help message and exit
  --script SCRIPT       script or binary to run within the image (default:
                        None)
  -m M, --m M           the python module path to run (default: None)
  --image IMAGE         image (e.g. docker) (default:
                        ghcr.io/pytorch/torchx:0.1.2)
  --name NAME           job name override (uses the script name if not
                        specified) (default: None)
  -h H, --h H           a registered named resource (if specified takes
                        precedence over cpu, gpu, memMB) (default: None)
  --cpu CPU             number of cpus per replica (default: 2)
  --gpu GPU             number of gpus per replica (default: 0)
  --memMB MEMMB         cpu memory in MB per replica (default: 1024)
  -j J, --j J           {nnodes}x{nproc_per_node}, for gpu hosts,
                        nproc_per_node must not exceed num gpus (default: 1x2)
  --env ENV             environment varibles to be passed to the run (e.g.
                        ENV1=v1,ENV2=v2,ENV3=v3) (default: None)
  --max_retries MAX_RETRIES
                        the number of scheduler retries allowed (default: 0)
  --rdzv_port RDZV_PORT
                        the port on rank0's host to use for hosting the c10d
                        store used for rendezvous. Only takes effect when
                        running multi-node. When running single node, this
                        parameter is ignored and a random free port is chosen.
                        (default: 29500)
  --mounts MOUNTS       mounts to mount into the worker environment/container
                        (ex.
                        type=<bind/volume>,src=/host,dst=/job[,readonly]). See
                        scheduler documentation for more info. (default: None)
Lets create a slightly more interesting app to leverage the TorchX distributed support.
[7]:
%%writefile dist_app.py
import torch
import torch.distributed as dist
dist.init_process_group(backend="gloo")
print(f"I am worker {dist.get_rank()} of {dist.get_world_size()}!")
a = torch.tensor([dist.get_rank()])
dist.all_reduce(a)
print(f"all_reduce output = {a}")
Writing dist_app.py
Let launch a small job with 2 nodes and 2 worker processes per node:
[8]:
%%sh
torchx run --scheduler local_docker dist.ddp -j 2x2 --script dist_app.py
torchx 2022-03-28 19:32:33 INFO     Building workspace: file:///home/tristanr/Developer/torchx-0.1.2/docs/source for role[0]: dist_app, image: ghcr.io/pytorch/torchx:0.1.2
torchx 2022-03-28 19:32:39 INFO     Done building workspace
torchx 2022-03-28 19:32:39 INFO     New image: sha256:f2382b3ca50e13d9bf20f046305c37e8ea4cc1fda91f741cddc7c8d59f977211 built from workspace
torchx 2022-03-28 19:32:40 INFO     Waiting for the app to finish...
dist_app/0 WARNING:__main__:
dist_app/0 *****************************************
dist_app/0 Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed.
dist_app/0 *****************************************
dist_app/1 WARNING:__main__:
dist_app/1 *****************************************
dist_app/1 Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed.
dist_app/1 *****************************************
dist_app/1 [1]:I am worker 3 of 4!
dist_app/1 [1]:all_reduce output = tensor([6])
dist_app/0 [0]:I am worker 0 of 4!
dist_app/0 [0]:all_reduce output = tensor([6])
dist_app/0 [1]:I am worker 1 of 4!
dist_app/1 [0]:I am worker 2 of 4!
dist_app/1 [0]:all_reduce output = tensor([6])
dist_app/0 [1]:all_reduce output = tensor([6])
torchx 2022-03-28 19:32:48 INFO     Job finished: SUCCEEDED
local_docker://torchx/dist_app-sfpfrvnr2fzlkc
Workspaces / Patching¶
For each scheduler there’s a concept of an image. For local_cwd and slurm it uses the current working directory. For container based schedulers such as local_docker, kubernetes and aws_batch it uses a docker container.
To provide the same environment between local and remote jobs, TorchX CLI uses workspaces to automatically patch images for remote jobs on a per scheduler basis.
When you launch a job via torchx run it’ll overlay the current directory on top of the provided image so your code is available in the launched job.
For docker based schedulers you’ll need a local docker daemon to build and push the image to your remote docker repository.
.torchxconfig¶
Arguments to schedulers can be specified either via a command line flag to torchx run -s <scheduler> -c <args> or on a per scheduler basis via a .torchxconfig file.
[9]:
%%writefile .torchxconfig
[kubernetes]
queue=torchx
image_repo=<your docker image repository>
[slurm]
partition=torchx
Writing .torchxconfig
Remote Schedulers¶
TorchX supports a large number of schedulers. Don’t see yours? Request it!
Remote schedulers operate the exact same way the local schedulers do. The same run command for local works out of the box on remote.
$ torchx run --scheduler slurm dist.ddp -j 2x2 --script dist_app.py
$ torchx run --scheduler kubernetes dist.ddp -j 2x2 --script dist_app.py
$ torchx run --scheduler aws_batch dist.ddp -j 2x2 --script dist_app.py
$ torchx run --scheduler ray dist.ddp -j 2x2 --script dist_app.py
Depending on the scheduler there may be a few extra configuration parameters so TorchX knows where to run the job and upload built images. These can either be set via -c or in the .torchxconfig file.
All config options:
[10]:
%%sh
torchx runopts
local_docker:
    usage:
        [copy_env=COPY_ENV]
    optional arguments:
        copy_env=COPY_ENV (typing.List[str], None)
            list of glob patterns of environment variables to copy if not set in AppDef. Ex: FOO_*
local_cwd:
    usage:
        [log_dir=LOG_DIR],[prepend_cwd=PREPEND_CWD],[auto_set_cuda_visible_devices=AUTO_SET_CUDA_VISIBLE_DEVICES]
    optional arguments:
        log_dir=LOG_DIR (str, None)
            dir to write stdout/stderr log files of replicas
        prepend_cwd=PREPEND_CWD (bool, False)
            if set, prepends CWD to replica's PATH env var making any binaries in CWD take precedence over those in PATH
        auto_set_cuda_visible_devices=AUTO_SET_CUDA_VISIBLE_DEVICES (bool, False)
            sets the `CUDA_AVAILABLE_DEVICES` for roles that request GPU resources. Each role replica will be assigned one GPU. Does nothing if the device count is less than replicas.
slurm:
    usage:
        [partition=PARTITION],[time=TIME],[nomem=NOMEM],[comment=COMMENT],[constraint=CONSTRAINT],[mail-user=MAIL-USER],[mail-type=MAIL-TYPE],[job_dir=JOB_DIR]
    optional arguments:
        partition=PARTITION (str, None)
            The partition to run the job in.
        time=TIME (str, None)
            The maximum time the job is allowed to run for. Formats:             "minutes", "minutes:seconds", "hours:minutes:seconds", "days-hours",             "days-hours:minutes" or "days-hours:minutes:seconds"
        nomem=NOMEM (bool, False)
            disables memory request to workaround https://github.com/aws/aws-parallelcluster/issues/2198
        comment=COMMENT (str, None)
            Comment to set on the slurm job.
        constraint=CONSTRAINT (str, None)
            Constraint to use for the slurm job.
        mail-user=MAIL-USER (str, None)
            User to mail on job end.
        mail-type=MAIL-TYPE (str, None)
            What events to mail users on.
        job_dir=JOB_DIR (str, None)
            The directory to place the job code and outputs. The
            directory must not exist and will be created. To enable log
            iteration, jobs will be tracked in ``.torchxslurmjobdirs``.
kubernetes:
    usage:
        queue=QUEUE,[namespace=NAMESPACE],[image_repo=IMAGE_REPO],[service_account=SERVICE_ACCOUNT]
    required arguments:
        queue=QUEUE (str)
            Volcano queue to schedule job in
    optional arguments:
        namespace=NAMESPACE (str, default)
            Kubernetes namespace to schedule job in
        image_repo=IMAGE_REPO (str, None)
            The image repository to use when pushing patched images, must have push access. Ex: example.com/your/container
        service_account=SERVICE_ACCOUNT (str, None)
            The service account name to set on the pod specs
aws_batch:
    usage:
        queue=QUEUE,[image_repo=IMAGE_REPO]
    required arguments:
        queue=QUEUE (str)
            queue to schedule job in
    optional arguments:
        image_repo=IMAGE_REPO (str, None)
            The image repository to use when pushing patched images, must have push access. Ex: example.com/your/container
ray:
    usage:
        [cluster_config_file=CLUSTER_CONFIG_FILE],[cluster_name=CLUSTER_NAME],[dashboard_address=DASHBOARD_ADDRESS],[working_dir=WORKING_DIR],[requirements=REQUIREMENTS]
    optional arguments:
        cluster_config_file=CLUSTER_CONFIG_FILE (str, None)
            Use CLUSTER_CONFIG_FILE to access or create the Ray cluster.
        cluster_name=CLUSTER_NAME (str, None)
            Override the configured cluster name.
        dashboard_address=DASHBOARD_ADDRESS (str, 127.0.0.1:8265)
            Use ray status to get the dashboard address you will submit jobs against
        working_dir=WORKING_DIR (str, None)
            Copy the the working directory containing the Python scripts to the cluster.
        requirements=REQUIREMENTS (str, None)
            Path to requirements.txt
Custom Images¶
Docker-based Schedulers¶
If you want more than the standard PyTorch libraries you can add custom Dockerfile or build your own docker container and use it as the base image for your TorchX jobs.
[11]:
%%writefile timm_app.py
import timm
print(timm.models.resnet18())
Writing timm_app.py
[12]:
%%writefile Dockerfile.torchx
FROM pytorch/pytorch:1.10.0-cuda11.3-cudnn8-runtime
RUN pip install timm
COPY . .
Writing Dockerfile.torchx
Once we have the Dockerfile created we can launch as normal and TorchX will automatically build the image with the newly provided Dockerfile instead of the default one.
[13]:
%%sh
torchx run --scheduler local_docker utils.python --script timm_app.py "your name"
torchx 2022-03-28 19:32:49 INFO     loaded configs from /home/tristanr/Developer/torchx-0.1.2/docs/source/.torchxconfig
torchx 2022-03-28 19:32:50 INFO     Building workspace: file:///home/tristanr/Developer/torchx-0.1.2/docs/source for role[0]: python, image: ghcr.io/pytorch/torchx:0.1.2
torchx 2022-03-28 19:32:54 INFO     Done building workspace
torchx 2022-03-28 19:32:54 INFO     New image: sha256:89b1295e21924681cafad386b16c06765f961be32d8ce72726283f53e3d981e0 built from workspace
torchx 2022-03-28 19:32:54 INFO     Waiting for the app to finish...
python/0 ResNet(
python/0   (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
python/0   (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0   (act1): ReLU(inplace=True)
python/0   (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
python/0   (layer1): Sequential(
python/0     (0): BasicBlock(
python/0       (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
python/0       (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0       (act1): ReLU(inplace=True)
python/0       (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
python/0       (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0       (act2): ReLU(inplace=True)
python/0     )
python/0     (1): BasicBlock(
python/0       (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
python/0       (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0       (act1): ReLU(inplace=True)
python/0       (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
python/0       (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0       (act2): ReLU(inplace=True)
python/0     )
python/0   )
python/0   (layer2): Sequential(
python/0     (0): BasicBlock(
python/0       (conv1): Conv2d(64, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
python/0       (bn1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0       (act1): ReLU(inplace=True)
python/0       (conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
python/0       (bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0       (act2): ReLU(inplace=True)
python/0       (downsample): Sequential(
python/0         (0): Conv2d(64, 128, kernel_size=(1, 1), stride=(2, 2), bias=False)
python/0         (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0       )
python/0     )
python/0     (1): BasicBlock(
python/0       (conv1): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
python/0       (bn1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0       (act1): ReLU(inplace=True)
python/0       (conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
python/0       (bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0       (act2): ReLU(inplace=True)
python/0     )
python/0   )
python/0   (layer3): Sequential(
python/0     (0): BasicBlock(
python/0       (conv1): Conv2d(128, 256, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
python/0       (bn1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0       (act1): ReLU(inplace=True)
python/0       (conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
python/0       (bn2): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0       (act2): ReLU(inplace=True)
python/0       (downsample): Sequential(
python/0         (0): Conv2d(128, 256, kernel_size=(1, 1), stride=(2, 2), bias=False)
python/0         (1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0       )
python/0     )
python/0     (1): BasicBlock(
python/0       (conv1): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
python/0       (bn1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0       (act1): ReLU(inplace=True)
python/0       (conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
python/0       (bn2): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0       (act2): ReLU(inplace=True)
python/0     )
python/0   )
python/0   (layer4): Sequential(
python/0     (0): BasicBlock(
python/0       (conv1): Conv2d(256, 512, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
python/0       (bn1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0       (act1): ReLU(inplace=True)
python/0       (conv2): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
python/0       (bn2): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0       (act2): ReLU(inplace=True)
python/0       (downsample): Sequential(
python/0         (0): Conv2d(256, 512, kernel_size=(1, 1), stride=(2, 2), bias=False)
python/0         (1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0       )
python/0     )
python/0     (1): BasicBlock(
python/0       (conv1): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
python/0       (bn1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0       (act1): ReLU(inplace=True)
python/0       (conv2): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
python/0       (bn2): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
python/0       (act2): ReLU(inplace=True)
python/0     )
python/0   )
python/0   (global_pool): SelectAdaptivePool2d (pool_type=avg, flatten=Flatten(start_dim=1, end_dim=-1))
python/0   (fc): Linear(in_features=512, out_features=1000, bias=True)
python/0 )
torchx 2022-03-28 19:32:56 INFO     Job finished: SUCCEEDED
local_docker://torchx/torchx_utils_python-lwlbqfj93zz33
Slurm¶
The slurm and local_cwd use the current environment so you can use pip and conda as normal.
Next Steps¶
- Checkout other features of the torchx CLI 
- Take a look at the list of schedulers supported by the runner 
- Browse through the collection of builtin components 
- See which ML pipeline platforms you can run components on 
- See a training app example