现如今,深度学习模型的规模和复杂度正以前所未有的速度增长,从简单的线性模型到如今动辄数百数千亿参数的大模型,单 GPU 的计算能力已难以满足高效训练的需求。并行训练技术应运而生,它合理分配计算任务到多个 GPU 或多台机器上,显著提升了训练效率。本文将介绍 PyTorch 中几种常见的并行训练方法,包括数据并行(Data Parallel)、分布式数据并行(Distributed Data Parallel,DDP)、Hugging Face 的 Accelerate 工具库等。通过对比和代码示例,帮助读者快速掌握这些技术的原理和使用方法。

# 前期准备

为了演示不同的并行训练方案,首先需要准备对应的训练数据和模型。由于现在不需要关注模型的准确率等指标,因此我们可以使用随机数据来进行训练。

# data.py
import torch
import numpy as np
n = 5000
feature = torch.randint(0, 256, (n, 64, 64, 3))
label = torch.randint(0, 10, (n,))
print(feature.shape, label.shape)
print(feature[0], label[0])
np.savez("data.npz", feature=feature.numpy(), label=label.numpy())

对于模型,使用简单的 ResNet18 即可。

# train.py
import os
import torch
import time
import numpy as np
import torchvision.models as models
import torch.distributed as dist
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP
def get_model():
    model = models.resnet18(weights=None)
    model.conv1 = torch.nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
    model.fc = torch.nn.Linear(512, 10)
    return model

至此,数据和模型都准备好了,接下来我们就可以开始介绍不同的并行训练方案了。


# Data Parallel

# 原理介绍

Data Parallel(DP)数据并行是一种常见的分布式训练方法。在 DP 中,模型的副本会被复制到每个 GPU 上,每一批次的数据会被划分成多个子批次,分别分配到各个 GPU 上进行前向传播,反向传播计算出对应的梯度。然后,这些梯度会被汇总并应用到主模型上,从而更新模型的参数,然后再将更新后的参数广播到各个 GPU 上

在这里,DP 仅会创建一个进程,所以会受到 Python 全局解释器锁(GIL)的影响。不仅如此,由于所有参数的梯度需要汇总到主模型上,每次 optimizer.step() 都需要进行通信操作,因此 DP 的并行训练效率较低。

参考资料:深入理解 PyTorch 数据并行模块 DataParallel 及其反向传播细节 - 知乎

# 代码实现

DP 的训练方案实现最为简单,只需要在原有模型的基础上,使用 torch.nn.DataParallel 包装一下即可。接下来逐步介绍 DP 的实现过程。

首先定义数据加载器,这和之前的单 GPU 训练是一样的,其中的 batch_size 指的是每次从数据集中取出的样本数量,而不是每个 GPU 上的样本数量。

# train.py
def get_dp_dataset_loader():
    data = np.load("data.npz")
    feature = data["feature"]
    label = data["label"]
    feature = torch.tensor(feature, dtype=torch.float32).permute(0, 3, 1, 2) / 255.0
    label = torch.tensor(label, dtype=torch.long)
    dataset = torch.utils.data.TensorDataset(feature, label)
    data_loader = torch.utils.data.DataLoader(dataset, batch_size=16, shuffle=True, num_workers=2)
    return data_loader

然后定义训练函数 train_dp ,使用 DP 训练模型:

# train.py
def train_dp():
    device_ids = list(range(torch.cuda.device_count()))  # 默认使用所有 GPU
    model = get_model().to(device_ids[0])  # 将模型放到主 GPU 上
    model = nn.DataParallel(model, device_ids=device_ids)  # 这里使用 DataParallel 包装模型
    data_loader = get_dp_dataset_loader()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
    loss_fn = torch.nn.CrossEntropyLoss()
    model.train()
    for epoch in range(100):
        epoch_loss = 0.0
        num_batches = 0
        for _, (feature, label) in enumerate(data_loader):
            feature = feature.to(device_ids[0], non_blocking=True)  # 将数据放到主 GPU 上
            label = label.to(device_ids[0], non_blocking=True)  # 将数据放到主 GPU 上
            # 代码不变
            output = model(feature)
            loss = loss_fn(output, label)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
            num_batches += 1
        epoch_loss /= num_batches
        print(f"Epoch {epoch} - Loss: {epoch_loss:.6f}")
if __name__ == "__main__":
    train_dp()

