Megatron源码解读(不再着重理论分析,针对代码细节解读)

这里放置了一个CodeGeeX模型的结构图,可以看到这里采用的是与GPT类似的Only-Decoder的架构,以CodeGeeX模型为基础研究源码,codegeex采用的是8头TP,192头DP,共1536块GPU进行训练,采用的框架是Megatron+DeepSpeed ZeRo2。
1.预训练入口 pretrain_gpt.py
函数调用关系图
主程序入口
└── pretrain()
├── train_valid_test_datasets_provider() → 构建数据集
├── model_provider()
│ ├── CodeGeeXModel / CodeGeeXModelPipe → 构建模型
│ └── get_batch_pipe (引用) → 管道并行批次处理
└── forward_step()
├── get_batch() → 获取批次数据
├── model() → 模型前向传播
└── loss_func() → 计算损失
1.1model_provider
构建 CodeGeeX 模型实例,使用DeepSpeed ZeRO 初始化,包含非PP的model CodeGeeXModel和PPmodel CodeGeeXModelPipe。构建casual attention mask(下三角矩阵)供训练使用。可以通过model_path加载模型权重。
def model_provider(pre_process=True, post_process=True):
...
with deepspeed.zero.Init(
data_parallel_group=mpu.get_data_parallel_group(),
remote_device=None if args.remote_device == "none" else args.remote_device,
config_dict_or_path=args.deepspeed_config,
enabled=args.zero_stage == 3,
mpu=mpu,
):
attention_mask = torch.tril(
torch.ones(
(1, args.seq_length, args.seq_length),
device=torch.cuda.current_device(),
)
).view(1, 1, args.seq_length, args.seq_length)
# Convert attention mask to binary:
attention_mask = attention_mask < 0.5
if args.fp16:
attention_mask = attention_mask.half()
elif args.bf16:
attention_mask = attention_mask.bfloat16()
# Attention mask must be bool.
args.attn_mask = attention_mask.to(torch.bool)
if args.load_state is not None:
timers = get_timers()
print_rank_0("Loading warmstarting model states ...")
timers("load-model-states").start()
mp_rank = mpu.get_tensor_model_parallel_rank()
if os.path.isdir(args.load_state):
model_path = os.path.join(
args.load_state, "mp_rank_{:02d}_model_states.pt".format(mp_rank)
)
else:
model_path = args.load_state
print_rank_0(f"Loading model from {model_path} ...")
state_dict = torch.load(model_path, map_location="cpu")
if "module" in state_dict:
state_dict = state_dict["module"] # strip other client states
model.load_state_dict(state_dict)
2. get_batch
使用数据迭代器获取一个批次的数据,并且使用broadcast在多进程间广播数据。
def get_batch(data_iterator):
...
if data_iterator is not None:
data = next(data_iterator)
else:
data = None
data_b = mpu.broadcast_data(keys, data, datatype)
tokens_ = data_b["input_ids"].long()
labels = tokens_[:, 1:].contiguous()
tokens = tokens_[:, :-1].contiguous()
attention_mask, loss_mask, position_ids = get_ltor_masks_and_position_ids(
tokens,
tokenizer.eod,
args.reset_position_ids,
args.reset_attention_mask,
args.eod_mask_loss,
)
考虑 EOD(End of Document)标记的处理,来动态生成casual attention mask,因为通常训练时,会将多个句子拼接如下:
位置: 0 1 2 3 4 5 6 7 8 9
Token: A B C EOD D E F EOD G H
文档: [文档1] [文档2] [文档3]
如果直接用固定的mask,会导致D能看到前面A,B,C即文档1的内容,然后文档之间的数据应该是各自独立的,因此需要根据eod来设置casual attention mask,如下:
注意力掩码:
0 1 2 3 4 5 6 7 8 9
0 [1, 0, 0, 0, 0, 0, 0, 0, 0, 0]
1 [1, 1, 0, 0, 0, 0, 0, 0, 0, 0]
2 [1, 1, 1, 0, 0, 0, 0, 0, 0, 0]
3 [1, 1, 1, 1, 0, 0, 0, 0, 0, 0] ← EOD
4 [0, 0, 0, 0, 1, 0, 0, 0, 0, 0] ← D 只能看到自己!
5 [0, 0, 0, 0, 1, 1, 0, 0, 0, 0] ← E 只能看到 D, E
6 [0, 0, 0, 0, 1, 1, 1, 0, 0, 0]
7 [0, 0, 0, 0, 1, 1, 1, 1, 0, 0] ← EOD
8 [0, 0, 0, 0, 0, 0, 0, 0, 1, 0] ← G 只能看到自己!
9 [0, 0, 0, 0, 0, 0, 0, 0, 1, 1] ← H 只能看到 G, H
loss_mask则是用来将eod位置的token标记为0,表示不参与loss计算,其它位置的token都标记为1参与运算。
3. get_batch_pipe(data)
在pipeline parallel下使用,直接接收已获取的数据(data),而不是数据迭代器,DeepSpeed PipelineEngine会自动管理数据,从数据迭代器获取数据,并传递给get_batch_pipe。
def get_batch_pipe(data):
...
data_b = mpu.broadcast_data(keys, data, datatype)
# Unpack.
tokens_ = data_b["input_ids"].long()
labels = tokens_[:, 1:].contiguous()
tokens = tokens_[:, :-1].contiguous()
return (tokens, position_ids, attention_mask), (labels, loss_mask)
4. loss_func(loss_mask, output_tensor)
用于计算损失,并且会对dp的loss做reduce。
def loss_func(loss_mask, output_tensor):
losses = output_tensor.float()
loss_mask = loss_mask.view(-1).float()
loss = torch.sum(losses.view(-1) * loss_mask) / loss_mask.sum()
# Reduce loss for logging.
averaged_loss = average_losses_across_data_parallel_group([loss])
return loss, {"lm loss": averaged_loss[0]}
5. forward_step(data_iterator, model)
执行前向传播,返回输出张量和部分应用的损失函数(使用 partial 绑定 loss_mask)
def forward_step(data_iterator, model):
"""Forward step."""
args = get_args()
timers = get_timers()
# Get the batch.
timers("batch-generator").start()
tokens, labels, loss_mask, attention_mask, position_ids = get_batch(data_iterator)
timers("batch-generator").stop()
output_tensor = model(tokens, position_ids, attention_mask, labels=labels)
return output_tensor, partial(loss_func, loss_mask)
6. train_valid_test_datasets_provider(train_val_test_num_samples)
数据集构建:
- 调用
build_train_valid_test_datasets函数:- 使用配置的数据路径、数据实现方式、分割比例等参数
- 设置序列长度、随机种子等
- 控制内存映射预热(mmap_warmup)
- 打印数据集构建进度信息
def train_valid_test_datasets_provider(train_val_test_num_samples):
...
train_ds, valid_ds, test_ds = build_train_valid_test_datasets(
data_prefix=args.data_path,
data_impl=args.data_impl,
splits_string=args.split,
train_valid_test_num_samples=train_val_test_num_samples,
seq_length=args.seq_length,
seed=args.seed,
skip_warmup=(not args.mmap_warmup),
)
7.主程序入口
Megatron-LM预训练的核心入口是pretrain函数。
if __name__ == "__main__":
pretrain(
train_valid_test_datasets_provider,
model_provider,
ModelType.encoder_or_decoder,
forward_step,
args_defaults={'tokenizer_type': 'GPT2BPETokenizer'},
)
8.pretrain函数
def pretrain(
train_valid_test_dataset_provider, # 数据集提供函数:根据数据集大小返回训练/验证/测试数据集
model_provider, # 模型提供函数:返回一个基础的模型实例(CPU上,无fp16或ddp)
forward_step_func, # 前向传播函数:接收数据迭代器和模型,返回损失值和监控信息字典
valid_forward_step_func=None, # 验证前向传播函数(可选)
extra_args_provider=None, # 额外参数提供函数:用于添加自定义命令行参数
args_defaults={}, # 参数字典:用于设置已解析的参数默认值
):
"""
主训练程序。
此函数将按以下顺序执行:
1) 初始化 Megatron。
2) 使用 model_provider 设置模型、 优化器和学习率调度器。
3) 调用 train_val_test_data_provider 获取训练/验证/测试数据集。
4) 使用 forward_step_func 训练模型。
参数说明:
train_valid_test_dataset_provider: 接收训练/验证/测试数据集大小的函数,
返回 `train, valid, test` 数据集。
model_provider: 返回模型基础版本的函数。基础版本指在CPU上的简单模型,
没有fp16或ddp。
forward_step_func: 接收 `数据迭代器` 和 `模型` 的函数,返回一个 `损失` 标量
和一个字典,字典的键值对是我们希望在训练期间监控的信息 ,例如
`lm-loss: value`。我们还要求此函数将 `batch generator` 添加到计时器类中。
extra_args_provider: 接收解析器并向其添加参数的函数。
用于程序添加自己的参数。
args_defaults: 从参数名到参数值的字典。用于设置已解析的参数。
"""
pretrain 函数是 CodeGeeX 预训练流程的核心协调函数,它按照固定的顺序执行以下任务:
1.初始化megatron框架,加载deepspeed配置,并且构建模型实例,优化器,以及学习率调度器。
2.根据是否使用PP,构建训练、验证、测试的数据
这里主要通过initialize_megatron来初始化megatron的分布式环境,在initialize_megatron中主要调用_initialize_distributed()方法对分布式环境进行初始化,如下:
def _initialize_distributed():
"""Initialize torch.distributed and mpu.
| Node1 | Node2 |
____________| p1 | p2 | p3 | p4 |
local_rank | 0 | 1 | 0 | 1 |
rank | 0 | 1 | 2 | 3 |
node: 物理结点,1台机器或者1个容器。图中2个物理结点
rank:进程在全局上的序号。图中4个进程
local_rank:进程在node上的序号。
torch.cuda.device_count():当前进程所在的node上可使用的GPU的数量
device:GPU在某个node上的编号
该函数作用:
1、设置分布式环境:初始化进程,分配GPU,并设置进程大组(group)
2、制定DP/TP/PP分组策略,设置进程子组(subgroup)
3、设置DeepSpeed ZeRO-R,对activation进行优化
"""
args = get_args()
device_count = torch.cuda.device_count() # 当前进程所在的node上可使用的GPU的数量
if torch.distributed.is_initialized(): # 如果已创建好分布式环境
if args.rank == 0: # 在0号进程上打印出“创建完毕”的日志
print(
"torch distributed is already initialized, "
"skipping initialization ...",
flush=True,
)
args.rank = torch.distributed.get_rank() # 取得当前进程的全局序号
args.world_size = torch.distributed.get_world_size() # 取得全局进程的个数
else: # 如果未创建好分布式环境
if args.rank == 0:
print("> initializing torch distributed ...", flush=True)
# 1. 初始化进程,分配GPU,并设置进程大组(group)
if device_count > 0:
device = args.rank % device_count # 1块进程1个GPU。device为GPU编号。例如图例中的进程9,其所在机器上有8块卡。因此进程9使用的gpu编号为8%9=1
if args.local_rank is not None:
assert (
args.local_rank == device
), "expected local-rank to be the same as rank % device-count."
else:
args.local_rank = device
if args.force_device is not None:
print(
f" > forcefully set the device to {args.force_device}, originally {device}"
)
device = args.force_device
torch.cuda.set_device(device) # 为当前进程分配GPU
# 设置进程大组
init_method = "tcp://"
master_ip = os.getenv("MASTER_ADDR", "localhost") # 获取rank=0进程的ip
master_port = os.getenv("MASTER_PORT", "6000") # 获取rank=0进程的端口
init_method += master_ip + ":" + master_port
print(
f" > (rank={args.rank}) initializing process group: "
f"world_size={args.world_size} "
f"backend={args.distributed_backend} "
f"init_method={init_method}",
flush=True,
)
timeout = datetime.timedelta(minutes=args.dist_timeout)
torch.distributed.init_process_group(
backend=args.distributed_backend,
world_size=args.world_size,
rank=args.rank,
init_method=init_method,
timeout=timeout
)
print(f" > (rank={args.rank}) process group initialized")
# 2、制定DP/TP/PP分组策略,设置进程子组(subgroup)
if device_count > 0:
if mpu.model_parallel_is_initialized():
print("model parallel is already initialized")
else:
mpu.initialize_model_parallel( # megatron/mpu/initialize.py
args.tensor_model_parallel_size,
args.pipeline_model_parallel_size,
args.virtual_pipeline_model_parallel_size,
)
# 设置DeepSpeed ZeRO-R,对activation进行优化
if args.deepspeed and args.deepspeed_activation_checkpointing:
setup_deepspeed_random_and_activation_checkpointing(args)
总的来说上述代码实现了3个目的:
1.设置分布式环境,初始化进程,分配GPU,设置进程大组(一整个混合并行策略组)
2.指定DP,TP,PP分组策略,设置进程子组
3.设置DeepSpeed ZeRo-R,对activation进行优化。
torch.distributed.init_process_group函数实现了设置进程大组的功能,主要由以下几个概念组成:
backend::即后端。本质上是定义IPC通信机制,对数据实现reduce,gather,broadcast等通信操作。取值有gloo,nccl等。简单来讲可以在使用CPU时选择gloo,使用GPU时,使用nccl。
world_size: 即整个训练项目的所有GPU数量,比如有2台机器,每台4张卡,则最终world_size就是8.
rank: 即每个卡的全局编号, 比如2台机器,每台4张卡,则rank编号会从0编号到7,即机器之间也是连续编号。
**init_method:**这个参数指明一个地址,进程组间的进程通过该低质存放的信息进行交流,主要包括:哪些进程应该通信,以及各进程计算进度。比如DP组的两张卡需要做allreduce,则一张卡计算完,可以去这个地址找自己应该与谁通信,并查看当前与自己通信的卡是否计算完毕。为了避免冗余,信息一般只存一份,即存在rank0的进程上。
**time_out:**每个进程间计算速度不一致,因此需要设置一个最大等待时间,不能无限等待。
与之对应的还有一些概念:
**local_rank:**即在本机上的编号,比如2台机器,每台4张卡,每天机器分别有0~3的local_rank编号。

