分布式通信包 - torch.distributed¶
注意
请参考 PyTorch 分布式概述,简要介绍与分布式训练相关的所有功能。
后端¶
torch.distributed支持三个内置后端,每个后端都有
不同的功能。下表显示了可用的功能
用于 CPU/CUDA 张量。
仅当用于构建 PyTorch 的实现支持 CUDA 时,MPI 才支持 CUDA。
| 后端 | 
 | 
 | 
 | |||
|---|---|---|---|---|---|---|
| 装置 | 中央处理器 | 图形处理器 | 中央处理器 | 图形处理器 | 中央处理器 | 图形处理器 | 
| 发送 | ✓ | ✘ | ✓ | ? | ✘ | ✓ | 
| recv | ✓ | ✘ | ✓ | ? | ✘ | ✓ | 
| 广播 | ✓ | ✓ | ✓ | ? | ✘ | ✓ | 
| all_reduce | ✓ | ✓ | ✓ | ? | ✘ | ✓ | 
| 减少 | ✓ | ✘ | ✓ | ? | ✘ | ✓ | 
| all_gather | ✓ | ✘ | ✓ | ? | ✘ | ✓ | 
| 收集 | ✓ | ✘ | ✓ | ? | ✘ | ✓ | 
| 散射 | ✓ | ✘ | ✓ | ? | ✘ | ✘ | 
| reduce_scatter | ✘ | ✘ | ✘ | ✘ | ✘ | ✓ | 
| all_to_all | ✘ | ✘ | ✓ | ? | ✘ | ✓ | 
| 障碍 | ✓ | ✘ | ✓ | ? | ✘ | ✓ | 
PyTorch 附带的后端¶
PyTorch 分布式包支持 Linux(稳定)、MacOS(稳定)和 Windows(原型)。 默认情况下,对于 Linux,Gloo 和 NCCL 后端构建并包含在 PyTorch 中 分布式(仅在使用 CUDA 构建时为 NCCL)。MPI 是一个可选的后端,它只能是 如果您从源代码构建 PyTorch,则包括 include。(例如,在具有 MPI 的主机上构建 PyTorch 安装。
注意
从 PyTorch v1.8 开始,Windows 支持所有集体通信后端,但 NCCL、
如果 init_method 的init_process_group()指向它必须遵循的文件
添加到以下架构中:
- 本地文件系统、 - init_method="file:///d:/tmp/some_file"
- 共享文件系统、 - init_method="file://////{machine_name}/{share_folder_name}/some_file"
与 Linux 平台相同,您可以通过设置环境变量来启用 TcpStore, MASTER_ADDR 和 MASTER_PORT。
使用哪个后端?¶
过去,我们经常被问到:“我应该使用哪个后端?
- 经验法则 - 使用 NCCL 后端进行分布式 GPU 训练 
- 使用 Gloo 后端进行分布式 CPU 训练。 
 
- 具有 InfiniBand 互连的 GPU 主机 - 使用 NCCL,因为它是目前唯一支持 InfiniBand 和 GPUDirect 的 API API 
 
- 具有以太网互连的 GPU 主机 - 使用 NCCL,因为它目前提供了最好的分布式 GPU 训练性能,尤其是对于多进程单节点或 多节点分布式训练。如果您在使用 NCCL 中,请使用 Gloo 作为回退选项。(请注意,Gloo 目前 运行速度比 GPU 的 NCCL 慢。 
 
- 具有 InfiniBand 互连的 CPU 主机 - 如果您的 InfiniBand 启用了 IP over IB,请使用 Gloo,否则, 请改用 MPI。我们计划添加 InfiniBand 支持 Gloo 在即将发布的版本中。 
 
- 具有以太网互连的 CPU 主机 - 请使用 Gloo,除非您有使用 MPI 的特定原因。 
 
常见环境变量¶
选择要使用的网络接口¶
默认情况下,NCCL 和 Gloo 后端都会尝试找到合适的网络接口来使用。 如果自动检测到的接口不正确,您可以使用以下命令覆盖它 环境变量(适用于相应的后端):
- 例如,NCCL_SOCKET_IFNAME - export NCCL_SOCKET_IFNAME=eth0
- 例如,GLOO_SOCKET_IFNAME - export GLOO_SOCKET_IFNAME=eth0
如果您使用的是 Gloo 后端,则可以通过将
他们用逗号表示,像这样: .
后端将以循环方式跨这些接口分派作。
所有进程都必须在此变量中指定相同数量的接口。export GLOO_SOCKET_IFNAME=eth0,eth1,eth2,eth3
其他 NCCL 环境变量¶
调试 - 如果 NCCL 失败,您可以设置打印显式
warning 消息以及基本的 NCCL 初始化信息。NCCL_DEBUG=INFO
您还可以使用获取有关特定
NCCL 的方面。例如,将打印
集体调用,这在调试挂起时可能会有所帮助,尤其是那些
由 collective type 或 message size 不匹配引起。如果是拓扑
检测失败,如果进一步帮助,设置检查详细的检测结果并另存为参考会很有帮助
需要 NCCL 团队。NCCL_DEBUG_SUBSYSNCCL_DEBUG_SUBSYS=COLLNCCL_DEBUG_SUBSYS=GRAPH
性能调优 - NCCL 根据其拓扑检测执行自动调优,以节省用户的
调整努力。在某些基于套接字的系统上,用户可能仍会尝试调整并增加套接字
网络带宽。这两个环境变量已由 NCCL 预先调整
适用于某些云提供商,例如 AWS 或 GCP。NCCL_SOCKET_NTHREADSNCCL_NSOCKS_PERTHREAD
有关 NCCL 环境变量的完整列表,请参阅 NVIDIA NCCL 的官方文档
基本¶
torch.distributed 包提供 PyTorch 支持和通信原语
用于跨一个或多个上运行的多个计算节点的多进程并行性
机器。类torch.nn.parallel.DistributedDataParallel()在此基础上构建
功能,以提供同步分布式训练作为任何
PyTorch 模型。这与 Multiprocessing 包 torch.multiprocessing 和torch.nn.DataParallel()因为它支持
多台联网机器,并且用户必须显式启动单独的
每个进程的主训练脚本的副本。
在单机同步情况下,torch.distributed 或torch.nn.parallel.DistributedDataParallel()wrapper 可能仍然比其他
数据并行的方法,包括torch.nn.DataParallel():
- 每个进程都维护自己的优化器,并对每个 迭 代。虽然这可能看起来多余,因为已经收集了梯度 一起并平均跨流程,因此对于每个流程都是相同的,这意味着 不需要参数广播步骤,从而减少了在 节点。 
- 每个进程都包含一个独立的 Python 解释器,无需额外的解释器 开销和“GIL 抖动”,这来自驱动多个执行线程、模型 副本或 GPU。这对于 大量使用 Python 运行时,包括具有递归层或许多小 组件。 
初始化¶
需要使用torch.distributed.init_process_group()函数。这会阻塞,直到所有进程都
加入。
- 
torch.distributed.is_available()[来源]¶
- 如果分发的包可用,则返回。否则,不会公开任何其他 API。目前,可在 Linux、MacOS 和 Windows 上使用。设置为在从源构建 PyTorch 时启用它。 目前,默认值适用于 Linux 和 Windows,适用于 MacOS。 - True- torch.distributed- torch.distributed- USE_DISTRIBUTED=1- USE_DISTRIBUTED=1- USE_DISTRIBUTED=0
- 
torch.distributed.init_process_group(backend, init_method=None, timeout=datetime.timedelta(seconds=1800), world_size=- 1, rank=- 1, store=无, group_name='', pg_options=无)[来源]¶
- 初始化默认的分布式进程组,这也将 初始化分发的包。 - 初始化进程组有两种主要方法:
- 指定 、 和 显式。 - store- rank- world_size
- 指定(一个 URL 字符串),它指示位置/方式 以发现对等节点。(可选)指定 和 , 或者对 URL 中的所有必需参数进行编码并省略它们。 - init_method- rank- world_size
 
 - 如果未指定,则假定为 “env://”。 - init_method- 参数
- backend (str 或 Backend) – 要使用的后端。根据 build-time 配置,有效值包括 、 、 和。此字段应为小写字符串 (例如,),也可以通过 - mpi- gloo- nccl- "gloo"- Backend属性(例如,)。如果使用 每台具有后端的机器上有多个进程,每个进程 必须具有对它使用的每个 GPU 的独占访问权限,就像共享 GPU 一样 进程之间可能会导致死锁。- Backend.GLOO- nccl
- init_method (str, optional) – 指定如何初始化 进程组。如果未指定 or,则默认为 “env://”。 与 互斥。 - init_method- store- store
- world_size (int, optional) – 参与的进程数 工作。如果指定,则为 required。 - store
- rank (int, optional) – 当前进程的 rank (它应该是 介于 0 和 -1 之间的数字)。 如果指定,则为 required。 - world_size- store
- store (Store,可选) – 所有工作人员均可访问的键/值存储,已使用 交换连接/地址信息。 与 互斥。 - init_method
- timeout (timedelta, optional) – 针对 进程组。默认值等于 30 分钟。 这适用于后端。对于 ,这是 仅当环境变量 OR 设置为 1 时适用。设置后,这是 process 将阻塞并等待 collectives 完成 引发异常。当 set 时, 这是 Collective 将中止的持续时间 异步,进程将崩溃。 将向用户提供可以捕获和处理的错误, 但是由于其阻塞性质,它会产生性能开销。上 另一方面,它几乎没有 性能开销,但在出现错误时会导致进程崩溃。这是 done 的,因为 CUDA 执行是异步的,并且不再安全 在异步 NCCL作失败后继续执行用户代码 可能会导致后续 CUDA作在损坏的 数据。只应设置这两个环境变量中的一个。 - gloo- nccl- NCCL_BLOCKING_WAIT- NCCL_ASYNC_ERROR_HANDLING- NCCL_BLOCKING_WAIT- NCCL_ASYNC_ERROR_HANDLING- NCCL_BLOCKING_WAIT- NCCL_ASYNC_ERROR_HANDLING
- group_name (str, optional, deprecated) – 组名称。 
- pg_options (ProcessGroupOptions,可选) – 进程组选项 指定在 特定流程组的构建。截至目前,唯一的 options 是针对后端的,可以指定 NCCL 后端可以在以下情况下获取高优先级 CUDA 流 有计算内核在等待。 - ProcessGroupNCCL.Options- nccl- is_high_priority_stream
 
 - 注意 - 要启用 ,需要从源构建 PyTorch 在支持 MPI 的系统上。 - backend == Backend.MPI
- 
torch.distributed.is_torchelastic_launched()[来源]¶
- 检查此进程是否是使用 (aka torchelastic) 启动的。环境的存在 变量作为代理,判断当前进程是否 与 TorchElastic 一起启动。这是一个合理的代理,因为 map 到的 rendezvous id 始终是一个 非 null 值,指示用于对等发现目的的作业 ID.. - torch.distributed.elastic- TORCHELASTIC_RUN_ID- TORCHELASTIC_RUN_ID
目前支持三种初始化方法:
TCP 初始化¶
使用 TCP 进行初始化有两种方法,都需要网络地址
可从所有进程访问,并且所需的 .第一种方法
需要指定属于 Rank 0 进程的地址。这
初始化方法要求所有进程都手动指定秩。world_size
请注意,最新分布式不再支持多播地址
包。 也被弃用。group_name
import torch.distributed as dist
# Use address of one of the machines
dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456',
                        rank=args.rank, world_size=4)
环境变量初始化¶
此方法将从环境变量中读取配置,从而允许 一个用于完全自定义获取信息的方式。要设置的变量 是:
- MASTER_PORT-必填;必须是 rank 为 0 的计算机上的 free port
- MASTER_ADDR- 必需(等级 0 除外);Rank 0 节点的地址
- WORLD_SIZE-必填;可以在此处设置,也可以在调用 init 函数中设置
- RANK-必填;可以在此处设置,也可以在调用 init 函数中设置
等级为 0 的计算机将用于设置所有连接。
这是默认方法,这意味着不必指定(或
可以是 )。init_methodenv://
初始化后¶
一次torch.distributed.init_process_group()运行,可以使用以下函数。自
检查进程组是否已初始化使用torch.distributed.is_initialized().
- 
class (name)[来源]torch.distributed.Backend¶
- 可用后端的类似枚举的类:GLOO、NCCL、MPI 和其他已注册的 backends 的 - 此类的值为小写字符串,例如 .他们可以 作为属性访问,例如 . - "gloo"- Backend.NCCL- 这个类可以直接调用来解析字符串,例如,会检查是否有效,并且 如果是这样,则返回解析后的小写字符串。它还接受大写字符串, 例如,返回 . - Backend(backend_str)- backend_str- Backend("GLOO")- "gloo"- 注意 - 该条目存在,但仅用作 某些字段的初始值。用户不应直接使用它 也不假设它的存在。 - Backend.UNDEFINED
- 
torch.distributed.get_backend(group=None)[来源]¶
- 返回给定进程组的后端。 - 参数
- group (ProcessGroup,可选) – 要处理的流程组。这 default 是 General Main Process 组。如果另一个特定组 时,调用进程必须是 的一部分。 - group
- 返回
- 给定进程组的后端,以小写字符串表示。 
 
分布式 Key-Value Store¶
分布式包自带分布式键值存储,可以是
用于在组中的进程之间共享信息,以及
在 中初始化分发的包torch.distributed.init_process_group()(通过显式创建 store
作为指定 .)有 3 种选择
键值存储:init_methodTCPStore,FileStore和HashStore.
- 
类 torch.distributed.TCPStore¶
- 基于 TCP 的分布式键值存储实现。服务器存储包含 数据,而客户端存储可以通过 TCP 连接到服务器存储,并且 执行插入键值等作 pair,检索键值对等。那里 应始终是一个服务器存储初始化,因为客户端存储将等待 用于建立连接的服务器。 - set()- get()- 参数
- host_name (str) – 服务器存储应在其上运行的主机名或 IP 地址。 
- port (int) – 服务器存储应侦听传入请求的端口。 
- world_size (int, optional) – 存储用户总数 (客户端数 + 服务器 1)。默认值为 None (None 表示非固定数量的商店用户)。 
- is_master (bool, optional) – 初始化服务器存储时为 True,客户端存储为 False。默认值为 False。 
- timeout (timedelta,可选) – 存储区在初始化期间以及 和 等方法使用的超时。默认值为 timedelta(seconds=300) - get()- wait()
- wait_for_worker (bool, optional) – 是否等待所有工作程序与服务器存储连接。仅当 world_size 为 固定 值时,这才适用。默认值为 True。 
 
 - 例::
- >>> import torch.distributed as dist >>> from datetime import timedelta >>> # Run on process 1 (server) >>> server_store = dist.TCPStore("127.0.0.1", 1234, 2, True, timedelta(seconds=30)) >>> # Run on process 2 (client) >>> client_store = dist.TCPStore("127.0.0.1", 1234, 2, False) >>> # Use any of the store methods from either the client or server after initialization >>> server_store.set("first_key", "first_value") >>> client_store.get("first_key") 
 
- 
类 torch.distributed.HashStore¶
- 基于底层 hashmap 的线程安全存储实现。此 store 可以使用 在同一进程中(例如,由其他线程),但不能跨进程使用。 - 例::
- >>> import torch.distributed as dist >>> store = dist.HashStore() >>> # store can be used from other threads >>> # Use any of the store methods after initialization >>> store.set("first_key", "first_value") 
 
- 
类 torch.distributed.FileStore¶
- 一种 store 实现,它使用文件来存储底层键值对。 - 例::
- >>> import torch.distributed as dist >>> store1 = dist.FileStore("/tmp/filestore", 2) >>> store2 = dist.FileStore("/tmp/filestore", 2) >>> # Use any of the store methods from either the client or server after initialization >>> store1.set("first_key", "first_value") >>> store2.get("first_key") 
 
- 
类 torch.distributed.PrefixStore¶
- 3 个键值存储中任意一个的包装器 ( - TCPStore,- FileStore和- HashStore) 这会为插入到存储中的每个键添加一个前缀。- 参数
- prefix (str) – 在插入到存储区之前添加到每个键前面的前缀字符串。 
- store (torch.distributed.store) – 构成底层键值存储的 store 对象。 
 
 
- 
torch.distributed.Store.set(自我:torch._C._distributed_c10d。存储、arg0:str、arg1:str) → 无¶
- 根据提供的 和 将键值对插入到存储中。如果 store 中已经存在,它将覆盖旧的 值替换为新提供的 . - key- value- key- value- 例::
- >>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # Should return "first_value" >>> store.get("first_key") 
 
- 
torch.distributed.Store.get(自我:torch._C._distributed_c10d。存储,arg0: str) → 字节¶
- 检索与存储中给定的值关联的值。如果不是 存在于 store 中,该函数将等待 ,这是定义的 初始化 store 时,在引发异常之前。 - key- key- timeout- 参数
- key (str) – 该函数将返回与此键关联的值。 
- 返回
- 与 if 关联的值在存储中。 - key- key
 - 例::
- >>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # Should return "first_value" >>> store.get("first_key") 
 
- 
torch.distributed.Store.add(自我:torch._C._distributed_c10d。存储, arg0: str, arg1: int) → int¶
- 对给定的 add 的第一次调用会创建一个关联的计数器 with 在 store 中,初始化为 .后续调用以添加 以相同的增量将计数器替换为指定的 . 使用已 在 store 中设置 will result 在异常中。 - key- key- amount- key- amount- add()- set()- 例::
- >>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.add("first_key", 1) >>> store.add("first_key", 6) >>> # Should return 7 >>> store.get("first_key") 
 
- 
torch.distributed.Store.compare_set(自我:torch._C._distributed_c10d。存储、arg0:str、arg1:str、arg2:str) → 字节¶
- 根据提供的 和 在 和 插入之前执行比较。 仅当 store 中已存在 或 为空字符串时,才会设置。 - key- expected_value- desired_value- desired_value- expected_value- key- expected_value- 参数
 - 例::
- >>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("key", "first_value") >>> store.compare_set("key", "first_value", "second_value") >>> # Should return "second_value" >>> store.get("key") 
 
- 
torch.distributed.Store.wait(*args, **kwargs)¶
- overloaded 函数。 - wait(self: torch._C._distributed_c10d.存储,arg0: List[str]) -> 无 
 - 等待将每个密钥添加到存储中。如果不是所有键都是 set before the (set during store initialization) 之前设置),则会引发异常。 - keys- timeout- wait- 参数