这里为了简代码,默认使用所有可用的 GPU 进行训练。

执行以下命令运行训练脚本:

python train.py

DP 使用 torch.nn.DataParallel 包装模型。 torch.nn.DataParallel 包装后的模型内部实现了多 GPU 的数据拆分、模型复制和梯度聚合,对外的接口不变。另外,在模型初始化、数据初始化时都需要先放在主 GPU(默认是第一个 GPU)上,然后由主 GPU 负责复制模型,拆分数据并分发到各个 GPU 上进行计算,最后再将各个 GPU 上计算得到的梯度汇总到主 GPU 上进行参数更新。

因此,只需几行代码即可将单 GPU 训练代码改为多 GPU 训练代码。但是,DP 训练方式中,主 GPU 不仅要参与训练,还负责数据分发和梯度收集,低效的通信方式成为性能瓶颈,往往导致多卡利用率低。实测下来,DP 多卡训练甚至效率不如单卡训练,因此不推荐使用。


# Distributed Data Parallel

# 原理介绍

Distributed Data Parallel(DDP)并行化训练模型的原理类似于 DP,都是将一个批次的数据拆分到多个 GPU 上并发训练,最后汇总梯度更新模型,但采用了更加高效的实现方法:

  1. 在 DDP 模式下,有 N 个进程被启动,每个进程在一张卡上加载一个模型,这些模型的参数在数值上是相同的。
  2. 在模型训练时,各个进程通过一种叫 Ring-Reduce 的方法与其他进程通讯,交换各自的梯度,提高了通讯效率。
  3. 各个进程用平均后的梯度更新自己的参数,因为各个进程的初始参数、更新梯度是一致的,所以更新后的参数也是完全相同的。

一般来说,DDP 都是显著地比 DP 快,能达到略低于卡数的加速比(例如,四卡下加速 3 倍)。所以,其是目前最流行的多机多卡训练方法。

更精确来说,PyTorch 中使用 DDP 有如下三种情况:

  • 每个进程一张卡。这是 DDP 的最佳使用方法。
  • 每个进程多张卡,复制模式。一个模型复制在不同卡上面,每个进程都实质等同于 DP 模式。速度不如第一种方法,一般不采用。
  • 每个进程多张卡,并行模式。一个模型的不同部分分布在不同的卡上面。这种场景,一般是因为我们的模型非常大,大到一张卡都塞不下一个模型。

在这里,我们仅讨论第一种情况。原因是,第二种情况完全没有必要,效率比第一种低;而第三种情况涉及到数据并行以外的并行方式(模型并行,张量并行,流水线并行等),相对更加复杂,不同模型之间的实现方法也各不相同,因此不作介绍。

参考资料:

# 代码实现

接着介绍 DDP 并行训练的实现过程。

首先定义数据加载器 get_ddp_dataset_loader ,与 DP 的数据加载器不同的是,这里使用了 torch.utils.data.distributed.DistributedSampler 来划分数据集,使得每个进程只处理自己负责的那一部分数据。此时 DataLoader 中的 batch_size 指的是每个进程处理的样本数量,并且不需要设置 shuffle=True ,因为 DistributedSampler 会自动处理数据的打乱。

# train.py
def get_ddp_dataset_loader():
    data = np.load("data.npz")
    feature = data["feature"]
    label = data["label"]
    feature = torch.tensor(feature, dtype=torch.float32).permute(0, 3, 1, 2) / 255.0
    label = torch.tensor(label, dtype=torch.long)
    dataset = torch.utils.data.TensorDataset(feature, label)
    dataset_sampler = torch.utils.data.distributed.DistributedSampler(dataset)  # 新增
    data_loader = torch.utils.data.DataLoader(dataset, batch_size=16, num_workers=2,
                                              sampler=dataset_sampler)
    return data_loader

接着定义 train_ddp ,封装 DDP 的训练函数:

