torchx.specs¶
AppDef¶
- class torchx.specs.AppDef(name: str, roles: List[torchx.specs.api.Role] = <factory>, metadata: Dict[str, str] = <factory>)[source]¶
Represents a distributed application made up of multiple Roles and metadata. Contains the necessary information for the driver to submit this app to the scheduler.
- Parameters
name – Name of the application
roles – List of roles
metadata – AppDef-specific configuration; by comparison, RunConfig is runtime-specific configuration.
- add_metadata(key: str, value: str) → torchx.specs.api.AppDef[source]¶
Adds metadata to the application. Note: if the key already exists, this method overwrites the metadata value.
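A minimal sketch of constructing an AppDef and attaching metadata (the app name, role, and metadata values are illustrative):

from torchx.specs import AppDef, Role

app = AppDef(
    name="my_app",
    roles=[Role(name="trainer", image="pytorch/torch:1")],
)
app.add_metadata("owner", "ml-infra")  # overwrites the value if "owner" already exists
assert app.metadata["owner"] == "ml-infra"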
Role¶
- class torchx.specs.Role(name: str, image: str, base_image: Optional[str] = None, entrypoint: str = '<MISSING>', args: List[str] = <factory>, env: Dict[str, str] = <factory>, num_replicas: int = 1, max_retries: int = 0, retry_policy: torchx.specs.api.RetryPolicy = <RetryPolicy.APPLICATION: 'APPLICATION'>, resource: torchx.specs.api.Resource = Resource(cpu=-1, gpu=-1, memMB=-1, capabilities={}), port_map: Dict[str, int] = <factory>)[source]¶
A set of nodes that perform a specific duty within the AppDef.
Examples:
Distributed data parallel app - made up of a single role (trainer).
App with parameter server - made up of multiple roles (trainer, ps).
Note
An image is a software bundle that is installed on the container scheduled by the scheduler. The container on the scheduler dictates what an image actually is. An image could be as simple as a tar-ball, or it could map to a docker image. The scheduler typically knows how to “pull” the image given an image name (str), which could be a simple name (e.g. a docker image) or a url (e.g. s3://path/my_image.tar).
Note
An optional base_image can be specified if the scheduler supports a concept of base images. For schedulers that run Docker containers the base image is not useful, since the application image itself can be built from a base image (using the FROM base/image:latest construct in the Dockerfile). However, the base image is useful for schedulers that work with simple image artifacts (e.g. *.tar.gz) that do not have a built-in concept of base images. For these schedulers, specifying a base image that includes the dependencies while the main image holds the actual application code makes it possible to change the application code without incurring the cost of rebuilding the uber artifact.
Usage:
trainer = (
    Role(name="trainer", image="pytorch/torch:1")
    .runs("my_trainer.py", "--arg", "foo", ENV_VAR="FOOBAR")
    .replicas(4)
    .require(Resource(cpu=1, gpu=1, memMB=500))
    .ports({"tcp_store": 8080, "tensorboard": 8081})
)

# for schedulers that support base_images
trainer = Role(
    name="trainer",
    image="pytorch/torch:1",
    base_image="common/ml-tools:latest",
)
...
- Parameters
name – name of the role
image – a software bundle that is installed on a container
base_image – optional base image, if the scheduler supports image overlay
entrypoint – command (within the container) to invoke the role
args – commandline arguments to the entrypoint cmd
env – environment variable mappings
num_replicas – number of container replicas to run
max_retries – max number of retries before giving up
retry_policy – retry behavior upon replica failures
resource – resource requirement for the role. The role should be scheduled by the scheduler on num_replicas containers, each of which should have at least resource guarantees.
port_map – port mapping for the role. The key is the unique identifier of the port, e.g. "tensorboard": 9090.
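Since Role is a dataclass, it can also be constructed directly from the parameters above; a minimal sketch with illustrative values:

from torchx.specs import Resource, RetryPolicy, Role

trainer = Role(
    name="trainer",
    image="pytorch/torch:1",
    entrypoint="my_trainer.py",        # command invoked within the container
    args=["--arg", "foo"],
    env={"ENV_VAR": "FOOBAR"},
    num_replicas=4,
    max_retries=3,
    retry_policy=RetryPolicy.APPLICATION,
    resource=Resource(cpu=1, gpu=1, memMB=500),
    port_map={"tcp_store": 8080, "tensorboard": 8081},
)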
- pre_proc(scheduler: str, dryrun_info: torchx.specs.api.AppDryRunInfo) → torchx.specs.api.AppDryRunInfo[source]¶
Modifies the scheduler request based on the role-specific configuration. The method is invoked for each role during scheduler submit_dryrun. If there are multiple roles, the method is invoked for each role in the order defined by the AppDef.roles list.
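A hedged sketch of a role subclass customizing the request during submit_dryrun; the subclass and the mutation shown are hypothetical, and the structure of the request is scheduler-specific:

from torchx.specs import Role
from torchx.specs.api import AppDryRunInfo

class TrainerRole(Role):
    # hypothetical override, invoked once per role during submit_dryrun
    def pre_proc(self, scheduler: str, dryrun_info: AppDryRunInfo) -> AppDryRunInfo:
        if scheduler == "local":
            # dryrun_info.request holds the scheduler-specific request object;
            # adjust it here before it is submitted
            pass
        return dryrun_info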
- class torchx.specs.RetryPolicy(value)[source]¶
Defines the retry policy for the Roles in the AppDef. The policy defines the behavior when the role replica encounters a failure:
unsuccessful (non-zero) exit code
hardware/host crashes
preemption
eviction
Note
Not all retry policies are supported by all schedulers. However, all schedulers must support RetryPolicy.APPLICATION. Please refer to the scheduler’s documentation for more information on the retry policies it supports and any behavior caveats.
- REPLICA: Replaces the replica instance. Surviving replicas are untouched. Use with torch_dist_role to have torch coordinate restarts and membership changes. Otherwise, it is up to the application to deal with failed replica departures and replacement replica admittance.
- APPLICATION: Restarts the entire application.
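A short sketch of opting a role into per-replica restarts (values illustrative; check your scheduler's docs for support):

from torchx.specs import RetryPolicy, Role

# a failed replica is replaced individually, up to max_retries times,
# while the surviving replicas keep running
worker = Role(
    name="worker",
    image="pytorch/torch:1",
    entrypoint="worker.py",
    num_replicas=8,
    max_retries=3,
    retry_policy=RetryPolicy.REPLICA,
)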
Resource¶
- class torchx.specs.Resource(cpu: int, gpu: int, memMB: int, capabilities: Dict[str, Any] = <factory>)[source]¶
Represents resource requirements for a Role.
- Parameters
cpu – number of cpu cores (note: not hyper threads)
gpu – number of gpus
memMB – MB of ram
capabilities – additional hardware specs (interpreted by scheduler)
- static copy(original: torchx.specs.api.Resource, **capabilities: Any) → torchx.specs.api.Resource[source]¶
Copies a resource and applies new capabilities. If a capability is present both in the original resource and in the parameters, the value from the parameters is used.
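A minimal sketch of copy; the capability key instance_type is an illustrative assumption, not a required key:

from torchx.specs import Resource

original = Resource(cpu=2, gpu=1, memMB=1024,
                    capabilities={"instance_type": "p3.2xlarge"})

# "instance_type" exists both in the original and in the parameters;
# the parameter value wins in the copy, and the original is unchanged
patched = Resource.copy(original, instance_type="p3.8xlarge")
assert patched.capabilities["instance_type"] == "p3.8xlarge"
assert original.capabilities["instance_type"] == "p3.2xlarge"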
- torchx.specs.get_named_resources(res: str) → torchx.specs.api.Resource[source]¶
Gets a resource object based on the string definition registered via entrypoints.txt.
TorchX implements a named_resource registration mechanism, which consists of the following steps:
1. Create a module and define your resource retrieval function:
# my_module.resources
from torchx.specs import Resource

def gpu_x_1() -> Resource:
    return Resource(cpu=2, gpu=2, memMB=64 * 1024)
2. Register the resource retrieval function in the entrypoints section:

[torchx.named_resources]
gpu_x_1 = my_module.resources:gpu_x_1
3. The name gpu_x_1 can then be used as a string argument to this function:

from torchx.specs import named_resources
resource = named_resources["gpu_x_1"]
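Equivalently, get_named_resources (documented above) takes the registered name as its string argument; a minimal sketch:

from torchx.specs import get_named_resources

resource = get_named_resources("gpu_x_1")
assert resource.gpu == 2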
Macros¶
- class torchx.specs.macros[source]¶
Defines macros that can be used with Role.entrypoint and Role.args. The macros will be substituted at runtime with their actual values.
Available macros:
img_root - root directory of the pulled container image (Role.image)
base_img_root - root directory of the pulled Role.base_image (resolves to “<NONE>” if no base_image is set)
app_id - application id as assigned by the scheduler
replica_id - unique id for each instance of a replica of a Role. For instance, a role with 3 replicas could have 0, 1, 2 as replica ids. Note that when a container fails and is replaced, the new container will have the same replica_id as the one it is replacing. For instance, if node 1 failed and was replaced by the scheduler, the replacing node will also have replica_id=1.
Example:
# runs: hello_world.py --app_id ${app_id}
trainer = Role(name="trainer").runs("hello_world.py", "--app_id", macros.app_id)
app = AppDef("train_app").of(trainer)
app_handle = session.run(app, scheduler="local", cfg=RunConfig())
- class Values(img_root: str, app_id: str, replica_id: str, base_img_root: str = '<NONE>')[source]¶
- apply(role: torchx.specs.api.Role) → torchx.specs.api.Role[source]¶
apply applies the values to a copy of the specified role and returns it.
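A hedged sketch of substituting macros by hand with Values.apply (values illustrative; schedulers normally perform this substitution at runtime):

from torchx.specs import Role, macros

role = Role(
    name="trainer",
    image="pytorch/torch:1",
    entrypoint="hello_world.py",
    args=["--app_id", macros.app_id],
)

values = macros.Values(img_root="/tmp/img", app_id="app_123", replica_id="0")
resolved = values.apply(role)  # returns a substituted copy; role is unchanged
# expected: resolved.args == ["--app_id", "app_123"]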
Run Configs¶
- class torchx.specs.RunConfig(cfgs: Dict[str, Optional[Union[str, int, float, bool, List[str]]]] = <factory>)[source]¶
Additional run configs for the app. These are typically scheduler runtime configs/arguments that bind neither to the AppDef nor to the Scheduler. For example, a particular cluster (within the scheduler) that the application should be submitted to. Since the same app can be launched into multiple types of clusters (dev, prod), the cluster id config does not bind to the app. Neither does it bind to the scheduler, since the cluster can be partitioned by size of the instances (S, M, L) or by a preemption setting (e.g. on-demand vs spot).
Since Session allows the application to be submitted to multiple schedulers, users who want to submit the same app into multiple schedulers from the same session can union all the RunConfigs into a single object. The scheduler implementation will selectively read the configs it needs.
This class is intended to be trivially serialized and passed around or saved, hence it only allows primitives as config values. Should the scheduler need more than simple primitives (e.g. list of str), it is up to the scheduler to document a way to encode the value as a str and parse it (e.g. representing a list of str as a comma-delimited str).
Usage:
# write
config = RunConfig()
config.set("run_as_user", "prod")
config.set("priority", 10)

# read
config.get("run_as_user")  # "prod"
config.get("priority")     # 10
config.get("never_set")    # None
- class torchx.specs.runopts[source]¶
Holds the accepted scheduler run configuration keys, default values (if any), and help message strings. These options are provided by the Scheduler and validated in Session.run against the user-provided RunConfig. None default values are allowed. Required opts must NOT have a non-None default.
Important
This class has no accessors because it is intended to be constructed and returned by Scheduler.run_config_options and printed out as a “help” tool or as part of an exception msg.
Usage:
opts = runopts()

opts.add("run_as_user", type_=str, help="user to run the job as")
opts.add("cluster_id", type_=int, help="cluster to submit the job", required=True)
opts.add("priority", type_=float, default=0.5, help="job priority")
opts.add("preemptible", type_=bool, default=False, help="is the job preemptible")

# invalid
opts.add("illegal", default=10, required=True)
opts.add("bad_type", type=str, default=10)

opts.check(RunConfig)
print(opts)
- add(cfg_key: str, type_: Type[Optional[Union[str, int, float, bool, List[str]]]], help: str, default: Optional[Union[str, int, float, bool, List[str]]] = None, required: bool = False) → None[source]¶
Adds the config option with the given help string and default value (if any). If the default is not specified, then this option is a required option.
- static is_type(obj: Optional[Union[str, int, float, bool, List[str]]], tp: Type[Optional[Union[str, int, float, bool, List[str]]]]) → bool[source]¶
Returns True if obj is of type tp. Similar to isinstance(), but supports tp = List[str], and thus can be used to validate a ConfigValue.
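A quick sketch of the List[str] case that plain isinstance() cannot handle (expected results shown as comments, per the documented behavior):

from typing import List
from torchx.specs import runopts

runopts.is_type(["a", "b"], List[str])  # True
runopts.is_type("a,b", List[str])       # False; a str is not a List[str]
runopts.is_type(10, int)                # True; plain types behave like isinstance()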
- resolve(config: torchx.specs.api.RunConfig) → torchx.specs.api.RunConfig[source]¶
Checks the given config against this runopts and sets default configs if they are not set.
Warning
This method mutates the provided config!
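A short sketch of resolve filling in a default (the priority opt is illustrative):

from torchx.specs import RunConfig, runopts

opts = runopts()
opts.add("priority", type_=float, default=0.5, help="job priority")

config = RunConfig()
resolved = opts.resolve(config)  # mutates config and returns it
resolved.get("priority")         # 0.5, the default was applied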
Run Status¶
- class torchx.specs.AppStatus(state: torchx.specs.api.AppState, num_restarts: int = 0, msg: str = '', structured_error_msg: str = '<NONE>', ui_url: Optional[str] = None, roles: List[torchx.specs.api.RoleStatus] = <factory>)[source]¶
The runtime status of the AppDef. The scheduler can return an arbitrary text message (the msg field). If any error occurs, the scheduler can populate structured_error_msg with a JSON response. replicas represents the statuses of the replicas in the job. If the job runs with multiple retries, the parameter will contain the statuses of the most recent retry. Note: if the previous retries failed but the most recent retry succeeded or is in progress, replicas will not contain errors from the earlier retries.
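A hedged sketch of inspecting a status object (the helper is hypothetical; terminal states are per the AppState docs below):

from torchx.specs import AppState, AppStatus

def is_done(status: AppStatus) -> bool:
    # terminal states: SUCCEEDED, FAILED, CANCELLED
    return status.state in (AppState.SUCCEEDED, AppState.FAILED, AppState.CANCELLED)

status = AppStatus(state=AppState.RUNNING, msg="healthy")
print(status.state, status.num_restarts)  # AppState.RUNNING 0
is_done(status)  # False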
- class torchx.specs.AppState(value)[source]¶
State of the application. An application starts in an initial UNSUBMITTED state and moves through the SUBMITTED, PENDING, and RUNNING states, finally reaching a terminal state: SUCCEEDED, FAILED, or CANCELLED.
If the scheduler supports preemption, the app moves from a RUNNING state to PENDING upon preemption.
If the user stops the application, the application state moves to STOPPED, then to CANCELLED when the job is actually cancelled by the scheduler.
UNSUBMITTED - app has not been submitted to the scheduler yet
SUBMITTED - app has been successfully submitted to the scheduler
PENDING - app has been submitted to the scheduler pending allocation
RUNNING - app is running
SUCCEEDED - app has successfully completed
FAILED - app has unsuccessfully completed
CANCELLED - app was cancelled before completing
- torchx.specs.ReplicaState¶
alias of torchx.specs.api.AppState