分布式通信与并行策略
1.通信方式
1.1 Broadcast

1.2 AllGather

1.3 AllReduce

1.4 Reduce

1.5 ReduceScatter



目前比较常用的就是一个ring结构;reduceScater遵循以下规则,假设有n 个rank,首先会将每个rank上的tensor,切分成个块,紧接着将其按照如下规则,每一个时间步,都按以下规则发送和接收块tensor,并且接收的块做reduce,最终rank_i得到了第(rank_i+1)%num_ranks的结果。
规则:
第 k 步 (k 从 0 到 N-2, 这里 N=4):
- 发送:将本地缓冲区的第
(rank - k) % N块发送给下一个邻居(rank+1)。 - 接收:从上一个邻居(
rank-1)接收一个数据块,并把这个数据块与本地缓冲区的第(rank - k - 1) % N块相加(Reduce)。
1.6 总结

这里怎么理解AllReduce = ReduceScatter + AllGather,主要是因为ReduceScatter 把每张卡的值汇总起来,做reduce(可能是sum,average等),然后把结果分成rank个sub_result部分,分别给每个rank,而AllGather再把这rank个sub_result给聚合起来到每个rank上,则每个rank都有完整的result。其机制就跟直接做allreduce一样,对每个rank的数据汇总做reduce,然后把result分到每个rank上。
reduce_scatter使得每个rank_i获得了第(rank_i+1)%num_ranks的结果。这是需要n-1步,然后all gather需要把每张卡上reduce的结果,分发到其它卡,再走n-1步,所以一共的通信步是2*(n-1)。
2. Paddle三类通信区分动手,动半,静态全自动

同时,在动手的案例中不会出现dist_tensor,shard这些东西,因为这是动半特有的,对通信进行标记使用shard,而tensor被shard处理后,即变成了dist_tensor。
3.分布式并行方式
分布式并行方式包括:数据并行(DP),张量并行(tp),流水并行(pp),专家并行(ep),序列并行(sp),上下文并行(Context Parallel)。
3.1 基础并行层
3.1.1 ColumnParallelLayer与RowParallelLayer同时使用的关系
ColumnParallelLayer

RowParallelLayer

可以看到,RowParallelLayer在计算的过程中,需要把输入拆分成两列分别在两张卡上做计算,最终两张卡都得到Parital状态的数据,而如果上一层是ColumnParallel则其计算的结果刚好分配到两个设备上(即结果被按列切分),而此结果正是RowParallelLayer需要的输入,那么就无需做通信,直接继续计算最后再做allreduce即可。
3.1.2 ColumnParallelLayer与RowParallelLayer的w和bias的切分

注意,在做y=x*W^T+b的计算时,首先乘积得到的数据是[batchsize,output_size],每一行表示一个数据,而bias是分别和每一行相加,因此bias是一个一维的向量,因此,当W按列切分时,bias需要按行切分,从而保持正确的计算关系。
当添加了bias的时候,做RowParallelLayer和ColumnParallelLayer情况如下:
RowParallelLayer:

RowParallelLayer只切w,不切bias
ColumnParallelLayer:

ColumnParallelLayer切w的axis=1,切bias的axis=0
3.1.3 Vocab Parallel Embedding
Vocab Embedding
示例:
文本输入
用户输入: "Hello world, how are you?"
分词(Tokenization)
分词结果: ["Hello", "world", ",", "how", "are", "you", "?"]
词汇表映射(Vocabulary Mapping)
词汇表: {"<PAD>": 0, "<UNK>": 1, "<BOS>": 2, "<EOS>": 3,
"Hello": 4, "world": 5, ",": 6, "how": 7, "are": 8, "you": 9, "?": 10, ...}
映射结果: [4, 5, 6, 7, 8, 9, 10]
输入到模型为词汇ID序列
模型接收的输入: x = [4, 5, 6, 7, 8, 9, 10] (词汇ID序列)
因此,Vocab Embedding接收到的输入x是[batch_size,seq_length],即多组词汇ID序列。Vocab Embedding的大小为(vocab_size,hidden_dim),vocab_size这个维度即表示了所有token的ID,根据token_id找到对应的行,该行所有的列则表示当前这个token的向量化表示,即包含其数学维度信息的特征向量。初始时,这些数据是没有意义的,经过训练后,则可以表示每个词的空间特征,越相近的词,在空间上离得越近。
根据 x,在vocab表中查找对应token的向量化表示,最终输出为[batch_size,sequence_lenth,hidden_dim]。
-216d8f25a1b93772a12f71188b17a807.png)
如图,以batch_size=1为例,查表后,通过当前输入的sequence_lenth中存在的token_ids,选择k行,最终构成[batch_size,sequence_lenth,hidden_dim]的输出。
反向的grad形状与forward输出时形状相同,而因为Vocab_Embedding的操作是从整个词表中选出k行数据,因此最后更新grad的时候,也是指有这k行数据有grad,而其它行均默认为0,若有相同token_id,则做梯度累加。
Vocab Parallel Embedding

如果是词表并行,则按sequence_lenth做切分,多路并行时,在forward阶段需要做all_reduce,因为只在本rank的词表查找,若查找到,则返回词表的这一行值,即当前token的向量化表示;若超过此范围,则返回的是默认值0。all_reduce之后,每个rank都能拿到全部token的向量化表示。反向梯度更新的时候无需通信,直接选当前rank的词表范围内的梯度进行累积,并更新词表对应行的数据即可。
3.2 数据并行。

1通信为了让数据并行模型参数保持一致,3通信是为了让梯度累加;在反向过程中,本层的∇x梯度计算完毕后,可以继续计算下一层的∇x梯度,而不需要等待∇w的梯度计算,当对∇x梯度做allreduce通信时,可以做∇w的梯度计算;而做∇x梯度的计算时,又可以做∇w的allreduce通信,从而达到通信与计算重叠。
注意: 数据并行分两种,上面是DDP,即Distribute_DP是基于多进程实现的,还有一种是DP,是单进程多线程实现,如下图所示,它使用一个进程来计算模型权重。

注意: 二者效果是等价的,因为假设我 们DP实现batchsize为64的数据量,对于DP来说,它会在计算loss时,汇总64条数据的output,然后和lable计算出loss,并得到一个平均的loss,再传给各GPU进行反向传播,此时每个grad就是平均之后的,即除以了64的。而DDP 则分给两个GPU各自去计算梯度,每个GPU计算的梯度也是mean的,不过是除以32的,所以要再allreduce一下,两个相加除以2,即同样是除以64。
补充说明:ZeRo(DeepSpeed,微软提出) 以及FSDP(Pytorch最新数据并行方案,ZeRo3和FSDP思想等价)
Group Sharded:
(这里存在一个组shard的概念,即按param的属性,将其划分到一个group,这些param在group里面同步和通信,与其它group相对独立,可以看一下记录的Dynamic optimizer shardingV1,V2的Fused_Buffer的概念)