from torch.nn.parallel import DistributedDataParallel as DDP
def train_ddp():
    # 初始化分布式环境
    dist.init_process_group("nccl")
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)
    # 构造模型
    model = get_model().cuda(local_rank)
    ckpt_path = None
    if dist.get_rank() == 0 and ckpt_path is not None:
        model.load_state_dict(torch.load(ckpt_path))
    model = DDP(model, device_ids=[local_rank])  # 这里使用 DDP 包装模型
    data_loader = get_ddp_dataset_loader()
    # 用 DDP model 初始化 optimizer。
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
    loss_fn = torch.nn.CrossEntropyLoss()
    model.train()
    for epoch in range(100):
        data_loader.sampler.set_epoch(epoch)  # 每个 epoch 调用,打乱数据
        epoch_loss = 0.0
        num_batches = 0
        for _, (feature, label) in enumerate(data_loader):
            feature = feature.cuda(local_rank, non_blocking=True)  # 数据放到对应 GPU
            label = label.cuda(local_rank, non_blocking=True)
            output = model(feature)
            loss = loss_fn(output, label)
            
            optimizer.zero_grad()
            loss.backward()  # 内部自动完成梯度同步
            optimizer.step()
            epoch_loss += loss.item()
            num_batches += 1
        epoch_loss /= num_batches
        loss_tensor = torch.tensor(epoch_loss).cuda(local_rank)
        dist.all_reduce(loss_tensor, op=dist.ReduceOp.SUM)  # 使用 all_reduce 计算全局 loss
        loss_avg = loss_tensor.item() / dist.get_world_size()
        if local_rank == 0:
            print(f"Epoch {epoch} - Global Loss: {loss_avg:.6f}")
    dist.destroy_process_group()  # 清理分布式环境
if __name__ == "__main__":
    train_dp()

相比于 DP,DDP 的训练函数有以下几点不同:

  1. 初始化分布式环境:代码最开始需要使用 dist.init_process_group("nccl") 来初始化分布式训练环境,指定通信后端为 NCCL(适用于 GPU 之间的高效通信)。各个进程会在这一步,与 master 节点进行握手,建立连接(单机环境默认 master 是第一个进程即 global rank 0)。
  2. 设置本地 GPU:通过读取环境变量 LOCAL_RANK 来确定当前进程使用的 GPU,并调用 torch.cuda.set_device(local_rank) 设置当前进程的默认 GPU。
  3. 使用 DDP 包装模型:将模型移动到对应的 GPU 上,并使用 DistributedDataParallel 包装模型,传入当前进程的 GPU ID。如果需要从某个 checkpoint 继续训练,那么需要在构造 DDP 模型之前使用 load_state_dict ,并且只需要在 master 上加载。
  4. 构造 DDP 模型后,再初始化对应的优化器。
  5. 数据加载器使用 DistributedSampler :确保每个进程只处理自己负责的数据子集,并在每个 epoch 开始时调用 data_loader.sampler.set_epoch(epoch) 来打乱数据,确保进程间的数据不重复。
  6. 训练过程中,每个进程都在对应的 GPU 上进行训练,因此数据也必须放到对应的 GPU 中。
  7. 使用 dist.all_reduce 来在所有进程间同步数据,比如计算全局的平均损失。
  8. 仅在主进程( local_rank == 0 )打印日志,避免重复输出。
  9. 如果需要保存模型(上述示例中未包含保存模型的代码),只需在主进程上使用 model.module.state_dict() 来获取模型参数。其中 model.module 是因为 DDP 包装后的模型,实际的模型在 module 属性中。
  10. 训练结束后调用 dist.destroy_process_group() 清理分布式环境。

上述脚本有两种运行方法:

  1. torch.distributed.launch :早期启动方法,基本已弃用

    python -m torch.distributed.launch --nproc_per_node=4 train.py
  2. torchrun官方推荐的启动方式,它是对旧的 torch.distributed.launch 的替代。

    CUDA_VISIBLE_DEVICES=0,1,2,3 torchrun --nproc_per_node=4 train.py

    CUDA_VISIBLE_DEVICES 用于指定可见的 GPU 设备, --nproc_per_node 指定每个节点使用的 GPU 数量,会自动设置每个进程的 RANKLOCAL_RANK 等环境变量,一般与 CUDA_VISIBLE_DEVICES 中的 GPU 数量一致。

    对于多机多卡torchrun 命令需要指定更多参数:

    # 节点 0 
    CUDA_VISIBLE_DEVICES=0,1,2,3 torchrun --nnodes=2 --nproc_per_node=4 --node_rank=0 --master_addr="192.168.0.10"  --master_port=23456 train.py
    # 节点 1
    CUDA_VISIBLE_DEVICES=0,1,2,3 torchrun --nnodes=2 --nproc_per_node=4 --node_rank=1 --master_addr="192.168.0.10"  --master_port=23456 train.py

    其中 --nnodes=2 表示总共有 2 台机器, --nproc_per_node=4 :指定每台机器用 4 张 GPU; --node_rank 指定当前机器是第几台(从 0 开始); --master_addr 为主节点 IP(一般写 node 0 的 IP); --master_port 为主节点的通信端口。