- keys (list) – 在存储中设置它们之前要等待的键列表。 
 - 例::
- >>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> # This will throw an exception after 30 seconds >>> store.wait(["bad_key"]) 
 - wait(self: torch._C._distributed_c10d.存储, arg0: List[str], arg1: datetime.timedelta) -> 无 
 - 等待将每个 key in 添加到存储中,并引发异常 如果提供的 . - keys- timeout- 参数
- keys (list) – 在存储中设置它们之前要等待的键列表。 
- timeout (timedelta) – 在引发异常之前等待添加键的时间。 
 
 - 例::
- >>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> # This will throw an exception after 10 seconds >>> store.wait(["bad_key"], timedelta(seconds=10)) 
 
- 
torch.distributed.Store.num_keys(自我:torch._C._distributed_c10d。存储) → int¶
- 返回 store 中设置的键数。请注意,此数字通常会 比 和 添加的键数大 1,因为一个键用于协调所有 使用 store 的工作人员。 - set()- add()- 警告 - 当与 - TCPStore返回写入基础文件的键数。如果 store 被销毁,并且使用相同的文件创建了另一个 store,则将保留原始键。- num_keys- 返回
- 存储中存在的键数。 
 - 例::
- >>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # This should return 2 >>> store.num_keys() 
 
- 
torch.distributed.Store.delete_key(自我:torch._C._distributed_c10d。存储,arg0: str) → bool¶
- 从存储中删除 关联的键值对。如果成功删除了键,则返回 true,如果未成功删除,则返回 false。 - key- 参数
- key (str) – 要从存储中删除的密钥 
- 返回
- 如果已删除,则为 True,否则为 False。 - key
 - 例::
