DeepSeek-R1源码解读
<p>最近和开发者做了很多DeepSeek-R1模型相关的推理项目,这两天抽时间把hugging face上面的源码拉下来仔细看了一遍,在这里做一个分享。主要是解析MOE部分的代码,包括EP并行的代码实现。<br>整体结构<br>
查看hugging face上面的modeling_deepseek.py文件和config.json文件,可以发现代码结构和DeepSeek-V3是完全相同的。DeepseekV3DecoderLayer类的forward函数如下:<br>
residual = hidden_states<br>
hidden_states = self.input_layernorm(hidden_states)</p>
<h1 id="self-attention">Self Attention</h1>
<p>hidden_states, self_attn_weights, present_key_value = self.self_attn(<br>
hidden_states=hidden_states,<br>
attention_mask=attention_mask,<br>
position_ids=position_ids,<br>
past_key_value=past_key_value,<br>
output_attentions=output_attentions,<br>
use_cache=use_cache,<br>
**kwargs,<br>
)<br>
hidden_states = residual + hidden_states</p>
<h1 id="fully-connected">Fully Connected</h1>
<p>residual = hidden_states<br>
hidden_states = self.post_attention_layernorm(hidden_states)<br>
hidden_states = self.mlp(hidden_states)<br>
hidden_states = residual + hidden_states<br>
outputs = (hidden_states,)<br>
if output_attentions:<br>
outputs += (self_attn_weights,)<br>
if use_cache:<br>
outputs += (present_key_value,)<br>
return outputs<br>
这是非常标准的transformer模型结构,由input_layernorm、attention、Fully Connected部分组成。DeepSeek最大的特点是Fully Connected使用了MOE结构,也就是代码中的self.mlp,调用的是DeepseekV3MoE类。DeepseekV3MoE的forward函数如下:<br>
identity = hidden_states<br>
orig_shape = hidden_states.shape<br>
topk_idx, topk_weight = self.gate(hidden_states)<br>
hidden_states = hidden_states.view(-1, hidden_states.shape[-1])<br>
flat_topk_idx = topk_idx.view(-1)<br>
if not self.training:<br>
y = self.moe_infer(hidden_states, topk_idx, topk_weight).view(*orig_shape)<br>
if self.config.n_shared_experts is not None:<br>
y = y + self.shared_experts(identity)<br>
上面代码的核心部分是self.gate和self.moe_infer。下面我们将详细看一下这2部分。如果你对MOE的原理不熟悉,建议先看一下之前我写的这篇文章,有助于对下面内容的理解:AI布道Mr.Jin:DeepSeek模型MOE结构代码详解 。<br>
Gate源码解读<br>
Gate的作用是为输入的token选择合适的expert进行计算,并且把expert的权重也计算出来。DeepSeek-R1的gate代码如下(可以复制运行):<br>
import numpy as np<br>
import torch<br>
import math<br>
import warnings<br>
from typing import List, Optional, Tuple, Union</p>
<p>import torch.nn.functional as F<br>
import torch.utils.checkpoint<br>
from torch import nn<br>
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss<br>
import torch.distributed as dist<br>
from transformers.activations import ACT2FN</p>
<p>batch_size = 2# 输入的batch_size<br>
seq_len = 16# 输入的seq_len<br>
hidden_dim = 10# attention输出的隐藏层维度<br>
n_routed_experts = 16# 专家总数<br>
top_k = 3# 每个token选择3个专家进行计算<br>
n_group = 4# 把专家分成4组进行处理<br>
topk_group = 2# 选择得分前2的专家组进行处理</p>
<h1 id="初始化输入">初始化输入</h1>
<p>hidden_states = torch.Tensor(np.random.random((batch_size, seq_len, hidden_dim)))</p>
<h1 id="初始化gate的权重">初始化gate的权重</h1>
<p>weight = torch.Tensor(np.random.random((n_routed_experts, hidden_dim)))<br>
e_score_correction_bias = torch.Tensor(np.random.random(n_routed_experts))</p>
<p>bsz, seq_len, h = hidden_states.shape</p>
<h3 id="compute-gating-score">compute gating score</h3>
<p>hidden_states = hidden_states.view(-1, h)</p>
<h1 id="bsz-seq_len-n_routed_experts"></h1>
<p>logits = F.linear(<br>
hidden_states.type(torch.float32), weight.type(torch.float32), None<br>
)</p>
<h1 id="得到各token路由到各专家的概率">得到各token路由到各专家的概率</h1>
<p>scores = logits.sigmoid()</p>
<p>topk_method = "noaux_tc"</p>
<h3 id="select-top-k-experts">select top-k experts</h3>
<h1 id="bszseq_len-n_routed_experts"></h1>
<p>scores_for_choice = scores.view(bsz * seq_len, -1) + e_score_correction_bias.unsqueeze(0)<br>
print("scores_for_choice: ", scores_for_choice)</p>
<h1 id="对专家进行分组计算各组专家得分之和">对专家进行分组,计算各组专家得分之和</h1>
<p>group_scores = (<br>
scores_for_choice.view(bsz * seq_len, n_group, -1).topk(2, dim=-1).sum(dim = -1)<br>
)# ,n就是batch_size*seq_len<br>
print("group_scores: ", group_scores)</p>
<h1 id="每个token选择-topk_group-里面的专家作为候选专家">每个token选择 topk_group 里面的专家作为候选专家</h1>
<p>group_idx = torch.topk(<br>
group_scores, k=topk_group, dim=-1, sorted=False<br>
)[<br>
1<br>
]# <br>
print("group_idx: ", group_idx)</p>
<p>group_mask = torch.zeros_like(group_scores)# <br>
group_mask.scatter_(1, group_idx, 1)# </p>
<h1 id="获取score_maskshape为n-n_routed_experts对于每个token被选中的专家组的专家位置的值都是1否则都是0">获取score_mask,shape为,对于每个token,被选中的专家组的专家位置的值都是1,否则都是0</h1>
<p>score_mask = (<br>
group_mask.unsqueeze(-1)<br>
.expand(<br>
bsz * seq_len, n_group, n_routed_experts // n_group<br>
)<br>
.reshape(bsz * seq_len, -1)<br>
)# <br>
print("score_mask: ", score_mask)</p>
<h1 id="把没有选中的专家的分数置为-inf">把没有选中的专家的分数置为-inf</h1>
<p>tmp_scores = scores_for_choice.masked_fill(~score_mask.bool(), float("-inf"))# <br>
print("tmp_scores: ", tmp_scores)</p>
<h1 id="得到选中的专家id">得到选中的专家id</h1>
<p>_, topk_idx = torch.topk(<br>
tmp_scores, k=top_k, dim=-1, sorted=False<br>
)<br>
print("topk_idx: ", topk_idx)</p>
<p>topk_weight = scores.gather(1, topk_idx)<br>
print("topk_weight: ", topk_weight)</p>
<h3 id="norm-gate-to-sum-1">norm gate to sum 1</h3>
<p>norm_topk_prob = True<br>
routed_scaling_factor = 2.5<br>
if top_k > 1 and norm_topk_prob:<br>
denominator = topk_weight.sum(dim=-1, keepdim=True) + 1e-20<br>
topk_weight = topk_weight / denominator<br>
topk_weight = topk_weight * routed_scaling_factor # must multiply the scaling factor<br>
print(topk_idx.shape, topk_weight.shape)<br>
可以看到,上面的处理流程和我之前分享的文章中的代码很相似,只是多了一个分组操作,可以加快专家选择的速度。<br>
接下来看一下选好专家后,如何计算。<br>
MOE推理源码解读<br>
MOE推理源码如下:</p>
<h1 id="定义每个专家的结构">定义每个专家的结构</h1>
<p>moe_intermediate_size = 5<br>
class DeepseekV3MLP(nn.Module):<br>
def <strong>init</strong>(self, hidden_size=None, intermediate_size=None):<br>
super().<strong>init</strong>()<br>
self.hidden_size = hidden_size<br>
self.intermediate_size = intermediate_size</p>
<pre><code> self.gate_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False)
self.up_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False)
self.down_proj = nn.Linear(self.intermediate_size, self.hidden_size, bias=False)
self.act_fn = ACT2FN["silu"]
def forward(self, x):
down_proj = self.down_proj(self.act_fn(self.gate_proj(x)) * self.up_proj(x))
return down_proj
</code></pre>
<h1 id="构建专家组">构建专家组</h1>
<p>experts = nn.ModuleList(<br>
[<br>
DeepseekV3MLP(hidden_dim, moe_intermediate_size)<br>
for i in range(n_routed_experts)<br>
]<br>
)</p>
<p>x = hidden_states# <br>
topk_ids = topk_idx# <br>
cnts = topk_ids.new_zeros((topk_ids.shape, n_routed_experts))# <br>
print("cnts: ", cnts)</p>
<h1 id="cnts记录每个token的专家路由情况">cnts记录每个token的专家路由情况</h1>
<p>cnts.scatter_(1, topk_ids, 1)# <br>
print("cnts: ", cnts)</p>
<h1 id="统计每个专家的token数量">统计每个专家的token数量</h1>
<p>tokens_per_expert = cnts.sum(dim=0)# </p>
<h1 id="按照expert编号的顺序把每个expert对应的token下标取出来">按照expert编号的顺序,把每个expert对应的token下标取出来</h1>
<p>idxs = topk_ids.view(-1).argsort()<br>
print("idxs: ", idxs)</p>
<h1 id="按照expert编号的顺序把每个expert需要处理的token特征取出来">按照expert编号的顺序,把每个expert需要处理的token特征取出来</h1>
<p>sorted_tokens = x]<br>
print("sorted_tokens: ", sorted_tokens)</p>
<p>sorted_tokens_shape = sorted_tokens.shape</p>
<h1 id="这个脚本可以在单卡上运行">这个脚本可以在单卡上运行</h1>
<p>ep_size = 1</p>
<h1 id="所有专家都放在一个卡上">所有专家都放在一个卡上</h1>
<p>experts_per_rank = n_routed_experts<br>
print("tokens_per_expert.shape: ", tokens_per_expert.shape)</p>
<h1 id="多卡ep并行场景">多卡EP并行场景</h1>
<p>if ep_size > 1:<br>
# -><br>
tokens_per_ep_rank = tokens_per_expert.view(ep_size, -1).sum(dim=1)<br>
# <br>
tokens_per_expert_group = tokens_per_expert.new_empty(<br>
tokens_per_expert.shape<br>
)<br>
dist.all_to_all_single(tokens_per_expert_group, tokens_per_expert)# tokens_per_expert_group获取的是各个rank上分给本rank的token情况<br>
# -> <br>
output_splits = (<br>
tokens_per_expert_group.view(ep_size, -1)<br>
.sum(1)<br>
.cpu()<br>
.numpy()<br>
.tolist()<br>
)<br>
# , 存储所有需要在本rank上计算的Token。<br>
gathered_tokens = sorted_tokens.new_empty(<br>
tokens_per_expert_group.sum(dim=0).cpu().item(), sorted_tokens.shape<br>
)<br>
input_split_sizes = tokens_per_ep_rank.cpu().numpy().tolist()<br>
# gathered_tokens记录了所有需要在本rank上计算的Token<br>
dist.all_to_all(<br>
list(gathered_tokens.split(output_splits)),<br>
list(sorted_tokens.split(input_split_sizes)),<br>
)<br>
# , 记录的是所有节点发送给本rank上各expert的token数量,<br>
tokens_per_expert_post_gather = tokens_per_expert_group.view(<br>
ep_size, experts_per_rank<br>
).sum(dim=0)<br>
gatherd_idxs = np.zeros(shape=(gathered_tokens.shape,), dtype=np.int32)<br>
s = 0<br>
for i, k in enumerate(tokens_per_expert_group.cpu().numpy()):<br>
# 记录每个token对应的expert编号<br>
gatherd_idxs = i % experts_per_rank<br>
s += k<br>
gatherd_idxs = gatherd_idxs.argsort()# <br>
sorted_tokens = gathered_tokens# <br>
tokens_per_expert = tokens_per_expert_post_gather# <br>
tokens_per_expert = tokens_per_expert.cpu().numpy()<br>
print("tokens_per_expert: ", tokens_per_expert)</p>
<p>outputs = []<br>
start_idx = 0<br>
ep_rank = 0</p>
<h1 id="遍历每个专家进行计算">遍历每个专家进行计算</h1>
<p>for i, num_tokens in enumerate(tokens_per_expert):<br>
end_idx = start_idx + num_tokens<br>
if num_tokens == 0:<br>
continue<br>
expert = experts<br>
tokens_for_this_expert = sorted_tokens<br>
expert_out = expert(tokens_for_this_expert)<br>
outputs.append(expert_out)<br>
start_idx = end_idx</p>
<h1 id="把所有专家的计算结果concate起来">把所有专家的计算结果concate起来</h1>
<p>outs = torch.cat(outputs, dim=0) if len(outputs) else sorted_tokens.new_empty(0)<br>
print("outs: ", outs)# </p>
<h1 id="ep并行情况下需要把其他rank上的序列token在本rank上计算的结果返回">EP并行情况下,需要把其他rank上的序列token在本rank上计算的结果返回</h1>
<p>if ep_size > 1:<br>
new_x = torch.empty_like(outs)<br>
# 把输出按照原来的顺序排列,即各rank给本rank发送的token顺序<br>
new_x = outs<br>
gathered_tokens = new_x.new_empty(*sorted_tokens_shape)<br>
dist.all_to_all(<br>
list(gathered_tokens.split(input_split_sizes)),<br>
list(new_x.split(output_splits)),<br>
)<br>
outs = gathered_tokens</p>
<p>new_x = torch.empty_like(outs)</p>
<h1 id="把outs的顺序进行重排让从上到下是按token的顺序进行排列">把outs的顺序进行重排,让从上到下是按token的顺序进行排列</h1>
<p>new_x = outs</p>
<h1 id="把每个token的多个expert处理结果进行加权求和">把每个token的多个expert处理结果进行加权求和</h1>
<p>final_out = (<br>
new_x.view(*topk_ids.shape, -1)<br>
.type(topk_weight.dtype)<br>
.mul_(topk_weight.unsqueeze(dim=-1))<br>
.sum(dim=1)<br>
.type(new_x.dtype)<br>
)<br>
print("final_out: ", final_out)<br>
上面的代码可以直接运行,模拟单卡场景(ep_size=1)的计算流程。对于ep_size>1的情况,主要有以下不同:<br>
1,在EP并行的情况下,各个rank都会收到输入序列,该序列通过gate后,部分token需要发送给其他rank上面的expert进行处理;<br>
2,各rank收到其他rank发送过来的token后,进行计算,计算完成后,需要把结果发送回原来的rank。<br>
所以和单卡场景相比,EP场景下新增了3个分布式通信行为。<br>
第一次通信行为是各个设备之间进行token数量的交换,使用的是all_to_all_single()方法。举个例子,假如EP并行度为4,在0卡、1卡、2卡、3卡上分别部署了0号专家、1号专家、2号专家和3号专家。0卡上的输入序列可能有部分token被分配给了1、2、3号专家,那么就需要把对应的token数量告知其他设备上的专家,这样的话,每个rank就知道自己在下一步进行all_to_all()时的输入输出切割策略。<br>
第二次通信行为,是确定好token分配信息之后,各个设备把本地的token特征进行分割,然后互相发送、接收,这一步调用的是all_to_all()方法。举个例子,假如0卡上一共有12个token,经过Gate模块后,判定为0-2号token使用自己的expert处理,3-5号token使用1号专家处理,6-8号token使用2号专家处理,9-11号token使用3号专家,那么0号卡就会把这12个token的特征矩阵分成4份,把其中3份发给其他卡。同时,0号卡会接受其他卡发送过来的需要让0号专家处理的token特征。完成这一步后,各个卡上的专家就可以进行计算了。计算完成后,就需要进行第三次通信行为了。<br>
第三次通信行为使用的也是all_to_all()方法,目的是把计算结果传回token原属的节点。继续上面的例子,1号专家计算完0号卡发送过来的0-2号token后,需要把计算结果返回给0号卡;2号专家计算完0号卡发送过来的3-5号token后,也需要把计算结果返回给0号卡,以此类推。<br>
以上就是DeepSeek-R1 MOE模块的代码实现解析,大家还有什么问题呢?欢迎讨论!</p>
<blockquote>
<p>本文由博客一文多发平台 OpenWrite 发布!</p>
</blockquote><br><br>
来源:https://www.cnblogs.com/ai-mr-jin/p/18938465
頁:
[1]