# 补充说明

至此,DDP 的并行训练的基本流程就介绍完了,下面再介绍一些 DDP 的其他相关知识。

首先来说明 PyTorch 模型中一类特殊的参数 Buffer,它们不属于 Parameter,反向传播计算过程中也不会计算这些参数的梯度,因此 Buffer 参数是不通过梯度进行更新的。常见的 Buffer 有 BatchNorm 中的 running_meanrunning_var ,它们在训练计算时直接由计算规则更新。

那么存在一个问题,对于 DDP 来说,每个模型事实上都维护了自己的 Buffer 参数,而 Buffer 参数的更新取决于当前的训练数据,它不像梯度一样会被广播,这将导致不同进程间的模型状态不一致。对于这个问题,DDP 内部的实现方法是:直接将 master 上的模型 Buffer 广播复制给其他进程。不难想到,这么做存在一个问题:如果 Buffer 依赖于数据分布,那么 master 上的 Buffer 很可能和其他进程上的 Buffer 差异较大,直接广播会导致其他进程的 Buffer 变得不准确。所以 PyTorch 提供 SyncBatchNorm 来解决这个问题。

SyncBatchNorm 中各卡先本地算出自己的 mean /var,然后通过 all_reduce 算出全局的 mean /var,最后所有卡使用这个全局的 mean /var 来正则化激活,这样就保证了各卡的 BatchNorm 统计量更准确,训练更稳定。核心代码如下:在初始化 DDP 模型之前,使用 nn.SyncBatchNorm.convert_sync_batchnorm 函数搜索 model 里面的每一个模块,将 BatchNorm 替换成 SyncBatchNorm

model = nn.SyncBatchNorm.convert_sync_batchnorm(model)
model = DDP(model, device_ids=[local_rank])

此外,对于分支模型(例如 MoE 混合专家模型)来说,不同进程上可能会激活不同的分支,这样就会导致不同进程反向传播时模型激活的参数不一样,从而导致梯度不一致。对于这种情况,默认配置的 DDP 模型会报错,提示 Expected to have gradients for all parameters 。解决方法是使用 find_unused_parameters=True 参数来初始化 DDP 模型:

model = DDP(model, device_ids=[local_rank], find_unused_parameters=True)

接下来我们探讨 Gradient Accumulation 的情况。在单卡情况下,将几个小批次的 loss 累计起来,等到累计到一定数量后再进行反向传播和参数更新,这样就相当于使用了更大的批次进行训练。对于单卡来说,代码大致是这样的:

optimizer.zero_grad()
for i, (feature, label) in enumerate(data_loader):
    output = model(feature)
    loss = loss_fn(output, label) / accumulation_steps  # 归一化
    loss.backward()  # 累计梯度
    if (i + 1) % accumulation_steps == 0:
        optimizer.step()  # 更新参数
        optimizer.zero_grad()  # 清空梯度

但是对于 DDP 来说, loss_fn 内部会进行 all_reduce 操作来同步梯度,因此如果直接使用上述代码,会导致每个小批次的梯度会通讯一次,这导致计算效率非常低下。正确的做法是使用 no_sync 上下文管理器来跳过中间小批次的梯度同步,只有在最后一个小批次时才进行梯度同步:

optimizer.zero_grad()
for i, (feature, label) in enumerate(data_loader):
    with model.no_sync() if (i + 1) % accumulation_steps != 0 else contextlib.nullcontext():
        output = model(feature)
        loss = loss_fn(output, label) / accumulation_steps  # 归一化
        loss.backward()  # 累计梯度
    if (i + 1) % accumulation_steps == 0:
        optimizer.step()  # 更新参数
        optimizer.zero_grad()  # 清空梯度

Gradient Accumulation