- >>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, HashStore can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key") >>> # This should return true >>> store.delete_key("first_key") >>> # This should return false >>> store.delete_key("bad_key") 
 
- 
torch.distributed.Store.set_timeout(自我:torch._C._distributed_c10d。存储,arg0: datetime.timedelta) → None¶
- 设置 store 的默认超时。此超时在初始化期间以及 in 和 中使用。 - wait()- get()- 参数
- timeout (timedelta) – 要在 store 中设置的超时。 
 - 例::
- >>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set_timeout(timedelta(seconds=10)) >>> # This will throw an exception after 10 seconds >>> store.wait(["bad_key"]) 
 
组¶
默认情况下,集合对 default 组(也称为 world)进行作,并且
要求所有进程都进入分布式函数调用。但是,某些工作负载可能会受益
来自更细粒度的通信。这就是分布式组的用武之地
进入游戏。new_group()函数可以是
用于创建新组,其中包含所有进程的任意子集。它返回
一个不透明的组句柄,可以作为参数提供给所有集合
(集合是分布式函数,用于在某些众所周知的编程模式中交换信息)。group
- 
torch.distributed.new_group(ranks=None, timeout=datetime.timedelta(seconds=1800),backend=None, pg_options=None)[来源]¶
- 创建新的分布式组。 - 此功能要求主组中的所有进程(即所有 作为分布式作业一部分的进程)输入此功能,甚至 如果他们不打算成为该组的成员。此外,组 应在所有进程中以相同的顺序创建。 - 警告 - 在后端同时使用多个进程组 不安全,用户应在 他们的应用程序来确保一次只使用一个进程组。 这意味着来自一个流程组的集合应该已经完成 执行(不仅仅是排队,因为 CUDA 执行是 async) 的 Collectives 之前。 有关更多详细信息,请参阅同时使用多个 NCCL 通讯器。 - NCCL- 参数
- ranks (list[int]) – 组成员的排名列表。如果 ,将为 设置为 All ranks。默认值为 。 - None- None
- timeout (timedelta, optional) – 针对 进程组。默认值等于 30 分钟。 这适用于后端。对于 ,这是 仅当环境变量 OR 设置为 1 时适用。设置后,这是 process 将阻塞并等待 collectives 完成 引发异常。当 set 时, 这是 Collective 将中止的持续时间 异步,进程将崩溃。 将向用户提供可以捕获和处理的错误, 但是由于其阻塞性质,它会产生性能开销。上 另一方面,它几乎没有 性能开销,但在出现错误时会导致进程崩溃。这是 done 的,因为 CUDA 执行是异步的,并且不再安全 在异步 NCCL作失败后继续执行用户代码 可能会导致后续 CUDA作在损坏的 数据。只应设置这两个环境变量中的一个。 - gloo- nccl- NCCL_BLOCKING_WAIT- NCCL_ASYNC_ERROR_HANDLING- NCCL_BLOCKING_WAIT- NCCL_ASYNC_ERROR_HANDLING- NCCL_BLOCKING_WAIT- NCCL_ASYNC_ERROR_HANDLING
- backend (str 或 Backend,可选) – 要使用的后端。根据 构建时配置,有效值为 和 。 默认情况下,使用与全局组相同的后端。此字段 应该以小写字符串的形式给出(例如 ),它可以 也可通过以下方式访问 - gloo- nccl- "gloo"- Backend属性(例如,)。如果传入,则后端 将使用与默认进程组对应的进程组。默认值为 。- Backend.GLOO- None- None
- pg_options (ProcessGroupOptions,可选) – 进程组选项 指定在 特定流程组的构建。即,对于后端,可以指定 进程组可以选择高优先级的 CUDA 流。 - nccl- is_high_priority_stream
 
- 返回
- 可提供给集体调用的分布式组的句柄。 
 
点对点通信¶
isend()和irecv()使用时返回分布式请求对象。通常,此对象的类型未指定
因为它们永远不应该手动创建,但保证它们支持两种方法:
- is_completed()- 如果作已完成,则返回 True
- wait()- 将阻止进程,直到作完成。 保证在返回后返回 True。- is_completed()
同步和异步集合作¶
每个集合运算函数都支持以下两种运算,
根据传递到 Collective 的标志的设置:async_op
Synchronous operation - 默认模式,设置为 时。
当函数返回时,保证
执行集体作。对于 CUDA作,不能保证
CUDA作已完成,因为 CUDA作是异步的。对于 CPU 集合体,任何
利用集体调用输出的进一步函数调用将按预期运行。对于 CUDA 集合,
利用同一 CUDA 流上的输出的函数调用将按预期运行。用户必须注意
在不同流下运行场景下的同步。有关 CUDA 语义的详细信息,例如 stream
同步,请参阅 CUDA 语义。
请参阅以下脚本,查看 CPU 和 CUDA作的这些语义差异的示例。async_opFalse
异步作 - 当设置为 True 时。集合作函数
返回 Distributed Request 对象。通常,您不需要手动创建它,并且它
保证支持两种方法:async_op
- is_completed()- 对于 CPU 集合,则返回已完成。对于 CUDA作, 如果作已成功排入 CUDA 流,并且输出可以在 default 流,无需进一步同步。- True- True
- wait()- 对于 CPU 集合,将阻止进程,直到作完成。在这种情况下 的 CUDA 集合中,将阻塞,直到作成功排入 CUDA 流中,并且 output 可以在 default 流上使用,而无需进一步同步。
- get_future()- 返回 object。支持 NCCL,也支持 GLOO 上的大多数作 和 MPI,但点对点作除外。 注意:随着我们继续采用 Futures 和合并 API,调用可能会变得多余。- torch._C.Future- get_future()
例
以下代码可以作为使用分布式集合时 CUDA作语义的参考。 它显示了在不同的 CUDA 流上使用集体输出时需要同步的明确需求:
# Code runs on each rank.
dist.init_process_group("nccl", rank=rank, world_size=2)
output = torch.tensor([rank]).cuda(rank)
s = torch.cuda.Stream()
handle = dist.all_reduce(output, async_op=True)
# Wait ensures the operation is enqueued, but not necessarily complete.
handle.wait()
# Using result on non-default stream.
with torch.cuda.stream(s):
    s.wait_stream(torch.cuda.default_stream())
    output.add_(100)
