现如今,深度学习模型的规模和复杂度正以前所未有的速度增长,从简单的线性模型到如今动辄数百数千亿参数的大模型,单 GPU 的计算能力已难以满足高效训练的需求。并行训练技术应运而生,它合理分配计算任务到多个 GPU 或多台机器上,显著提升了训练效率。本文将介绍 PyTorch 中几种常见的并行训练方法,包括数据并行(Data Parallel)、分布式数据并行(Distributed Data Parallel,DDP)、Hugging Face 的 Accelerate 工具库等。通过对比和代码示例,帮助读者快速掌握这些技术的原理和使用方法。
# 前期准备
为了演示不同的并行训练方案,首先需要准备对应的训练数据和模型。由于现在不需要关注模型的准确率等指标,因此我们可以使用随机数据来进行训练。
# data.py | |
import torch | |
import numpy as np | |
n = 5000 | |
feature = torch.randint(0, 256, (n, 64, 64, 3)) | |
label = torch.randint(0, 10, (n,)) | |
print(feature.shape, label.shape) | |
print(feature[0], label[0]) | |
np.savez("data.npz", feature=feature.numpy(), label=label.numpy()) |
对于模型,使用简单的 ResNet18 即可。
# train.py | |
import os | |
import torch | |
import time | |
import numpy as np | |
import torchvision.models as models | |
import torch.distributed as dist | |
import torch.nn as nn | |
from torch.nn.parallel import DistributedDataParallel as DDP | |
def get_model(): | |
model = models.resnet18(weights=None) | |
model.conv1 = torch.nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False) | |
model.fc = torch.nn.Linear(512, 10) | |
return model |
至此,数据和模型都准备好了,接下来我们就可以开始介绍不同的并行训练方案了。
# Data Parallel
# 原理介绍
Data Parallel(DP)数据并行是一种常见的分布式训练方法。在 DP 中,模型的副本会被复制到每个 GPU 上,每一批次的数据会被划分成多个子批次,分别分配到各个 GPU 上进行前向传播,反向传播计算出对应的梯度。然后,这些梯度会被汇总并应用到主模型上,从而更新模型的参数,然后再将更新后的参数广播到各个 GPU 上。
在这里,DP 仅会创建一个进程,所以会受到 Python 全局解释器锁(GIL)的影响。不仅如此,由于所有参数的梯度需要汇总到主模型上,每次 optimizer.step() 都需要进行通信操作,因此 DP 的并行训练效率较低。
# 代码实现
DP 的训练方案实现最为简单,只需要在原有模型的基础上,使用 torch.nn.DataParallel 包装一下即可。接下来逐步介绍 DP 的实现过程。
首先定义数据加载器,这和之前的单 GPU 训练是一样的,其中的 batch_size 指的是每次从数据集中取出的样本数量,而不是每个 GPU 上的样本数量。
# train.py | |
def get_dp_dataset_loader(): | |
data = np.load("data.npz") | |
feature = data["feature"] | |
label = data["label"] | |
feature = torch.tensor(feature, dtype=torch.float32).permute(0, 3, 1, 2) / 255.0 | |
label = torch.tensor(label, dtype=torch.long) | |
dataset = torch.utils.data.TensorDataset(feature, label) | |
data_loader = torch.utils.data.DataLoader(dataset, batch_size=16, shuffle=True, num_workers=2) | |
return data_loader |
然后定义训练函数 train_dp ,使用 DP 训练模型:
# train.py | |
def train_dp(): | |
device_ids = list(range(torch.cuda.device_count())) # 默认使用所有 GPU | |
model = get_model().to(device_ids[0]) # 将模型放到主 GPU 上 | |
model = nn.DataParallel(model, device_ids=device_ids) # 这里使用 DataParallel 包装模型 | |
data_loader = get_dp_dataset_loader() | |
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4) | |
loss_fn = torch.nn.CrossEntropyLoss() | |
model.train() | |
for epoch in range(100): | |
epoch_loss = 0.0 | |
num_batches = 0 | |
for _, (feature, label) in enumerate(data_loader): | |
feature = feature.to(device_ids[0], non_blocking=True) # 将数据放到主 GPU 上 | |
label = label.to(device_ids[0], non_blocking=True) # 将数据放到主 GPU 上 | |
# 代码不变 | |
output = model(feature) | |
loss = loss_fn(output, label) | |
optimizer.zero_grad() | |
loss.backward() | |
optimizer.step() | |
epoch_loss += loss.item() | |
num_batches += 1 | |
epoch_loss /= num_batches | |
print(f"Epoch {epoch} - Loss: {epoch_loss:.6f}") | |
if __name__ == "__main__": | |
train_dp() |
这里为了简代码,默认使用所有可用的 GPU 进行训练。
执行以下命令运行训练脚本:
python train.py |
DP 使用 torch.nn.DataParallel 包装模型。 torch.nn.DataParallel 包装后的模型内部实现了多 GPU 的数据拆分、模型复制和梯度聚合,对外的接口不变。另外,在模型初始化、数据初始化时都需要先放在主 GPU(默认是第一个 GPU)上,然后由主 GPU 负责复制模型,拆分数据并分发到各个 GPU 上进行计算,最后再将各个 GPU 上计算得到的梯度汇总到主 GPU 上进行参数更新。
因此,只需几行代码即可将单 GPU 训练代码改为多 GPU 训练代码。但是,DP 训练方式中,主 GPU 不仅要参与训练,还负责数据分发和梯度收集,低效的通信方式成为性能瓶颈,往往导致多卡利用率低。实测下来,DP 多卡训练甚至效率不如单卡训练,因此不推荐使用。
# Distributed Data Parallel
# 原理介绍
Distributed Data Parallel(DDP)并行化训练模型的原理类似于 DP,都是将一个批次的数据拆分到多个 GPU 上并发训练,最后汇总梯度更新模型,但采用了更加高效的实现方法:
- 在 DDP 模式下,有 N 个进程被启动,每个进程在一张卡上加载一个模型,这些模型的参数在数值上是相同的。
- 在模型训练时,各个进程通过一种叫 Ring-Reduce 的方法与其他进程通讯,交换各自的梯度,提高了通讯效率。
- 各个进程用平均后的梯度更新自己的参数,因为各个进程的初始参数、更新梯度是一致的,所以更新后的参数也是完全相同的。
一般来说,DDP 都是显著地比 DP 快,能达到略低于卡数的加速比(例如,四卡下加速 3 倍)。所以,其是目前最流行的多机多卡训练方法。
更精确来说,PyTorch 中使用 DDP 有如下三种情况:
- 每个进程一张卡。这是 DDP 的最佳使用方法。
- 每个进程多张卡,复制模式。一个模型复制在不同卡上面,每个进程都实质等同于 DP 模式。速度不如第一种方法,一般不采用。
- 每个进程多张卡,并行模式。一个模型的不同部分分布在不同的卡上面。这种场景,一般是因为我们的模型非常大,大到一张卡都塞不下一个模型。
在这里,我们仅讨论第一种情况。原因是,第二种情况完全没有必要,效率比第一种低;而第三种情况涉及到数据并行以外的并行方式(模型并行,张量并行,流水线并行等),相对更加复杂,不同模型之间的实现方法也各不相同,因此不作介绍。
参考资料:
# 代码实现
接着介绍 DDP 并行训练的实现过程。
首先定义数据加载器 get_ddp_dataset_loader ,与 DP 的数据加载器不同的是,这里使用了 torch.utils.data.distributed.DistributedSampler 来划分数据集,使得每个进程只处理自己负责的那一部分数据。此时 DataLoader 中的 batch_size 指的是每个进程处理的样本数量,并且不需要设置 shuffle=True ,因为 DistributedSampler 会自动处理数据的打乱。
# train.py | |
def get_ddp_dataset_loader(): | |
data = np.load("data.npz") | |
feature = data["feature"] | |
label = data["label"] | |
feature = torch.tensor(feature, dtype=torch.float32).permute(0, 3, 1, 2) / 255.0 | |
label = torch.tensor(label, dtype=torch.long) | |
dataset = torch.utils.data.TensorDataset(feature, label) | |
dataset_sampler = torch.utils.data.distributed.DistributedSampler(dataset) # 新增 | |
data_loader = torch.utils.data.DataLoader(dataset, batch_size=16, num_workers=2, | |
sampler=dataset_sampler) | |
return data_loader |
接着定义 train_ddp ,封装 DDP 的训练函数:
from torch.nn.parallel import DistributedDataParallel as DDP | |
def train_ddp(): | |
# 初始化分布式环境 | |
dist.init_process_group("nccl") | |
local_rank = int(os.environ["LOCAL_RANK"]) | |
torch.cuda.set_device(local_rank) | |
# 构造模型 | |
model = get_model().cuda(local_rank) | |
ckpt_path = None | |
if dist.get_rank() == 0 and ckpt_path is not None: | |
model.load_state_dict(torch.load(ckpt_path)) | |
model = DDP(model, device_ids=[local_rank]) # 这里使用 DDP 包装模型 | |
data_loader = get_ddp_dataset_loader() | |
# 用 DDP model 初始化 optimizer。 | |
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4) | |
loss_fn = torch.nn.CrossEntropyLoss() | |
model.train() | |
for epoch in range(100): | |
data_loader.sampler.set_epoch(epoch) # 每个 epoch 调用,打乱数据 | |
epoch_loss = 0.0 | |
num_batches = 0 | |
for _, (feature, label) in enumerate(data_loader): | |
feature = feature.cuda(local_rank, non_blocking=True) # 数据放到对应 GPU | |
label = label.cuda(local_rank, non_blocking=True) | |
output = model(feature) | |
loss = loss_fn(output, label) | |
optimizer.zero_grad() | |
loss.backward() # 内部自动完成梯度同步 | |
optimizer.step() | |
epoch_loss += loss.item() | |
num_batches += 1 | |
epoch_loss /= num_batches | |
loss_tensor = torch.tensor(epoch_loss).cuda(local_rank) | |
dist.all_reduce(loss_tensor, op=dist.ReduceOp.SUM) # 使用 all_reduce 计算全局 loss | |
loss_avg = loss_tensor.item() / dist.get_world_size() | |
if local_rank == 0: | |
print(f"Epoch {epoch} - Global Loss: {loss_avg:.6f}") | |
dist.destroy_process_group() # 清理分布式环境 | |
if __name__ == "__main__": | |
train_dp() |
相比于 DP,DDP 的训练函数有以下几点不同:
- 初始化分布式环境:代码最开始需要使用
dist.init_process_group("nccl")来初始化分布式训练环境,指定通信后端为 NCCL(适用于 GPU 之间的高效通信)。各个进程会在这一步,与 master 节点进行握手,建立连接(单机环境默认 master 是第一个进程即 global rank 0)。 - 设置本地 GPU:通过读取环境变量
LOCAL_RANK来确定当前进程使用的 GPU,并调用torch.cuda.set_device(local_rank)设置当前进程的默认 GPU。 - 使用 DDP 包装模型:将模型移动到对应的 GPU 上,并使用
DistributedDataParallel包装模型,传入当前进程的 GPU ID。如果需要从某个 checkpoint 继续训练,那么需要在构造 DDP 模型之前使用load_state_dict,并且只需要在 master 上加载。 - 构造 DDP 模型后,再初始化对应的优化器。
- 数据加载器使用
DistributedSampler:确保每个进程只处理自己负责的数据子集,并在每个 epoch 开始时调用data_loader.sampler.set_epoch(epoch)来打乱数据,确保进程间的数据不重复。 - 训练过程中,每个进程都在对应的 GPU 上进行训练,因此数据也必须放到对应的 GPU 中。
- 使用
dist.all_reduce来在所有进程间同步数据,比如计算全局的平均损失。 - 仅在主进程(
local_rank == 0)打印日志,避免重复输出。 - 如果需要保存模型(上述示例中未包含保存模型的代码),只需在主进程上使用
model.module.state_dict()来获取模型参数。其中model.module是因为 DDP 包装后的模型,实际的模型在module属性中。 - 训练结束后调用
dist.destroy_process_group()清理分布式环境。
上述脚本有两种运行方法:
-
torch.distributed.launch:早期启动方法,基本已弃用。python -m torch.distributed.launch --nproc_per_node=4 train.py
-
torchrun:官方推荐的启动方式,它是对旧的torch.distributed.launch的替代。CUDA_VISIBLE_DEVICES=0,1,2,3 torchrun --nproc_per_node=4 train.py
CUDA_VISIBLE_DEVICES用于指定可见的 GPU 设备,--nproc_per_node指定每个节点使用的 GPU 数量,会自动设置每个进程的RANK、LOCAL_RANK等环境变量,一般与CUDA_VISIBLE_DEVICES中的 GPU 数量一致。对于多机多卡,
torchrun命令需要指定更多参数:# 节点 0CUDA_VISIBLE_DEVICES=0,1,2,3 torchrun --nnodes=2 --nproc_per_node=4 --node_rank=0 --master_addr="192.168.0.10" --master_port=23456 train.py
# 节点 1CUDA_VISIBLE_DEVICES=0,1,2,3 torchrun --nnodes=2 --nproc_per_node=4 --node_rank=1 --master_addr="192.168.0.10" --master_port=23456 train.py
其中
--nnodes=2表示总共有 2 台机器,--nproc_per_node=4:指定每台机器用 4 张 GPU;--node_rank指定当前机器是第几台(从 0 开始);--master_addr为主节点 IP(一般写 node 0 的 IP);--master_port为主节点的通信端口。
# 补充说明
至此,DDP 的并行训练的基本流程就介绍完了,下面再介绍一些 DDP 的其他相关知识。
首先来说明 PyTorch 模型中一类特殊的参数 Buffer,它们不属于 Parameter,反向传播计算过程中也不会计算这些参数的梯度,因此 Buffer 参数是不通过梯度进行更新的。常见的 Buffer 有 BatchNorm 中的 running_mean , running_var ,它们在训练计算时直接由计算规则更新。
那么存在一个问题,对于 DDP 来说,每个模型事实上都维护了自己的 Buffer 参数,而 Buffer 参数的更新取决于当前的训练数据,它不像梯度一样会被广播,这将导致不同进程间的模型状态不一致。对于这个问题,DDP 内部的实现方法是:直接将 master 上的模型 Buffer 广播复制给其他进程。不难想到,这么做存在一个问题:如果 Buffer 依赖于数据分布,那么 master 上的 Buffer 很可能和其他进程上的 Buffer 差异较大,直接广播会导致其他进程的 Buffer 变得不准确。所以 PyTorch 提供 SyncBatchNorm 来解决这个问题。
SyncBatchNorm 中各卡先本地算出自己的 mean /var,然后通过 all_reduce 算出全局的 mean /var,最后所有卡使用这个全局的 mean /var 来正则化激活,这样就保证了各卡的 BatchNorm 统计量更准确,训练更稳定。核心代码如下:在初始化 DDP 模型之前,使用 nn.SyncBatchNorm.convert_sync_batchnorm 函数搜索 model 里面的每一个模块,将 BatchNorm 替换成 SyncBatchNorm 。
model = nn.SyncBatchNorm.convert_sync_batchnorm(model) | |
model = DDP(model, device_ids=[local_rank]) |
此外,对于分支模型(例如 MoE 混合专家模型)来说,不同进程上可能会激活不同的分支,这样就会导致不同进程反向传播时模型激活的参数不一样,从而导致梯度不一致。对于这种情况,默认配置的 DDP 模型会报错,提示 Expected to have gradients for all parameters 。解决方法是使用 find_unused_parameters=True 参数来初始化 DDP 模型:
model = DDP(model, device_ids=[local_rank], find_unused_parameters=True) |
接下来我们探讨 Gradient Accumulation 的情况。在单卡情况下,将几个小批次的 loss 累计起来,等到累计到一定数量后再进行反向传播和参数更新,这样就相当于使用了更大的批次进行训练。对于单卡来说,代码大致是这样的:
optimizer.zero_grad() | |
for i, (feature, label) in enumerate(data_loader): | |
output = model(feature) | |
loss = loss_fn(output, label) / accumulation_steps # 归一化 | |
loss.backward() # 累计梯度 | |
if (i + 1) % accumulation_steps == 0: | |
optimizer.step() # 更新参数 | |
optimizer.zero_grad() # 清空梯度 |
但是对于 DDP 来说, loss_fn 内部会进行 all_reduce 操作来同步梯度,因此如果直接使用上述代码,会导致每个小批次的梯度会通讯一次,这导致计算效率非常低下。正确的做法是使用 no_sync 上下文管理器来跳过中间小批次的梯度同步,只有在最后一个小批次时才进行梯度同步:
optimizer.zero_grad() | |
for i, (feature, label) in enumerate(data_loader): | |
with model.no_sync() if (i + 1) % accumulation_steps != 0 else contextlib.nullcontext(): | |
output = model(feature) | |
loss = loss_fn(output, label) / accumulation_steps # 归一化 | |
loss.backward() # 累计梯度 | |
if (i + 1) % accumulation_steps == 0: | |
optimizer.step() # 更新参数 | |
optimizer.zero_grad() # 清空梯度 |