最后再来讨论 DDP 代码中的一个注意事项,也是易错点,即不同进程间随机种子的设定。一般来说,不同进程为了保证状态统一一般会考虑使用相同的随机种子,比如数据 Sampler 在每个 epoch 会使用相同的种子(epoch)进行划分。但是有个情况不应该使用相同的随机种子,那就是每个进程各自需要增强 / 采样数据的时候。比如在图像处理中,每个进程可能会从数据集中采样图像进行增强(旋转,擦除等),亦或者在强化学习中不同进程需要各自与环境交互获取经验,这些情况下,如果进程的随机种子设置一样,得到的数据也相同,那数据的利用会变得不均匀,导致训练效果变差。

下面写一个统一的管理随机种子的函数:

def setup_seed(base_seed: int, rank: int):
    seed = base_seed + rank
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)

其中 base_seed 是基本的种子,一般是脚本传入的参数, rank 对应不同进程编号。当我们需要使用不同的随机数时,传入进程的 rank ,当需要使用相同的随机数时,直接传入 rank=0 。不过一般来说,不同进程直接使用不同的随机种子不会对其他部分产生影响,这也是推荐做法。


# Accelerate

Accelerate 是一个由 Hugging Face 提供的高效工具库,意在简化 PyTorch 模型在各种设备和分布式配置上的训练与推理。通过少量代码即可实现分布式训练、混合精度训练,以及与 DeepSpeed 和 FSDP 的集成。

本文仅介绍核心的基本的使用方法,一些高级功能如 DeepSpeed、Megatron-LM、FSDP 的集成和细节请参考官方文档

相比于 DDP 需要管理分布式环境、进程间通信等细节,Accelerate 提供了更高层次的抽象,用户只需关注模型和训练逻辑,而不必处理底层的分布式实现细节。相比于 DDP,Accelerate 有如下特点:

  • 不必手动设置 torch.distributed
  • 自动包装 DDP DistributedDataParallel
  • 自动处理 DataLoaderDistributedSampler
  • 支持梯度累积、混合精度 (FP16/ BF16)。

下面来看 Accelerate 相关代码。

首先定义数据加载器 get_acc_dataset_loader ,其实现方式与 DP 几乎一致,不同的是这里的 batch_size 指的是每个进程处理的样本数量

from torch.utils.data import DistributedSampler
def get_acc_dataset_loader():
    data = np.load("data.npz")
    feature = data["feature"]
    label = data["label"]
    feature = torch.tensor(feature, dtype=torch.float32).permute(0, 3, 1, 2) / 255.0
    label = torch.tensor(label, dtype=torch.long)
    dataset = torch.utils.data.TensorDataset(feature, label)
    data_loader = torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=True, num_workers=2)
    return data_loader

接下来写训练函数 train_acc :

def train_acc():
    accelerator = accelerate.Accelerator()  # 初始化 Accelerator
    model = get_model()
    data_loader = get_acc_dataset_loader()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
    loss_fn = torch.nn.CrossEntropyLoss()
    # 使用 accelerator 准备模型、优化器和数据加载器
    model, optimizer, data_loader = accelerator.prepare(model, optimizer, data_loader)
    model.train()
    for epoch in range(100):
        epoch_loss = 0.0
        num_batches = 0
        for feature, label in data_loader:
            # 将数据转到正确设备
            feature = feature.to(accelerator.device)
            label = label.to(accelerator.device)
            output = model(feature)
            loss = loss_fn(output, label)
            optimizer.zero_grad()
            accelerator.backward(loss)  # 使用 accelerator 的 backward 方法
            optimizer.step()
            epoch_loss += loss.item()
            num_batches += 1
        epoch_loss /= num_batches
        # 多卡聚合 loss
        epoch_loss_tensor = torch.tensor(epoch_loss).to(accelerator.device)
        accelerator.gather(epoch_loss_tensor)
        if accelerator.is_main_process:
            print(f"Epoch {epoch} - Global Loss: {epoch_loss_tensor.mean().item():.6f}")
    dist.destroy_process_group()  # 清理分布式环境