if rank == 0:
    # if the explicit call to wait_stream was omitted, the output below will be
    # non-deterministically 1 or 101, depending on whether the allreduce overwrote
    # the value after the add completed.
    print(output)
集合函数¶
- 
torch.distributed.broadcast(张量、src、group=None、async_op=False)[来源]¶
- 将 Tensor 广播到整个 Group。 - tensor在所有进程中必须具有相同数量的元素 参与集体。
- 
torch.distributed.broadcast_object_list(object_list, src=0, group=None, device=None)[来源]¶
- 将可腌制对象广播到整个组中。类似 自 - object_list- broadcast(),但 Python 对象可以传入。 请注意,中的所有对象都必须是可 picklable 的,才能 播出。- object_list- 参数
- object_list (List[Any]) – 要广播的输入对象列表。 每个对象都必须是可腌制的。只有等级上的对象才会 广播,但每个排名必须提供大小相等的列表。 - src
- src (int) – 要从中广播的源排名。 - object_list
- group —(ProcessGroup,可选):要处理的流程组。如果为 None,则 将使用默认进程组。默认值为 。 - None
- device ( , 可选 ) – 如果不是 None,则对象为 serialized 并转换为 Tensors,这些张量被移动到 Before broadcasting 中。默认值为 。 - torch.device- device- None
 
- 返回
- None.如果 rank 是组的一部分,则将包含 从 rank 广播对象。- object_list- src
 - 注意 - 对于基于 NCCL 的处理组,内部张量表示 的对象必须在通信之前移动到 GPU 设备 地方。在这种情况下,使用的设备由 用户提供,用户有责任 确保此设置,以便每个排名都有一个单独的 GPU(通过 )。 - torch.cuda.current_device()- torch.cuda.set_device()- 注意 - 请注意,此 API 与 - all_gather()collect,因为它不提供句柄,因此 将是一个阻塞调用。- async_op- 警告 - broadcast_object_list()uses 模块,它 已知是不安全的。可以构造恶意的 pickle data 将在 unpickling 期间执行任意代码。只调用这个 函数。- pickle- 例::
- >>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> if dist.get_rank() == 0: >>> # Assumes world_size of 3. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> else: >>> objects = [None, None, None] >>> # Assumes backend is not NCCL >>> device = torch.device("cpu") >>> dist.broadcast_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}] 
 
- 
torch.distributed.all_reduce(张量,op=<ReduceOp.SUM:0>,group=None,async_op=False)[来源]¶
- 减少所有机器上的张量数据,使所有机器都得到 最终结果。 - After the call 在所有进程中都将按位相同。 - tensor- 支持复杂张量。 - 参数
- 返回
- 异步工作句柄(如果 async_op 设置为 True)。 无,如果不是 async_op 或不属于组 
 - 例子 - >>> # All tensors below are of torch.int64 type. >>> # We have 2 process groups, 2 ranks. >>> tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank >>> tensor tensor([1, 2]) # Rank 0 tensor([3, 4]) # Rank 1 >>> dist.all_reduce(tensor, op=ReduceOp.SUM) >>> tensor tensor([4, 6]) # Rank 0 tensor([4, 6]) # Rank 1 - >>> # All tensors below are of torch.cfloat type. >>> # We have 2 process groups, 2 ranks. >>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat) + 2 * rank * (1+1j) >>> tensor tensor([1.+1.j, 2.+2.j]) # Rank 0 tensor([3.+3.j, 4.+4.j]) # Rank 1 >>> dist.all_reduce(tensor, op=ReduceOp.SUM) >>> tensor tensor([4.+4.j, 6.+6.j]) # Rank 0 tensor([4.+4.j, 6.+6.j]) # Rank 1 
- 
torch.distributed.reduce(张量,dst,op=<ReduceOp.SUM:0>,group=None,async_op=False)[来源]¶
- 减少所有计算机的张量数据。 - 只有具有 rank 的进程才会收到最终结果。 - dst
- 
torch.distributed.all_gather(tensor_list,张量,组=无,async_op=False)[来源]¶
- 从列表中的整个组中收集张量。 - 支持复杂张量。 - 参数
- 返回
- 异步工作句柄(如果 async_op 设置为 True)。 无,如果不是 async_op 或不属于组 
 - 例子 - >>> # All tensors below are of torch.int64 dtype. >>> # We have 2 process groups, 2 ranks. >>> tensor_list = [torch.zeros(2, dtype=torch.int64) for _ in range(2)] >>> tensor_list [tensor([0, 0]), tensor([0, 0])] # Rank 0 and 1 >>> tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank >>> tensor tensor([1, 2]) # Rank 0 tensor([3, 4]) # Rank 1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list [tensor([1, 2]), tensor([3, 4])] # Rank 0 [tensor([1, 2]), tensor([3, 4])] # Rank 1 - >>> # All tensors below are of torch.cfloat dtype. >>> # We have 2 process groups, 2 ranks. >>> tensor_list = [torch.zeros(2, dtype=torch.cfloat) for _ in range(2)] >>> tensor_list [tensor([0.+0.j, 0.+0.j]), tensor([0.+0.j, 0.+0.j])] # Rank 0 and 1 >>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat) + 2 * rank * (1+1j) >>> tensor tensor([1.+1.j, 2.+2.j]) # Rank 0 tensor([3.+3.j, 4.+4.j]) # Rank 1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list [tensor([1.+1.j, 2.+2.j]), tensor([3.+3.j, 4.+4.j])] # Rank 0 [tensor([1.+1.j, 2.+2.j]), tensor([3.+3.j, 4.+4.j])] # Rank 1 