最后再来讨论 DDP 代码中的一个注意事项,也是易错点,即不同进程间随机种子的设定。一般来说,不同进程为了保证状态统一一般会考虑使用相同的随机种子,比如数据 Sampler 在每个 epoch 会使用相同的种子(epoch)进行划分。但是有个情况不应该使用相同的随机种子,那就是每个进程各自需要增强 / 采样数据的时候。比如在图像处理中,每个进程可能会从数据集中采样图像进行增强(旋转,擦除等),亦或者在强化学习中不同进程需要各自与环境交互获取经验,这些情况下,如果进程的随机种子设置一样,得到的数据也相同,那数据的利用会变得不均匀,导致训练效果变差。
下面写一个统一的管理随机种子的函数:
def setup_seed(base_seed: int, rank: int): | |
seed = base_seed + rank | |
torch.manual_seed(seed) | |
torch.cuda.manual_seed_all(seed) | |
np.random.seed(seed) | |
random.seed(seed) |
其中 base_seed 是基本的种子,一般是脚本传入的参数, rank 对应不同进程编号。当我们需要使用不同的随机数时,传入进程的 rank ,当需要使用相同的随机数时,直接传入 rank=0 。不过一般来说,不同进程直接使用不同的随机种子不会对其他部分产生影响,这也是推荐做法。
# Accelerate
Accelerate 是一个由 Hugging Face 提供的高效工具库,意在简化 PyTorch 模型在各种设备和分布式配置上的训练与推理。通过少量代码即可实现分布式训练、混合精度训练,以及与 DeepSpeed 和 FSDP 的集成。
本文仅介绍核心的基本的使用方法,一些高级功能如 DeepSpeed、Megatron-LM、FSDP 的集成和细节请参考官方文档
相比于 DDP 需要管理分布式环境、进程间通信等细节,Accelerate 提供了更高层次的抽象,用户只需关注模型和训练逻辑,而不必处理底层的分布式实现细节。相比于 DDP,Accelerate 有如下特点:
- 不必手动设置
torch.distributed。 - 自动包装 DDP
DistributedDataParallel。 - 自动处理
DataLoader的DistributedSampler。 - 支持梯度累积、混合精度 (FP16/ BF16)。
下面来看 Accelerate 相关代码。
首先定义数据加载器 get_acc_dataset_loader ,其实现方式与 DP 几乎一致,不同的是这里的 batch_size 指的是每个进程处理的样本数量。
from torch.utils.data import DistributedSampler | |
def get_acc_dataset_loader(): | |
data = np.load("data.npz") | |
feature = data["feature"] | |
label = data["label"] | |
feature = torch.tensor(feature, dtype=torch.float32).permute(0, 3, 1, 2) / 255.0 | |
label = torch.tensor(label, dtype=torch.long) | |
dataset = torch.utils.data.TensorDataset(feature, label) | |
data_loader = torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=True, num_workers=2) | |
return data_loader |
接下来写训练函数 train_acc :
def train_acc(): | |
accelerator = accelerate.Accelerator() # 初始化 Accelerator | |
model = get_model() | |
data_loader = get_acc_dataset_loader() | |
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4) | |
loss_fn = torch.nn.CrossEntropyLoss() | |
# 使用 accelerator 准备模型、优化器和数据加载器 | |
model, optimizer, data_loader = accelerator.prepare(model, optimizer, data_loader) | |
model.train() | |
for epoch in range(100): | |
epoch_loss = 0.0 | |
num_batches = 0 | |
for feature, label in data_loader: | |
# 将数据转到正确设备 | |
feature = feature.to(accelerator.device) | |
label = label.to(accelerator.device) | |
output = model(feature) | |
loss = loss_fn(output, label) | |
optimizer.zero_grad() | |
accelerator.backward(loss) # 使用 accelerator 的 backward 方法 | |
optimizer.step() | |
epoch_loss += loss.item() | |
num_batches += 1 | |
epoch_loss /= num_batches | |
# 多卡聚合 loss | |
epoch_loss_tensor = torch.tensor(epoch_loss).to(accelerator.device) | |
accelerator.gather(epoch_loss_tensor) | |
if accelerator.is_main_process: | |
print(f"Epoch {epoch} - Global Loss: {epoch_loss_tensor.mean().item():.6f}") | |
dist.destroy_process_group() # 清理分布式环境 |
由于 Accelerate 本质是对 DDP 的更高一级的封装,所以一些使用细节和 DDP 代码是类似的,总结如下:
- 使用
accelerate.Accelerator()初始化对象。 - 使用
accelerator.prepare()准备模型、优化器和数据加载器 - 训练过程中,每个进程都在对应的 GPU 上进行训练,因此数据也必须放到对应的 GPU 中(使用
accelerator.device获取当前进程分配的 GPU)。 - 使用
accelerator.backward(loss)计算梯度,内部自动完成梯度同步。 - 仅在主进程打印日志,避免重复输出。Accelerate 提供了
accelerator.is_main_process来判断当前进程是否为主进程。 - 如果需要保存模型(上述示例中未包含保存模型的代码),只需在主进程上使用
accelerator.save(model.state_dict(), path)来保存模型参数。 - 训练结束后调用
dist.destroy_process_group()清理分布式环境。
上述代码通过下面脚本运行:
accelerate config # 第一次使用需要配置 | |
CUDA_VISIBLE_DEVICES=0,1 accelerate launch --num_processes=2 train.py |
如果第一次使用 Accelerate 库,需要先进行配置,生成对应的配置文件,之后就不再需要执行配置命令。对于多卡训练,使用 –num_processes 指定使用的 GPU 数量,一个进程对应一个 GPU。如果需要指定使用哪些 GPU,和 DDP 类似,就使用 CUDA_VISIBLE_DEVICES 环境变量来声明。要注意与 --num_processes 保持一致。
Accelerate 实现了更高层次的封装,优点在于高效和简洁,但缺点也很明显,就是不方便自定义设置,特别是某些特定场景下无法使用封装接口的情况(比如强化学习)。
# PyTorch 分布式 API
前面的各种训练方案,许多实现细节都被封装起来了,但它们内部都使用了 torch.distributed 的 API 来实现分布式训练。为方便查缺补漏,接下来介绍一些常用的 PyTorch 分布式 API。
# 初始化分布式环境与清理
标准的初始化分布式环境和清除函数:
import torch | |
import torch.distributed as dist | |
import os | |
def setup(): | |
local_rank = int(os.environ['LOCAL_RANK']) | |
torch.cuda.set_device(local_rank) | |
dist.init_process_group( | |
backend='nccl', # GPU 上常用 'nccl',CPU 可用 'gloo' | |
rank=int(os.environ['RANK']), | |
world_size=int(os.environ['WORLD_SIZE']), | |
device_id=torch.device(f"cuda:{local_rank}") | |
) | |
def cleanup(): | |
dist.destroy_process_group() |
setup 函数在分布式训练开始时调用, cleanup 函数在训练结束时调用。
# 分布式变量
与分布式进程有关的变量核心有四个,对应的获取方法见下表。
| 变量名 | 描述 | 获取 API |
|---|---|---|
rank |
全局进程 ID,唯一标识每个进程 | os.environ["RANK"] 或 dist.get_rank() |
local_rank |
当前节点内的进程编号 | os.environ["LOCAL_RANK"] |
world_size |
全局参与进程数 | os.environ["WORLD_SIZE"] 或 dist.get_world_size() |
node_rank |
当前机器节点编号 | os.environ["NODE_RANK"] |
上述变量获取后记得用 int 强转类型。
# 常用通信 API
PyTorch 的 torch.distributed 提供了丰富的通信原语,常用于参数同步、统计指标计算、分布式采样等。
send/recv:最基础的点对点通信方式,两个进程之间传输数据,一般很少直接使用。
dist.send(tensor, dst=1) # 将 tensor 发送到 rank=1 | |
dist.recv(tensor, src=0) # 从 rank=0 接收 tensor |
阻塞式通信,必须配对使用,否则程序会卡住。
all_reduce:所有进程的向量进行运算,都拿到同样的结果。 op 有其他操作算子,不过最常见的是累加和平均。
dist.all_reduce(tensor, op=dist.ReduceOp.SUM) # 或 .AVG |
reduce:和 all_reduce 类似,但结果只给一个进程。
dist.reduce(tensor, dst=0, op=dist.ReduceOp.SUM) # 在 rank0 汇总 |
broadcast:从一个进程复制向量到所有进程。
dist.broadcast(tensor, src=0) # rank0 想其他进程广播 |
all_gather:所有进程聚集所有数据,然后每个卡都拿到完整副本。
# all_gather 每个 GPU 收到完整数据列表 | |
tensor_list = [torch.zeros_like(tensor) for _ in range(world_size)] | |
dist.all_gather(tensor_list, tensor) |
gather:所有进程发送数据到目标进程,只有它接收。
# gather 只有 rank0 收到 | |
if rank == 0: | |
gather_list = [torch.zeros_like(tensor) for _ in range(world_size)] | |
else: | |
gather_list = None | |
dist.gather(tensor, gather_list, dst=0) |
barrier:等待所有进程都执行到此位置。
dist.barrier() |