如图为PP,DP,TP并行,其中MP其实是由TP+PP共同决定,一组MP其实就表示一组完整的模型参数,我们知道了MP的数量就可以确定DP的数量,这也是为什么设定一个MP的概念。
TP、DP和PP的设定原则,因为三者的通信量一般而言TP>DP>PP,TP backward的时候需要allreduce得到全局梯度,才能继续下一层的计算,而DP每一层backward是没有依赖的,而PP只有在每一个chunk计算完后,才需要通信将结果给到下一个pp_stage,所以尽量将TP,DP不跨机,而PP跨机。
**DeepSpeed ZeRO-R:**这个主要是针对TP的优化,而不是DP,因为经过TP处理后的输出,会使用allreduce得到全局输出,此时TP rank上都会有同一份activation,这时候的DeepSpeed的处理是将其分片保存,需要时,再allgather回来。
9.MPU(Model Parallel Unit)
mpu.initialize_model_parallel(
args.tensor_model_parallel_size,
args.pipeline_model_parallel_size,
args.virtual_pipeline_model_parallel_size,
args.pipeline_model_parallel_split_rank,
...
)
MPU的核心是Megatron-LM分布式训练的调度员,负责组织和管理不同并行方式的GPU进程组,mpu.initialize_model_parallel()就是在创建并行训练的基本结构。
因为在torch.distributed里,默认情况下,GPU进程都是平等的,但在Megatron-LM里,GPU进程角色不同,可以有数据并行组,张量并行组,流水线并行组,Megatron-LM需要手动创建这些并行组,并且分别给予不同的工作,mpu就是用来管理这些并行组的初始化、查询和使用的。
那么initialize_model_parallel具体做了什么呢?
def initialize_model_parallel(
tensor_model_parallel_size_=1,
pipeline_model_parallel_size_=1,
virtual_pipeline_model_parallel_size_=None,
):
"""
Initialize model data parallel groups.
Arguments:
tensor_model_parallel_size: number of GPUs used to parallelize model tensor.
pipeline_model_parallel_size: number of GPUs used to parallelize model pipeline.
Let's say we have a total of 16 GPUs denoted by g0 ... g15 and we
use 2 GPUs to parallelize the model tensor, and 4 GPUs to parallelize
the model pipeline. The present function will
create 8 tensor model-parallel groups, 4 pipeline model-parallel groups
and 8 data-parallel groups as:
8 data_parallel groups:
[g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15]
8 tensor model-parallel groups:
[g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15]
4 pipeline model-parallel groups:
[g0, g4, g8, g12], [g1, g5, g9, g13], [g2, g6, g10, g14], [g3, g7, g11, g15]
2 model-parallel group:
[g0, g1, g4, g5, g8, g9, g12, g13], [g2, g3, g6, g7, g10, g8, g14, g15]
Note that for efficiency, the caller should make sure adjacent ranks
are on the same DGX box. For example if we are using 2 DGX-1 boxes
with a total of 16 GPUs, rank 0 to 7 belong to the first box and
ranks 8 to 15 belong to the second box.
"""
if torch.distributed.get_rank() == 0:
print(
"> initializing tensor model parallel with size {}".format(
tensor_model_parallel_size_
)
)
print( # 打印出流水线模型并行的度
"> initializing pipeline model parallel with size {}".format(
pipeline_model_parallel_size_
)
)
# Get world size and rank. Ensure some consistencies.
assert torch.distributed.is_initialized() # 确保torch已经做了分布式初始化
world_size = torch.distributed.get_world_size() # 得到全局进程的总数
tensor_model_parallel_size = min(tensor_model_parallel_size_, world_size)
pipeline_model_parallel_size = min(pipeline_model_parallel_size_, world_size)
ensure_divisibility( # 后者表示一个完整模型所占的gpu数,我们要保证前者能被后者整除
world_size, tensor_model_parallel_size * pipeline_model_parallel_size
)
# 在codegeex中,TP_size=8, PP_size=1,world_size = 1536,因此DP_size是1536/(8*1) = 192
data_parallel_size = world_size // ( # 根据TP_size和PP_size,求出DP_size
tensor_model_parallel_size * pipeline_model_parallel_size
)
num_tensor_model_parallel_groups = world_size // tensor_model_parallel_size # TP的组数
num_pipeline_model_parallel_groups = world_size // pipeline_model_parallel_size # PP的组数
num_data_parallel_groups = world_size // data_parallel_size # DP的组数
if virtual_pipeline_model_parallel_size_ is not None:
global _VIRTUAL_PIPELINE_MODEL_PARALLEL_RANK
global _VIRTUAL_PIPELINE_MODEL_PARALLEL_WORLD_SIZE
_VIRTUAL_PIPELINE_MODEL_PARALLEL_RANK = 0
_VIRTUAL_PIPELINE_MODEL_PARALLEL_WORLD_SIZE = (
virtual_pipeline_model_parallel_size_
)
rank = torch.distributed.get_rank() # 获取当前进程的全局rank
# Build the data-parallel groups.(设置DP组)
global _DATA_PARALLEL_GROUP # 保存DP组,如[[0,2], [1,3]...],数字表示进进程的全局序号
assert _DATA_PARALLEL_GROUP is None, "data parallel group is already initialized"
all_data_parallel_group_ranks = []
for i in range(pipeline_model_parallel_size):
start_rank = i * num_pipeline_model_parallel_groups
end_rank = (i + 1) * num_pipeline_model_parallel_groups
for j in range(tensor_model_parallel_size):
ranks = range(start_rank + j, end_rank, tensor_model_parallel_size)
all_data_parallel_group_ranks.append(list(ranks))
group = torch.distributed.new_group(ranks) # 设置DP组
if rank in ranks:
_DATA_PARALLEL_GROUP = group
# Build the model-parallel groups.(设置MP组)
global _MODEL_PARALLEL_GROUP # 保存MP组
assert _MODEL_PARALLEL_GROUP is None, "model parallel group is already initialized"
for i in range(data_parallel_size):
ranks = [
data_parallel_group_ranks[i]
for data_parallel_group_ranks in all_data_parallel_group_ranks
]
group = torch.distributed.new_group(ranks) # 设置MP组
if rank in ranks:
_MODEL_PARALLEL_GROUP = group
# Build the tensor model-parallel groups.(设置TP组)
global _TENSOR_MODEL_PARALLEL_GROUP # 保存TP组
assert (
_TENSOR_MODEL_PARALLEL_GROUP is None
), "tensor model parallel group is already initialized"
for i in range(num_tensor_model_parallel_groups):
ranks = range(
i * tensor_model_parallel_size, (i + 1) * tensor_model_parallel_size
)
group = torch.distributed.new_group(ranks) # 设置TP组
if rank in ranks:
_TENSOR_MODEL_PARALLEL_GROUP = group
# Build the pipeline model-parallel groups and embedding groups
# (first and last rank in each pipeline model-parallel group).(设置PP组与embedding组)
global _PIPELINE_MODEL_PARALLEL_GROUP # 设置PP组
global _PIPELINE_GLOBAL_RANKS
assert (
_PIPELINE_MODEL_PARALLEL_GROUP is None
), "pipeline model parallel group is already initialized"
global _EMBEDDING_GROUP
assert _EMBEDDING_GROUP is None, "embedding group is already initialized"
for i in range(num_pipeline_model_parallel_groups):
ranks = range(i, world_size, num_pipeline_model_parallel_groups)
group = torch.distributed.new_group(ranks) # 设置PP组
if rank in ranks:
_PIPELINE_MODEL_PARALLEL_GROUP = group
_PIPELINE_GLOBAL_RANKS = ranks
# Setup embedding group (to exchange gradients between
# first and last stages).
if len(ranks) > 1:
embedding_ranks = [ranks[0], ranks[-1]]
else:
embedding_ranks = ranks
group = torch.distributed.new_group(embedding_ranks) # 设置embedding组
if rank in embedding_ranks:
_EMBEDDING_GROUP = group
上面的代码通过torch.distributed.new_group在进程大组下创建子组,即tp,dp,pp的group。
注意除了TP,DP,PP的group,还有一个embedding_group,为什么会有这个?其实因为在计算完梯度,更新embedding权重前,输入和输出层需要进行通信,因为输入和输出共用一份权重,需要将它们的grad进行allreduce。
那么混合并行时,每张GPU的全局索引怎么算的呢?可以抽象为一个高维的坐标系(其实跟二维矩阵求索引是类似的):
其中:
R:全局 rank
T:张量并行索引
D:数据并行索引
P:流水线并行索引
分别是张量、数据、流水线并行的进程数量。
2.模型并行实现细节
1.模型切割设计思想
在megatron中模型切割有两套方案:
1.方案一:先定义出完整的模型,并对模型参数做初始化,然后根据进程id取出相应子模型,搬运到GPU上
2.方案二:直接根据进程id,设计好当前子模型,做参数初始化,搬运到GPU上
这两者的核心差别,在于“随机种子”的设定。
在分布式训练中,随机种子是非常重要的,它关系到模型是否能够复现。例 如我们采取activation checkpoint的技术来节省显存时,在backward过程中我们需要重算forward得到activation,这时候就需要我们完整复现之前forward的过程,各类参数的初始化结果也要和之前完全一致。例如TP时,若两个TP_rank分别初始化一部分参数,则需要使用不同的随机种子,否则两部分参数随机初始化后的结果一样,这不等价于将一整块参数随机初始化再分割。一般在TP/PP组内,设定不同的随机种子。而在DP组内,设定相同的随机种子。
2.定义并搬运模型(get_model)

1.在CPU上定义模型。pytorch默认在CPU上定义模型(nn.Module)。model_provider 是一个函数,调用它即可返回CPU版的模型,也就是一个CodeGeeX类。
2.把模型从CPU搬运至GPU上。这里有两种方法可供选择:
方案一:借助deepspeed进行管理。
方案二:手动搬运管理。:
-
- 显式搬运。即手动将模型搬运到当前进程所对应的GPU上
- 权重精度设定。由ZeRO的思想可知,在模型训练中,把权重精度从fp32降至fp16,是一种节省显存的好办法。如果确定使用这种优化办法,将模型搬运到GPU上后,我们需要修改精度。
- 初始化DP组