- 
torch.distributed.all_gather_object(object_list, obj, group=None)[来源]¶
- 将整个组中的可腌制对象收集到一个列表中。似 - all_gather(),但 Python 对象可以传入。请注意,对象 必须可腌制才能被收集。- 参数
- object_list (list[Any]) – 输出列表。它的大小应该正确地调整为 size 的组,并将包含输出。 
- object (Any) – 要从当前进程广播的可选取 Python 对象。 
- group (ProcessGroup,可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。默认值为 。 - None
 
- 返回
- 没有。如果调用秩是此组的一部分,则 collective 将填充到 input 中。如果 调用 rank 不属于组,传入的 未修改。 - object_list- object_list
 - 注意 - 请注意,此 API 与 - all_gather()collect,因为它不提供句柄,因此 将是一个阻塞调用。- async_op- 注意 - 对于基于 NCCL 的处理组,内部张量表示 的对象必须在通信之前移动到 GPU 设备 地方。在这种情况下,使用的设备由 用户提供,用户有责任 确保此设置,以便每个排名都有一个单独的 GPU(通过 )。 - torch.cuda.current_device()- torch.cuda.set_device()- 警告 - all_gather_object()uses 模块,即 已知不安全。可以构造恶意的 pickle 数据 它将在 unpickling 期间执行任意代码。只调用这个 函数。- pickle- 例::
- >>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes world_size of 3. >>> gather_objects = ["foo", 12, {1: 2}] # any picklable object >>> output = [None for _ in gather_objects] >>> dist.all_gather_object(output, gather_objects[dist.get_rank()]) >>> output ['foo', 12, {1: 2}] 
 
- 
torch.distributed.gather_object(obj, object_gather_list=无, dst=0, group=无)[来源]¶
- 在单个进程中从整个组中收集可腌制对象。 似 - gather(),但 Python 对象可以传入。请注意, object 必须是可腌制的才能被收集。- 参数
- 返回
- 没有。在排名上,将包含 output 的 Collective 的 output 的 Portfolio - dst- object_gather_list
 - 注意 - 请注意,此 API 与 gather collective 略有不同 因为它不提供async_op句柄,因此会阻塞 叫。 - 注意 - 对于基于 NCCL 的处理组,内部张量表示 的对象必须在通信之前移动到 GPU 设备 地方。在这种情况下,使用的设备由 用户提供,用户有责任 确保此设置,以便每个排名都有一个单独的 GPU(通过 )。 - torch.cuda.current_device()- torch.cuda.set_device()- 警告 - gather_object()uses 模块,即 已知不安全。可以构造恶意的 pickle 数据 它将在 unpickling 期间执行任意代码。只调用这个 函数。- pickle- 例::
- >>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes world_size of 3. >>> gather_objects = ["foo", 12, {1: 2}] # any picklable object >>> output = [None for _ in gather_objects] >>> dist.gather_object( gather_objects[dist.get_rank()], output if dist.get_rank() == 0 else None, dst=0 ) >>> # On rank 0 >>> output ['foo', 12, {1: 2}] 
 
- 
torch.distributed.scatter(张量,scatter_list=无,src=0,组=无,async_op=False)[来源]¶
- 将张量列表分散到组中的所有进程。 - 每个进程将只接收一个张量并将其数据存储在参数中。 - tensor- 支持复杂张量。 
- 
torch.distributed.scatter_object_list(scatter_object_output_list, scatter_object_input_list, src=0, group=None)[来源]¶
- 将可腌制对象分散到整体中 群。似 - scatter_object_input_list- scatter(),但 Python 对象可以传入。上 每个 rank,scattered 对象将存储为 的第一个元素。请注意,中的所有对象都必须是可腌制的,以便进行分散。- scatter_object_output_list- scatter_object_input_list- 参数
- scatter_object_output_list (List[Any]) – 其第一个 元素将存储分散到此 rank 的对象。 
- scatter_object_input_list (List[Any]) – 要散布的输入对象列表。 每个对象都必须是可腌制的。只有等级上的对象才会 是 scattered,并且参数可以是非 src 秩。 - src- None
- src (int) – 要从中分散的源排名。 - scatter_object_input_list
- group —(ProcessGroup,可选):要处理的流程组。如果为 None,则 将使用默认进程组。默认值为 。 - None
 
- 返回
- None.如果 rank 是组的一部分,则将其第一个元素设置为此 rank 的分散对象。- scatter_object_output_list
 - 注意 - 请注意,此 API 与 Scatter Collective 略有不同 因为它不提供句柄,因此将是一个 blocking 调用。 - async_op- 注意 - 请注意,此 API 不支持 NCCL 后端,因为 ProcessGroupNCCL 不支持基于张量的分散集合体。 - 警告 - scatter_object_list()uses 模块,它 已知是不安全的。可以构造恶意的 pickle data 将在 unpickling 期间执行任意代码。只调用这个 函数。- pickle- 例::
- >>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> if dist.get_rank() == 0: >>> # Assumes world_size of 3. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> else: >>> # Can be any list on non-src ranks, elements are not used. >>> objects = [None, None, None] >>> output_list = [None] >>> dist.scatter_object_list(output_list, objects, src=0) >>> # Rank i gets objects[i]. For example, on rank 2: >>> output_list [{1: 2}] 
 
- 
torch.distributed.reduce_scatter(output, input_list, op=<ReduceOp.SUM: 0>, group=None, async_op=False)[来源]¶
- Reduce,然后将张量列表分散到组中的所有进程。 
- 
torch.distributed.all_to_all(output_tensor_list、input_tensor_list、组=无、async_op=False)[来源]¶
- 每个进程将输入张量列表分散到一个组中的所有进程,并且 返回 output list 中收集的 Tensors 列表。 - 支持复杂张量。 - 参数
- 返回
- 异步工作句柄(如果 async_op 设置为 True)。 如果不是 async_op 或不是组的一部分,则无。 
 - 警告 - all_to_all 是实验性的,可能会发生变化。 - 例子 - >>> input = torch.arange(4) + rank * 4 >>> input = list(input.chunk(4)) >>> input [tensor([0]), tensor([1]), tensor([2]), tensor([3])] # Rank 0 [tensor([4]), tensor([5]), tensor([6]), tensor([7])] # Rank 1 [tensor([8]), tensor([9]), tensor([10]), tensor([11])] # Rank 2 [tensor([12]), tensor([13]), tensor([14]), tensor([15])] # Rank 3 >>> output = list(torch.empty([4], dtype=torch.int64).chunk(4)) >>> dist.all_to_all(output, input) >>> output [tensor([0]), tensor([4]), tensor([8]), tensor([12])] # Rank 0 [tensor([1]), tensor([5]), tensor([9]), tensor([13])] # Rank 1 [tensor([2]), tensor([6]), tensor([10]), tensor([14])] # Rank 2 [tensor([3]), tensor([7]), tensor([11]), tensor([15])] # Rank 3 - >>> # Essentially, it is similar to following operation: >>> scatter_list = input >>> gather_list = output >>> for i in range(world_size): >>> dist.scatter(gather_list[i], scatter_list if i == rank else [], src = i) - >>> input tensor([0, 1, 2, 3, 4, 5]) # Rank 0 tensor([10, 11, 12, 13, 14, 15, 16, 17, 18]) # Rank 1 tensor([20, 21, 22, 23, 24]) # Rank 2 tensor([30, 31, 32, 33, 34, 35, 36]) # Rank 3 >>> input_splits [2, 2, 1, 1] # Rank 0 [3, 2, 2, 2] # Rank 1 [2, 1, 1, 1] # Rank 2 [2, 2, 2, 1] # Rank 3 >>> output_splits [2, 3, 2, 2] # Rank 0 [2, 2, 1, 2] # Rank 1 [1, 2, 1, 2] # Rank 2 [1, 2, 1, 1] # Rank 3 >>> input = list(input.split(input_splits)) >>> input [tensor([0, 1]), tensor([2, 3]), tensor([4]), tensor([5])] # Rank 0 [tensor([10, 11, 12]), tensor([13, 14]), tensor([15, 16]), tensor([17, 18])] # Rank 1 [tensor([20, 21]), tensor([22]), tensor([23]), tensor([24])] # Rank 2 [tensor([30, 31]), tensor([32, 33]), tensor([34, 35]), tensor([36])] # Rank 3 >>> output = ... >>> dist.all_to_all(output, input) >>> output [tensor([0, 1]), tensor([10, 11, 12]), tensor([20, 21]), tensor([30, 31])] # Rank 0 [tensor([2, 3]), tensor([13, 14]), tensor([22]), tensor([32, 33])] # Rank 1 [tensor([4]), tensor([15, 16]), tensor([23]), tensor([34, 35])] # Rank 2 [tensor([5]), tensor([17, 18]), tensor([24]), tensor([36])] # Rank 3 - >>> # Another example with tensors of torch.cfloat type. >>> input = torch.tensor([1+1j, 2+2j, 3+3j, 4+4j], dtype=torch.cfloat) + 4 * rank * (1+1j) >>> input = list(input.chunk(4)) >>> input [tensor([1+1j]), tensor([2+2j]), tensor([3+3j]), tensor([4+4j])] # Rank 0 [tensor([5+5j]), tensor([6+6j]), tensor([7+7j]), tensor([8+8j])] # Rank 1 [tensor([9+9j]), tensor([10+10j]), tensor([11+11j]), tensor([12+12j])] # Rank 2 [tensor([13+13j]), tensor([14+14j]), tensor([15+15j]), tensor([16+16j])] # Rank 3 >>> output = list(torch.empty([4], dtype=torch.int64).chunk(4)) >>> dist.all_to_all(output, input) >>> output [tensor([1+1j]), tensor([5+5j]), tensor([9+9j]), tensor([13+13j])] # Rank 0 [tensor([2+2j]), tensor([6+6j]), tensor([10+10j]), tensor([14+14j])] # Rank 1 [tensor([3+3j]), tensor([7+7j]), tensor([11+11j]), tensor([15+15j])] # Rank 2 [tensor([4+4j]), tensor([8+8j]), tensor([12+12j]), tensor([16+16j])] # Rank 3 
- 
torch.distributed.barrier(group=None, async_op=False, device_ids=None)[来源]¶
- 同步所有进程。 - 这个 collective 会阻塞进程,直到整个 group 进入这个函数, 如果 async_op 为 False,或者如果在 wait() 上调用异步工作句柄。 
- 
torch.distributed.monitored_barrier(group=None, timeout=None, wait_all_ranks=False)[来源]¶
- 同步所有进程,类似于 ,但采用 可配置的超时,并且能够报告未通过此 barrier 的 barrier 中。具体来说,对于非零等级,将阻止 直到从排名 0 处理 send/recv。等级 0 将阻止,直到所有发送 处理来自其他排名的 /recv,并将报告排名失败 未能及时做出回应。请注意,如果一个等级未达到 monitored_barrier(例如,由于挂起),所有其他等级都将失败 在 monitored_barrier。 - torch.distributed.barrier- 此 collective 将阻止组中的所有进程/排名,直到 整个 group 成功退出函数,使其可用于调试 和同步。但是,它可能会对性能产生影响,并且只应 用于调试或需要完全同步点的方案 在主机端。出于调试目的,可以插入此屏障 在应用程序的集体调用之前,检查是否有任何秩为 desynchronized。 - 注意 - 请注意,此 collective 仅支持 GLOO 后端。 - 参数
- group (ProcessGroup,可选) – 要处理的流程组。如果 ,将使用默认进程组。 - None
- timeout (datetime.timedelta,可选) – monitored_barrier的超时。 如果 ,将使用默认的进程组超时。 - None
- wait_all_ranks (bool, optional) – 是收集所有失败的排名还是 不。默认情况下,这是 and on rank 0 将抛出它遇到的第一个失败的等级以失败 快。通过设置 will 收集所有失败的排名并引发包含信息的错误 关于所有失败的等级。 - False- monitored_barrier- wait_all_ranks=True- monitored_barrier
 
- 返回
- None.
 - 例::
- >>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> if dist.get_rank() != 1: >>> dist.monitored_barrier() # Raises exception indicating that >>> # rank 1 did not call into monitored_barrier. >>> # Example with wait_all_ranks=True >>> if dist.get_rank() == 0: >>> dist.monitored_barrier(wait_all_ranks=True) # Raises exception >>> # indicating that ranks 1, 2, ... world_size - 1 did not call into >>> # monitored_barrier. 
 
- 
类 torch.distributed.ReduceOp¶
- 可用归约运算的类似枚举的类:、、、、 和 。 - SUM- AVG- PRODUCT- MIN- MAX- BAND- BOR- BXOR- BAND、 和 减少 在以下情况下不可用 使用后端。- BOR- BXOR- NCCL- AVG将值除以世界大小,然后在各个等级之间求和。 仅适用于后端, 并且仅适用于 NCCL 版本 2.10 或更高版本。- AVG- NCCL- 此外,复杂张量不支持 和 。 - MAX- MIN- PRODUCT- 此类的值可以作为属性访问,例如 . 它们用于指定归约集合的策略,例如, - ReduceOp.SUM- reduce(),- all_reduce_multigpu()等。- 成员: - 和 - 平均 (AVG) - 产品 - 最小 - 麦克斯 - 乐队 - 博尔 - BXOR 
分析 Collective 通信¶
请注意,您可以使用 (推荐,仅在 1.8.1 之后可用) 或分析此处提到的集体通信和点对点通信 API。支持所有开箱即用的后端 (、 、 ),并且集体通信使用将在分析输出/跟踪中按预期呈现。分析代码与任何常规 torch作符相同:torch.profilertorch.autograd.profilerglooncclmpi
import torch
import torch.distributed as dist
with torch.profiler():
    tensor = torch.randn(20, 10)
    dist.all_reduce(tensor)
请参阅 Profiler 文档,以全面了解 Profiler 功能。
多 GPU 集合函数¶
如果每个节点上有多个 GPU,则在使用 NCCL 和 Gloo 后端时,broadcast_multigpu()
all_reduce_multigpu()
reduce_multigpu()
all_gather_multigpu()和reduce_scatter_multigpu()支持 Distributed Collective
作。这些功能可能会
提升整体分布式训练性能,方便使用
传递张量列表。传递的 Tensor 列表中的每个 Tensor 都需要
位于调用该函数的主机的单独 GPU 设备上。注意
张量列表的长度在所有
分布式进程。另请注意,目前的多 GPU 集体
函数仅支持 NCCL 后端。
例如,如果我们用于分布式训练的系统有 2 个节点,则每个节点 其中有 8 个 GPU。在 16 个 GPU 中的每一个 GPU 上,我们都有一个 喜欢 all-reduce。以下代码可以作为参考:
在节点 0 上运行的代码
import torch
import torch.distributed as dist
dist.init_process_group(backend="nccl",
                        init_method="file:///distributed_test",
                        world_size=2,
                        rank=0)
tensor_list = []
for dev_idx in range(torch.cuda.device_count()):
    tensor_list.append(torch.FloatTensor([1]).cuda(dev_idx))
dist.all_reduce_multigpu(tensor_list)
在节点 1 上运行的代码
import torch
import torch.distributed as dist
dist.init_process_group(backend="nccl",
                        init_method="file:///distributed_test",
                        world_size=2,
                        rank=1)
tensor_list = []
for dev_idx in range(torch.cuda.device_count()):
    tensor_list.append(torch.FloatTensor([1]).cuda(dev_idx))
dist.all_reduce_multigpu(tensor_list)
调用后,两个节点上的所有 16 个张量都将具有 all-reduced 值 共 16 页
- 
torch.distributed.broadcast_multigpu(tensor_list, src, group=None, async_op=False, src_tensor=0)[来源]¶
- 将 Tensor 广播到具有多个 GPU Tensor 的整个组 每个节点。 - tensor在所有 GPU 中必须具有相同数量的元素 所有参与集体的进程。列表中的每个张量都必须 位于不同的 GPU 上- 目前仅支持 nccl 和 gloo 后端 张量只能是 GPU 张量 - 参数
- tensor_list (List[Tensor]) – 参与 collective 的 Tensor 操作。如果 是排名,则 () 的指定元素将为 广播到 src 进程中的所有其他张量(在不同 GPU 上) 以及其他非 src 进程中的所有张量。 您还需要确保这是相同的 对于所有调用此函数的分布式进程。 - src- src_tensor- tensor_list- tensor_list[src_tensor]- tensor_list- len(tensor_list)
- src (int) – 源排名。 
- group (ProcessGroup,可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。 
- async_op (bool, optional) – 此运算是否应为异步运算 
- src_tensor (int, optional) – 源张量排名 - tensor_list
 
- 返回
- 异步工作句柄(如果 async_op 设置为 True)。 无,如果不是 async_op 或不属于组 
 
- 
torch.distributed.all_reduce_multigpu(tensor_list,op=<ReduceOp.SUM:0>,group=None,async_op=False)[来源]¶
- 减少所有机器上的张量数据,使所有机器都得到 最终结果。此函数减少了每个节点上的张量数量, 而每个张量驻留在不同的 GPU 上。 因此,tensor 列表中的 input tensor 需要是 GPU Tensor。 此外,张量列表中的每个张量都需要驻留在不同的 GPU 上。 - 调用后,all in 将是按位的 所有过程都相同。 - tensor- tensor_list- 支持复杂张量。 - 目前仅支持 nccl 和 gloo 后端 张量只能是 GPU 张量 - 参数
- 返回
- 异步工作句柄(如果 async_op 设置为 True)。 无,如果不是 async_op 或不属于组 
 
- 
torch.distributed.reduce_multigpu(tensor_list、dst、op=<ReduceOp.SUM:0>、group=None、async_op=False、dst_tensor=0)[来源]¶
- 减少所有计算机上多个 GPU 上的张量数据。每个 Tensor in 应驻留在单独的 GPU 上 - tensor_list- 只有具有 rank 的进程的 GPU 才会收到最终结果。 - tensor_list[dst_tensor]- dst- 目前仅支持 nccl 后端 张量只能是 GPU 张量 - 参数
- tensor_list (List[Tensor]) – 输入和输出 GPU 张量 集体。该函数就地运行。 您还需要确保 相同 所有调用此函数的分布式进程。 - len(tensor_list)
- dst (int) – 目标排名 
- op (可选) – 枚举中的值之一。指定用于元素级缩减的作。 - torch.distributed.ReduceOp
- group (ProcessGroup,可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。 
- async_op (bool, optional) – 此运算是否应为异步运算 
- dst_tensor (int, optional) – 目标张量排名 - tensor_list
 
- 返回
- 异步工作句柄(如果 async_op 设置为 True)。 None,否则 
 
- 
torch.distributed.all_gather_multigpu(output_tensor_lists、input_tensor_list、组=无、async_op=False)[来源]¶
- 从列表中的整个组中收集张量。 中的每个张量都应该驻留在单独的 GPU 上 - tensor_list- 目前仅支持 nccl 后端 张量只能是 GPU 张量 - 支持复杂张量。 - 参数
- output_tensor_lists (List[List[Tensor]]) – - 输出列表。它应该 在每个 GPU 上包含大小正确的张量以用于输出 集体的 包含 all_gather 的结果。 - output_tensor_lists[i]- input_tensor_list[i]- 请注意,每个元素的大小为 ,因为函数 all 收集组中每个 GPU 的结果。解释 注意 的每个元素 ,秩 k 的元素将出现在 - output_tensor_lists- world_size * len(input_tensor_list)- output_tensor_lists[i]- input_tensor_list[j]- output_tensor_lists[i][k * world_size + j]- 另请注意,以及每个 element in (每个元素都是一个列表, 因此 ) 需要相同 对于所有调用此函数的分布式进程。 - len(output_tensor_lists)- output_tensor_lists- len(output_tensor_lists[i])
- input_tensor_list (List[Tensor]) – 张量列表(在不同 GPU 上) 从当前进程广播。 请注意,需要相同 所有调用此函数的分布式进程。 - len(input_tensor_list)
- group (ProcessGroup,可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。 
- async_op (bool, optional) – 此运算是否应为异步运算 
 
- 返回
- 异步工作句柄(如果 async_op 设置为 True)。 无,如果不是 async_op 或不属于组 
 
- 
torch.distributed.reduce_scatter_multigpu(output_tensor_list、input_tensor_lists、op=<ReduceOp.SUM:0>、group=None、async_op=False)[来源]¶
- 将张量列表减少并分散到整个组中。只有 nccl 后端 当前受支持。 - 中的每个张量都应该驻留在单独的 GPU 上,因为 应 中的每个张量列表。 - output_tensor_list- input_tensor_lists- 参数
- output_tensor_list (List[Tensor]) – - 输出张量(在不同的 GPU 上) 以接收作结果。 - 请注意,所有 调用该函数的分布式进程。 - len(output_tensor_list)
- input_tensor_lists (List[List[Tensor]]) – - 输入列表。它应该 在每个 GPU 上包含大小正确的张量,用于输入 集体,例如 包含 reduce_scatter 的 GPU 上。 - input_tensor_lists[i]- output_tensor_list[i]- 请注意,每个元素的大小都是 ,因为函数 分散组中每个 GPU 的结果。自 解释 的每个元素,请注意,秩 k 接收 reduce-scattered 结果来自 - input_tensor_lists- world_size * len(output_tensor_list)- input_tensor_lists[i]- output_tensor_list[j]- input_tensor_lists[i][k * world_size + j]- 另请注意,以及每个 element in (每个元素都是一个列表, 因此 ) 需要相同 所有调用此函数的分布式进程。 - len(input_tensor_lists)- input_tensor_lists- len(input_tensor_lists[i])
- group (ProcessGroup,可选) – 要处理的流程组。如果为 None,则 将使用默认进程组。 
- async_op (bool, optional) – 此运算是否应为异步运算。 
 
- 返回
- 异步工作句柄(如果 async_op 设置为 True)。 如果不是 async_op 或不是组的一部分,则无。 
 
第三方后端¶
除了内置的 GLOO/MPI/NCCL 后端外,PyTorch 分布式还支持
第三方后端。
有关如何通过 C++ Extension 开发第三方后端的参考,
请参考 教程 - 自定义 C++ 和 CUDA 扩展 和 。第三方的能力
backend 由它们自己的 implementations 决定。test/cpp_extensions/cpp_c10d_extension.cpp
新后端派生自并注册后端
name 和实例化接口c10d::ProcessGrouptorch.distributed.Backend.register_backend()导入时。
当手动导入此后端并调用torch.distributed.init_process_group()替换为相应的后端名称,则软件包将在
新的后端。torch.distributed
警告
对第三方后端的支持是实验性的,可能会发生变化。
Launch 实用程序¶
torch.distributed 包还在 torch.distributed.launch 中提供了启动实用程序。此帮助程序实用程序可用于启动 每个节点有多个进程,用于分布式训练。
torch.distributed.launch是一个产生多个分布式
每个训练节点上的训练过程。
警告
此模块将被弃用,取而代之的是 torchrun。
该实用程序可用于单节点分布式训练,其中 1 个 或 每个节点将生成更多进程。该实用程序可用于 CPU 训练或 GPU 训练。如果该实用程序用于 GPU 训练, 每个分布式进程都将在单个 GPU 上运行。这可以实现 大幅提升单节点训练性能。它也可以用于 多节点分布式训练,在每个节点上生成多个进程 还可以很好地提高多节点分布式训练性能。 这对于具有多个 Infiniband 的系统尤其有利 具有直接 GPU 支持的接口,因为它们都可用于 聚合通信带宽。
在单节点分布式训练或多节点分布式两种情况下
training 中,此实用程序将为每个节点启动给定数量的进程
().如果用于 GPU 训练,则此数字需要更小
或等于当前系统上的 GPU 数量 (),
并且每个进程都将在从 GPU 0 到
GPU (nproc_per_node - 1) 的 GPU ( - 1)。--nproc_per_nodenproc_per_node
如何使用此模块:
- 单节点多进程分布式训练 
>>> python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
           YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all other
           arguments of your training script)
- 多节点多进程分布式训练:(例如两个节点) 
节点 1:(IP:192.168.1.1,具有空闲端口:1234)
>>> python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
           --nnodes=2 --node_rank=0 --master_addr="192.168.1.1"
           --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)
节点 2:
>>> python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
           --nnodes=2 --node_rank=1 --master_addr="192.168.1.1"
           --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)
- 要查找此模块提供的可选参数: 
>>> python -m torch.distributed.launch --help
重要通知:
1. 本实用程序和多进程分布式(单节点或 多节点)GPU 训练目前只能使用 NCCL 分布式后端。因此,NCCL 后端是推荐的后端 用于 GPU 训练。
2. 在您的训练程序中,您必须解析命令行参数:,该参数将由本模块提供。
如果您的训练程序使用 GPU,则应确保仅
运行在 LOCAL_PROCESS_RANK 的 GPU 设备上。这可以通过以下方式完成:--local_rank=LOCAL_PROCESS_RANK
解析 local_rank 参数
>>> import argparse
>>> parser = argparse.ArgumentParser()
>>> parser.add_argument("--local_rank", type=int)
>>> args = parser.parse_args()
使用 either 将设备设置为本地排名
>>> torch.cuda.set_device(args.local_rank)  # before your code runs
或
>>> with torch.cuda.device(args.local_rank):
>>>    # your code to run
3. 在您的训练程序中,您应该调用以下函数
在开始时启动分布式后端。强烈建议
那。其他 init 方法(例如 )可能有效,
但是是本模块正式支持的那个。init_method=env://tcp://env://
torch.distributed.init_process_group(backend='YOUR BACKEND',
                                     init_method='env://')
4. 在您的训练计划中,您可以使用常规分布式函数
或使用torch.nn.parallel.DistributedDataParallel()模块。如果您的
训练程序使用 GPU 进行训练,您希望使用torch.nn.parallel.DistributedDataParallel()模块
以下是配置方法。
model = torch.nn.parallel.DistributedDataParallel(model,
                                                  device_ids=[args.local_rank],
                                                  output_device=args.local_rank)
请确保将参数设置为唯一的 GPU 设备 ID
您的代码将在其上运行。这通常是
过程。换句话说,需要是 ,
并且需要 be 才能使用它
效用device_idsdevice_ids[args.local_rank]output_deviceargs.local_rank
5. 另一种通过环境变量传递给子进程的方法。当您使用 .您必须调整上面的 subprocess 示例以替换为 ;启动器
不会通过指定此标志。local_rankLOCAL_RANK--use_env=Trueargs.local_rankos.environ['LOCAL_RANK']--local_rank
警告
local_rank不是全局唯一的:它仅每个进程唯一
在计算机上。因此,不要用它来决定是否应该这样做,例如,
写入网络文件系统。请参阅 https://github.com/pytorch/pytorch/issues/12042 的示例
如果你没有正确地做到这一点,事情会怎么出错。
生成实用程序¶
Multiprocessing 包 - torch.multiprocessing 包还在spawntorch.multiprocessing.spawn().此辅助函数
可用于生成多个进程。它的工作原理是将
函数,并生成 N 个进程来运行它。这
也可用于多进程分布式训练。
有关如何使用它的参考,请参考 PyTorch 示例 - ImageNet 实现
请注意,此功能需要 Python 3.4 或更高版本。
调试应用程序torch.distributed¶
调试分布式应用程序可能具有挑战性,因为难以理解挂起、崩溃或跨等级的行为不一致。 提供
一套工具,可帮助以自助方式调试训练应用程序:torch.distributed
监控屏障¶
从 v1.10 开始,torch.distributed.monitored_barrier()作为torch.distributed.barrier()失败,没有关于哪个等级可能有问题的有用信息
当崩溃时,即并非所有 rank 都调用torch.distributed.monitored_barrier()在提供的超时时间内。torch.distributed.monitored_barrier()实现主机端
barrier using / communication 原语在类似于确认的过程中,允许排名 0 报告哪些排名未能确认
时间的障碍。例如,请考虑以下函数,其中 rank 1 无法调用sendrecvtorch.distributed.monitored_barrier()(在实践中,这可能是由于
添加到应用程序 bug 或挂起在上一个集合中):
import os
from datetime import timedelta
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def worker(rank):
    dist.init_process_group("nccl", rank=rank, world_size=2)
    # monitored barrier requires gloo process group to perform host-side sync.
    group_gloo = dist.new_group(backend="gloo")
    if rank not in [1]:
        dist.monitored_barrier(group=group_gloo, timeout=timedelta(seconds=2))
if __name__ == "__main__":
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29501"
    mp.spawn(worker, nprocs=2, args=())
排名 0 时生成以下错误消息,允许用户确定哪些排名可能有问题并进一步调查:
RuntimeError: Rank 1 failed to pass monitoredBarrier in 2000 ms
 Original exception:
[gloo/transport/tcp/pair.cc:598] Connection closed by peer [2401:db00:eef0:1100:3560:0:1c05:25d]:8594
TORCH_DISTRIBUTED_DEBUG¶
使用 ,环境变量可用于触发其他有用的日志记录和集体同步检查,以确保所有排名
已适当同步。 可以设置为 (default)、 或 取决于调试级别
必填。请注意,最详细的选项可能会影响应用程序性能,因此只能在调试问题时使用。TORCH_CPP_LOG_LEVEL=INFOTORCH_DISTRIBUTED_DEBUGTORCH_DISTRIBUTED_DEBUGOFFINFODETAILDETAIL
设置将导致使用TORCH_DISTRIBUTED_DEBUG=INFOtorch.nn.parallel.DistributedDataParallel()初始化,并将额外记录选定迭代次数的运行时性能统计信息。这些运行时统计信息
包括前进时间、后退时间、梯度通信时间等数据。例如,给定以下应用程序:TORCH_DISTRIBUTED_DEBUG=DETAIL
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
class TwoLinLayerNet(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.a = torch.nn.Linear(10, 10, bias=False)
        self.b = torch.nn.Linear(10, 1, bias=False)
    def forward(self, x):
        a = self.a(x)
        b = self.b(x)
        return (a, b)
def worker(rank):
    dist.init_process_group("nccl", rank=rank, world_size=2)
    torch.cuda.set_device(rank)
    print("init model")
    model = TwoLinLayerNet().cuda()
    print("init ddp")
    ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank])
    inp = torch.randn(10, 10).cuda()
    print("train")
    for _ in range(20):
        output = ddp_model(inp)
        loss = output[0] + output[1]
        loss.sum().backward()
