SgLang代码细读-1.从req到batch
<h1 id="sglang代码细读-1从req到batch">SgLang代码细读-1.从req到batch</h1><h2 id="代码入口--初始化">代码入口 & 初始化</h2>
<p>sglang/python/sglang/srt/entrypoints/http_server.py <code>launch_server</code> 主要分4个步骤:</p>
<ol>
<li>启动下列进程 (_launch_subprocesses):
<ul>
<li>TokenizerManager: 把输入的query进行tokenize</li>
<li>DP=1: run_scheduler_process: 创建scheduler & 开始event_loop</li>
<li>DP>1: run_data_parallel_controller_process: 初始化带负载均衡的DataParallelController, 开始event_loop</li>
<li>Detokenizer: 重新将输出转为可阅读的文字输出, 返回http结果</li>
<li>launch_dummy_health_check_server: 心跳保活接口</li>
</ul>
</li>
<li>api鉴权 & warmup请求</li>
<li>进程启动完成后启动http服务.</li>
</ol>
<img src="https://img2023.cnblogs.com/blog/1439743/202505/1439743-20250519142855404-1515989260.png" alt="image-20250514113112380" style="zoom: 50%">
<h3 id="tokenizer--detokenizer">Tokenizer / Detokenizer</h3>
<p>sglang/python/sglang/srt/managers/tokenizer_manager.py</p>
<h4 id="1-词表加载">1. 词表加载</h4>
<p>get_tokenizer->AutoTokenizer.from_pretrained 从huggingface.io下载预训练好的tokenizer</p>
<p>词表的生成方式一般是通过统计方式取top词, 比较常见的有BPE, WordPiece, 例如BPE的训练步骤:</p>
<ol>
<li><strong>初始化词汇表</strong>: 基础单元为所有单字符</li>
<li><strong>迭代合并高频字符对</strong>:
<ul>
<li>统计所有相邻字符对的频率</li>
<li><strong>合并频率最高的字符对</strong>,将其加入词表, 一直重复这个步骤直到达到目标词汇量</li>
<li>另外还有一部分特殊标记: <code></code>(未知词)、<code></code>(填充)、<code></code>(分类)、<code></code>(分隔符)、<code>(掩码)、语言标记(</code><en><code>, </code><zh>`)等。</zh></en></li>
</ul>
</li>
<li><strong>保留合并操作</strong>: 记下所有的合并操作,用于后续对新文本的分词</li>
</ol>
<h4 id="2-请求构造">2. 请求构造</h4>
<p>每个节点都有一个http_server接受请求.<code>sglang/python/sglang/srt/entrypoints/http_server.py</code> generate_request()->_handle_batch_request()->send_to_scheduler.send_pyobj() 把请求发给rank0的scheduler</p>
<h3 id="scheduler">scheduler</h3>
<p>只看了DP>1的情况.</p>
<h4 id="dataparallelcontroller">DataParallelController</h4>
<p><strong>负载均衡</strong>: node间请求是通过zmq的方式进行通信. 在node_rank=0接受原始请求(TokenizedGenerateReqInput), 也就是上面的send_to_scheduler的请求, 然后进行dispatch. 负载均衡采用round_robin/shortest_queue的方式, 其他节点接到请求后, 在<code>handle_generate_request</code>里把请求放到waiting_queue里面</p>
<h4 id="launch_schedulers">Launch_schedulers</h4>
<p>核心是<code>launch_tensor_parallel_group</code>这个函数, 这里根据一个开关 <code>enable_dp_attention</code> 进行逻辑的区分. 这个优化原理参考文档, 当模型结果中有MLA时, 如果我们在attention层使用了TP, 会导致KV结果被复制到多张卡上, 极大地浪费显存, 通过这个开关可以吧attn单独使用DP实现, 经过attn all_gather后在MOE层再进入TP逻辑. 这个函数中会计算各个device自己的PP/DP/TP rank并初始化到配置里.</p>
<img src="https://img2023.cnblogs.com/blog/1439743/202505/1439743-20250519142850788-1173699426.png" alt="image-20250514143814480" style="zoom: 50%">
<p>完成初始化后, 根据算好的rank启动<code>run_scheduler_process</code>进程, 这个函数做了下面几个事:</p>
<ul>
<li>初始化logger</li>
<li>设置CPU亲和性: <code>SGLANG_SET_CPU_AFFINITY</code>, 当启动scheduler进程的时候, 把GPU与对应范围的CPU核做黏合,也就是说,做了亲和性的线程或进程,只会在这一个CPU核上运行,只在这一个CPU核上被调度,且不会切换到其他的核上运行。开超线程(HT)的时候, bind范围需要根据逻辑核心数进行设置.</li>
<li>根据prefill/decode启动不同的event_loop, 注意这里有个overlap配置, 参考设计文档, 如果不开这个优化, cpu和GPU逻辑完全串行, 比如batch1在GPU forward计算时, CPU空闲, 而实际上可以在这段时间跑batch2的前置batch逻辑(比如tokenizer)<img src="https://img2023.cnblogs.com/blog/1439743/202505/1439743-20250519142847144-1878905941.png" alt="image-20250514145813019" loading="lazy"></li>
</ul>
<h3 id="event_loop">Event_loop</h3>
<p>主要逻辑如图, 主要功能是把req处理成batch, 用于后续的计算backend开始forward</p>
<p><img src="https://img2023.cnblogs.com/blog/1439743/202505/1439743-20250519142841866-151216296.png" alt="image-20250516152136418" loading="lazy"></p>
<h4 id="prefill-sglangpythonsglangsrtdisaggregationprefillpy">prefill (sglang/python/sglang/srt/disaggregation/prefill.py)</h4>
<p>以<code>event_loop_normal_disagg_prefill</code>为基础看prefill中的逻辑. overlap省略</p>
<h5 id="1-接受处理请求recv_requests--process_input_requests">1. 接受处理请求(recv_requests & process_input_requests)</h5>
<p>接受请求, 分几种并行方式来:</p>
<ul>
<li>PP: rank0 节点从tokenizer里接受请求list, 非rank0节点等待从上一个pp_rank节点的点对点请求.</li>
<li>TP: 如果开启了enable_dp_attn, tp_rank0节点 把tokenizer相关的请求(TokenizedGenerateReqInput, TokenizedEmbeddingReqInput)视为work_req, 其他类型的视为control_req, 把这些请求都广播到同TP_group的其他节点</li>
</ul>
<p>处理请求: TypeBasedDispatcher在这个Dispatcher内定义了每个请求类型和对应的处理方法, 心跳请求忽略</p>
<h5 id="2-从waiting_queue中pop请求">2. 从waiting_queue中pop请求</h5>
<p>KVSender有这么几种状态: Bootstrapping(表示正在与远端节点建立连接和同步元数据), WaitingForInput(完成连接, 等待kvcache发送), Transferring(传输中)</p>
<p>核心函数: <code>pop_bootstrapped</code>, 步骤如下:</p>
<ul>
<li>poll_and_all_reduce: 调用每个请求里对应的kv_sender, 也就是mooncake/conn.py里的poll方法, 用来获取KVSender的当前状态, 通过all_reduce获取各个机器上的所有请求状态</li>
<li>把WaitingForInput状态的请求挑出来执行, 从内存池(ReqToMetadataIdxAllocator)中给这些请求分配首token的内存存储空间, 以及kv_sender初始化, TODO: 待弄清楚token_to_kv_pool分页的原因是否为保证访存连续性.</li>
</ul>
<h5 id="3处理process_prefill_chunk">3.处理process_prefill_chunk:</h5>
<p>把chunked请求从batch里过滤出来, 保留chunkCache</p>
<h5 id="4get_new_batch_prefill-从waitingqueue里根据优先级组合新batch-重要函数">4.get_new_batch_prefill 从waitingQueue里根据优先级组合新batch (重要函数)</h5>
<h6 id="一优先级计算schedule_policypy-calc_priority">一.优先级计算(schedule_policy.py) <code>calc_priority</code></h6>
<p>policy策略主要分为两大类, 和tree_cache相关的和无关的. 涉及到cache命中的相关数据结构与逻辑在后文单独说.</p>
<p>无关的有:</p>
<ul>
<li><strong>FCFS</strong>: first come first serve, 也是默认策略</li>
<li>LOF:based on the longest output, 把waiting_queue按最长输出排序.</li>
<li>RANDOM: 直接shuffle queue</li>
</ul>
<p>有关的有:</p>
<ul>
<li>LPM: longest prefix, 根据最长前缀token匹配数排序</li>
<li>DFS_WEIGHT: DFS方式计算req 最后一个node在treeCache中的权重再排序</li>
</ul>
<h6 id="二-处理chunked_req">二. 处理chunked_req</h6>
<p>如果存在chunked请求, 之前把它从batch里过滤出来, 在<code>init_next_round_input</code>根据cache中已经处理完的前缀, 确定下一个batch中要处理的chunk长度和token.</p>
<p>在<code>add_chunked_req</code>中变更ids的偏移, 并且把chunked_req加到can_run_list里</p>
<h6 id="三-遍历waiting_queue">三. 遍历waiting_queue</h6>
<pre><code class="language-python"> for req in self.waiting_queue:
#... 前面还有一些非核心逻辑,省略
if len(adder.can_run_list) >= self.get_num_allocatable_reqs(running_bs):
self.running_batch.batch_is_full = True
break
#对请求进行prefix_cache匹配, 确定在cache中的last_node, prefix_indices, 和input_len, 计算需要用多长的kvcache
req.init_next_round_input(
None if prefix_computed else self.tree_cache,
self.enable_hierarchical_cache)
#判断这个请求的token数和batch里已有的token总和是否打印max_tokens, 目标应该是防止显存超限
#对tree_cache.last_node加锁后操作, 增加last_node的引用计数, 防止被释放.
res = adder.add_one_req(
req, self.chunked_req, self.enable_hierarchical_cache)
</code></pre>
<p>当batch满或是要处理的token数满后, 停止这个遍历循环, 然后把在can_run_list中的req从waiting_queue里剔除.</p>
<h6 id="四构建schedulebatch">四.构建ScheduleBatch</h6>
<p><code>ScheduleBatch.init_new</code>: 从can_run_list中构建ScheduleBatch类</p>
<p><code>prepare_for_extend</code>: 为batch内的数据分配显存, 如input_ids/seq_lens 等</p>
<h5 id="5prepare_dp_attn_batch">5.prepare_dp_attn_batch</h5>
<p>这个函数主要作用是统计其他DP的batch状态, 首先统计local的batch信息, 比如<code>num_tokens</code>, <code>can_cuda_graph</code>等, 通过all_gather汇聚所有DP的状态, 只要有其中一个DP存在非空的batch, 就把当前如果是空的local_batch填充一个idle_batch, 这个idle_batch的作用就是使得所有DP的运行状态保持同步, 比如其他DP有AllToAll的需求, 就可以在idle_batch中能够把对应的集合通信状态给同步运行</p>
<h5 id="6run_batch">6.run_batch</h5>
<p><code>run_batch</code>: forward核心逻辑, 在下一个文章中详细解读</p>
<p><code>process_batch_result_disagg_prefill</code>: 把完成forward的prefill结果启动send_kv_cache, 并添加到disagg_prefill_inflight_queue里面</p>
<p><code>process_disagg_prefill_inflight_queue</code>: 使用all_reduce检查所有处于kv sending状态的请求, 对已经完成发送的req交给detokenizer进行处理.</p>
<h5 id="7开始下一轮循环">7.开始下一轮循环</h5>
<h4 id="decode-sglangpythonsglangsrtdisaggregationdecodepy">decode (sglang/python/sglang/srt/disaggregation/decode.py)</h4>
<p>以<code>event_loop_normal_disagg_decode</code>为基础. 重点看下和prefill有区别的地方</p>
<h5 id="1请求处理">1.请求处理</h5>
<p>recv_requests & process_input_requests, 和prefill逻辑一致</p>
<h5 id="2从decode_queue里拿取完成kvcache通信的请求process_decode_queue">2.从decode_queue里拿取完成kvcache通信的请求(process_decode_queue)</h5>
<p><code>disagg_decode_prealloc_queue.pop_preallocated</code>: Decode阶段请求进来后首先进入这个队列, 在pop时, 根据req的token数判断当前已pop出去的token总和是否大于阈值, 如果大于就暂停, 如果小于则可以对这个请求对应的kvcache分配显存, 主要目的是防止显存超限.</p>
<p><code>disagg_decode_transfer_queue.extend & pop_transferred</code>: 把prealloc_queue pop出来的请求再填入transfer_queue, 把其中已经完成kv通信的请求挑出来, 把这些请求, 最后把这些请求加到decode模块的waiting_queue里面</p>
<h5 id="3schedulebatch构建get_next_disagg_decode_batch_to_run">3.ScheduleBatch构建(get_next_disagg_decode_batch_to_run)</h5>
<p>decode整体是通过continus-batching的方式进行batch组合, 从而提升GPU利用率</p>
<p><img src="https://img2023.cnblogs.com/blog/1439743/202505/1439743-20250519142833370-1865802940.webp" alt="continus-batching" loading="lazy"></p>
<ul>
<li><code>last_batch.filter_batch()</code>: 把上个batch里已经处理完的请求剔除.</li>
<li><code>self.running_batch.merge_batch(last_batch)</code>: 把上个batch中没处理完的请求merge到running_batch里</li>
<li><code>get_new_prebuilt_batch</code>, 从waiting_queue里取出请求, 把当前batch填满到(min(self.req_to_token_pool.size, self.max_running_requests))</li>
<li><code>update_running_batch((self.running_batch))</code>: 检查batch里的请求是否有超显存的可能性, 如果有将部分请求拿出来再放回queue里(retract_decode), 分配decode tensor相关显存占用.</li>
</ul>
<p>4.run_batch & process_batch_result (同prefill)</p>
<h2 id="一些基础概念">一些基础概念</h2>
<p>MFU概念和计算方法:</p>
<ul>
<li>
<p><strong>模型实际FLOPs计算公式</strong>:<br>
<strong>总FLOPs = 参数量 × 前向计算次数 + 2 × 参数量 × 反向计算次数</strong>(反向传播通常需要2倍前向的FLOPs)</p>
</li>
<li>
<p><strong>GPU理论峰值FLOPs计算公式</strong>:<br>
<strong>理论峰值FLOPs = GPU核心数 × 核心频率 (GHz) × 每周期运算数 × 2</strong>(“×2”表示乘加运算计为2次浮点操作)</p>
</li>
<li>
<p>MFU计算:</p>
<p></p><div class="math display">\[MFU=\frac{模型实际FLOPs}{GPU理论峰值FLOPs * 训练时间(秒)} * 100\%
\]</div><p></p></li>
</ul>
<h2 id="参考">参考:</h2>
<p>sglang阅读笔记: https://www.zhihu.com/column/c_1710767953182674944</p><br><br>
来源:https://www.cnblogs.com/sunstrikes/p/18884152
頁:
[1]