由于 Accelerate 本质是对 DDP 的更高一级的封装,所以一些使用细节和 DDP 代码是类似的,总结如下:

  1. 使用 accelerate.Accelerator() 初始化对象。
  2. 使用 accelerator.prepare() 准备模型、优化器和数据加载器
  3. 训练过程中,每个进程都在对应的 GPU 上进行训练,因此数据也必须放到对应的 GPU 中(使用 accelerator.device 获取当前进程分配的 GPU)。
  4. 使用 accelerator.backward(loss) 计算梯度,内部自动完成梯度同步。
  5. 仅在主进程打印日志,避免重复输出。Accelerate 提供了 accelerator.is_main_process 来判断当前进程是否为主进程。
  6. 如果需要保存模型(上述示例中未包含保存模型的代码),只需在主进程上使用 accelerator.save(model.state_dict(), path) 来保存模型参数。
  7. 训练结束后调用 dist.destroy_process_group() 清理分布式环境。

上述代码通过下面脚本运行:

accelerate config  # 第一次使用需要配置
CUDA_VISIBLE_DEVICES=0,1 accelerate launch --num_processes=2 train.py

如果第一次使用 Accelerate 库,需要先进行配置,生成对应的配置文件,之后就不再需要执行配置命令。对于多卡训练,使用 –num_processes 指定使用的 GPU 数量,一个进程对应一个 GPU。如果需要指定使用哪些 GPU,和 DDP 类似,就使用 CUDA_VISIBLE_DEVICES 环境变量来声明。要注意与 --num_processes 保持一致。

Accelerate 实现了更高层次的封装,优点在于高效和简洁,但缺点也很明显,就是不方便自定义设置,特别是某些特定场景下无法使用封装接口的情况(比如强化学习)。


# PyTorch 分布式 API

前面的各种训练方案,许多实现细节都被封装起来了,但它们内部都使用了 torch.distributed 的 API 来实现分布式训练。为方便查缺补漏,接下来介绍一些常用的 PyTorch 分布式 API。

# 初始化分布式环境与清理

标准的初始化分布式环境和清除函数:

import torch
import torch.distributed as dist
import os
def setup():
    local_rank = int(os.environ['LOCAL_RANK'])
    torch.cuda.set_device(local_rank)
    dist.init_process_group(
        backend='nccl',          # GPU 上常用 'nccl',CPU 可用 'gloo'
        rank=int(os.environ['RANK']),
        world_size=int(os.environ['WORLD_SIZE']),
        device_id=torch.device(f"cuda:{local_rank}")
    )
def cleanup():
    dist.destroy_process_group()

setup 函数在分布式训练开始时调用, cleanup 函数在训练结束时调用。

# 分布式变量

与分布式进程有关的变量核心有四个,对应的获取方法见下表。

变量名 描述 获取 API
rank 全局进程 ID,唯一标识每个进程 os.environ["RANK"]dist.get_rank()
local_rank 当前节点内的进程编号 os.environ["LOCAL_RANK"]
world_size 全局参与进程数 os.environ["WORLD_SIZE"]dist.get_world_size()
node_rank 当前机器节点编号 os.environ["NODE_RANK"]

上述变量获取后记得用 int 强转类型。

# 常用通信 API

PyTorch 的 torch.distributed 提供了丰富的通信原语,常用于参数同步、统计指标计算、分布式采样等。

send/recv:最基础的点对点通信方式,两个进程之间传输数据,一般很少直接使用。

dist.send(tensor, dst=1)  # 将 tensor 发送到 rank=1
dist.recv(tensor, src=0)  # 从 rank=0 接收 tensor

阻塞式通信,必须配对使用,否则程序会卡住。

all_reduce:所有进程的向量进行运算,都拿到同样的结果。 op 有其他操作算子,不过最常见的是累加和平均。

dist.all_reduce(tensor, op=dist.ReduceOp.SUM)  # 或 .AVG

reduce:和 all_reduce 类似,但结果只给一个进程

dist.reduce(tensor, dst=0, op=dist.ReduceOp.SUM) # 在 rank0 汇总

broadcast:从一个进程复制向量到所有进程。

dist.broadcast(tensor, src=0) # rank0 想其他进程广播

all_gather:所有进程聚集所有数据,然后每个卡都拿到完整副本。

# all_gather 每个 GPU 收到完整数据列表
tensor_list = [torch.zeros_like(tensor) for _ in range(world_size)]
dist.all_gather(tensor_list, tensor)

gather:所有进程发送数据到目标进程,只有它接收。

# gather 只有 rank0 收到
if rank == 0:
    gather_list = [torch.zeros_like(tensor) for _ in range(world_size)]
else:
    gather_list = None
dist.gather(tensor, gather_list, dst=0)

barrier:等待所有进程都执行到此位置。

dist.barrier()