if __name__ == "__main__":
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29501"
    os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
    os.environ[
        "TORCH_DISTRIBUTED_DEBUG"
    ] = "DETAIL"  # set to DETAIL for runtime logging.
    mp.spawn(worker, nprocs=2, args=())
初始化时呈现以下日志:
I0607 16:10:35.739390 515217 logger.cpp:173] [Rank 0]: DDP Initialized with:
broadcast_buffers: 1
bucket_cap_bytes: 26214400
find_unused_parameters: 0
gradient_as_bucket_view: 0
is_multi_device_module: 0
iteration: 0
num_parameter_tensors: 2
output_device: 0
rank: 0
total_parameter_size_bytes: 440
world_size: 2
backend_name: nccl
bucket_sizes: 440
cuda_visible_devices: N/A
device_ids: 0
dtypes: float
master_addr: localhost
master_port: 29501
module_name: TwoLinLayerNet
nccl_async_error_handling: N/A
nccl_blocking_wait: N/A
nccl_debug: WARN
nccl_ib_timeout: N/A
nccl_nthreads: N/A
nccl_socket_ifname: N/A
torch_distributed_debug: INFO
以下日志在运行时(设置时)呈现:TORCH_DISTRIBUTED_DEBUG=DETAIL
I0607 16:18:58.085681 544067 logger.cpp:344] [Rank 1 / 2] Training TwoLinLayerNet unused_parameter_size=0
 Avg forward compute time: 40838608
 Avg backward compute time: 5983335
Avg backward comm. time: 4326421
 Avg backward comm/comp overlap time: 4207652
I0607 16:18:58.085693 544066 logger.cpp:344] [Rank 0 / 2] Training TwoLinLayerNet unused_parameter_size=0
 Avg forward compute time: 42850427
 Avg backward compute time: 3885553
Avg backward comm. time: 2357981
 Avg backward comm/comp overlap time: 2234674
此外,还增强了TORCH_DISTRIBUTED_DEBUG=INFOtorch.nn.parallel.DistributedDataParallel()由于模型中未使用的参数。目前,必须传入find_unused_parameters=Truetorch.nn.parallel.DistributedDataParallel()如果存在 Forward Pass 中可能未使用的参数,则进行初始化,并且从 v1.10 开始,所有模型输出都是必需的
在损失计算中用作torch.nn.parallel.DistributedDataParallel()不支持向后传递中未使用的参数。这些限制尤其具有挑战性,尤其是对于较大的
models,因此当崩溃时出现错误,torch.nn.parallel.DistributedDataParallel()将记录所有未使用的参数的完全限定名称。例如,在上面的应用程序中,
如果我们修改为改为 计算为 ,则在向后传递中不会接收梯度,并且
因此会导致失败。在崩溃时,用户会传递有关未使用的参数的信息,这对于大型模型来说可能很难手动查找:lossloss = output[1]TwoLinLayerNet.aDDP
RuntimeError: Expected to have finished reduction in the prior iteration before starting a new one. This error indicates that your module has parameters that were not used in producing loss. You can enable unused parameter detection by passing
 the keyword argument `find_unused_parameters=True` to `torch.nn.parallel.DistributedDataParallel`, and by
making sure all `forward` function outputs participate in calculating loss.
If you already have done the above, then the distributed data parallel module wasn't able to locate the output tensors in the return value of your module's `forward` function. Please include the loss function and the structure of the return va
lue of `forward` of your module when reporting this issue (e.g. list, dict, iterable).
Parameters which did not receive grad for rank 0: a.weight
Parameter indices which did not receive grad for rank 0: 0
设置将在用户发出的每个集体调用上触发额外的一致性和同步检查
直接或间接(例如 DDP)。这是通过创建一个包装进程组来完成的,该包装进程组包装由TORCH_DISTRIBUTED_DEBUG=DETAILallreducetorch.distributed.init_process_group()和torch.distributed.new_group()蜜蜂属。因此,这些 API 将返回一个包装器进程组,该组可以像常规进程一样使用
组,但在将集合体分派到底层进程组之前执行一致性检查。目前,这些检查包括torch.distributed.monitored_barrier(),
这可确保所有等级完成其出色的集体呼叫并报告卡住的等级。接下来,通过以下方式检查 Collective 本身的一致性
确保所有集合函数都匹配并以一致的张量形状调用。如果不是这种情况,则在
应用程序崩溃,而不是挂起或无信息性的错误消息。例如,请考虑以下函数,该函数将不匹配的输入形状转换为torch.distributed.all_reduce():
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def worker(rank):
    dist.init_process_group("nccl", rank=rank, world_size=2)
    torch.cuda.set_device(rank)
    tensor = torch.randn(10 if rank == 0 else 20).cuda()
    dist.all_reduce(tensor)
    torch.cuda.synchronize(device=rank)
if __name__ == "__main__":
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29501"
    os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
    os.environ["TORCH_DISTRIBUTED_DEBUG"] = "DETAIL"
    mp.spawn(worker, nprocs=2, args=())
对于后端,此类应用程序可能会导致挂起,在重要情况下,这可能很难找到根本原因。如果用户启用并重新运行应用程序,则以下错误消息将揭示根本原因:NCCLTORCH_DISTRIBUTED_DEBUG=DETAIL
work = default_pg.allreduce([tensor], opts)
RuntimeError: Error when verifying shape tensors for collective ALLREDUCE on rank 0. This likely indicates that input shapes into the collective are mismatched across ranks. Got shapes:  10
[ torch.LongTensor{1} ]
注意
为了在运行时对调试级别进行精细控制,也可以使用函数 , , 和 。torch.distributed.set_debug_level()torch.distributed.set_debug_level_from_env()torch.distributed.get_debug_level()
此外,TORCH_DISTRIBUTED_DEBUG=DETAIL 可以与 TORCH_SHOW_CPP_STACKTRACES=1 结合使用,以便在检测到集体不同步时记录整个调用堆栈。这些
集体反同步检查将适用于所有使用集体调用的应用程序,这些调用由使用c10dtorch.distributed.init_process_group()和torch.distributed.new_group()蜜蜂属。
伐木¶
除了通过torch.distributed.monitored_barrier()和 ,底层 C++ 库也输出 log
消息。这些消息有助于了解分布式训练作业的执行状态,并排查网络连接故障等问题。这
以下 Matrix 显示了如何通过 和 环境变量的组合来调整日志级别。TORCH_DISTRIBUTED_DEBUGtorch.distributedTORCH_CPP_LOG_LEVELTORCH_DISTRIBUTED_DEBUG
| 
 | 
 | 有效日志级别 | 
|---|---|---|
| 
 | 忽视 | 错误 | 
| 
 | 忽视 | 警告 | 
| 
 | 忽视 | 信息 | 
| 
 | 
 | 调试 | 
| 
 | 
 | Trace (又名全部) |