veRL代码阅读-2.Ray
<p>看VeRL代码之前发现代码里主要使用了ray框架来进行调度和通信. 所以先对ray进行初步学习, 后续有空闲时间再细看下Ray的代码.</p><h2 id="框架原理">框架原理</h2>
<h3 id="构成">构成</h3>
<p>架构图如下, ray里主要分为系统层面的layer和应用层的layer.</p>
<img src="https://img2023.cnblogs.com/blog/1439743/202506/1439743-20250623145213225-155541292.png" alt="image-20250621143321962" style="zoom: 50%">
<p>系统层layer:</p>
<ul>
<li>
<p>GCS(Global Control Store): 中心数据存储,是 Worker 之间传递消息的纽带, 储存了代码, 输入参数, 返回值.</p>
</li>
<li>
<p>Scheduler: 分成Global和Local两种, 严重怀疑VeRL论文里说的中心节点其实就是这个Global. 待确认. Local是每个单机上的调度器(又名Raylet), worker通过Local和Global进行通信. 下图是一个交互示意图:</p>
<img src="https://img2023.cnblogs.com/blog/1439743/202506/1439743-20250623145208440-1756021924.png" alt="image-20250621150252128" style="zoom: 50%">
</li>
<li>
<p>Object Store: 主要作用是通过RPC传递worker间的数据,<img src="https://img2023.cnblogs.com/blog/1439743/202506/1439743-20250623145205126-179169797.png" alt="image-20250621151323919" style="zoom: 33%"></p>
</li>
</ul>
<p>应用层Layer:</p>
<ul>
<li>Driver: 执行用户程序的进程</li>
<li>Worker: 无状态的执行remote task的进程, worker是框架自动启动的. 当用户声明了一个remote方法时, 这个remote方法会被发布到所有的worker上</li>
<li>Actor: 有状态进程(这里的actor概念和rl里的actor不一样注意一下),在被调用时只执行其暴露的函数。与worker不同的地方在于,actor需要worker或driver显式实例化, 特殊点在于每个方法的执行依赖于前一个方法产生的状态</li>
</ul>
<h3 id="调用流程">调用流程</h3>
<p>以一个remote请求发送和获取结果为例. <strong>发送</strong>的步骤依次为:</p>
<p>driver把remote函数提交给localScheduler -> localScheduler把任务告知global -> globalScheduler查询GCS拿到对应的函数参数存的位置 -> global把这个任务调度有参数b的Node2 -> Node2的LocalScheduler检查所有的参数是不是本地都有 -> 把没有的参数查GCS,从而知道需要的参数存在哪个节点上 -> 从对应的节点拉取本地没有的参数 -> 所有入参数据ready后, 执行计算步骤 -> 把计算的结果存到本地的ObjectStore里</p>
<img src="https://img2023.cnblogs.com/blog/1439743/202506/1439743-20250623145159952-2060180468.png" alt="image-20250621151734668" style="zoom: 67%">
<p>接收计算结果(ray.get)的步骤:</p>
<p>通过localScheduler在ObjectStore里检查返回的future对象<span class="math inline">\(id_c\)</span>是否在本地 -> 向GCS查询<span class="math inline">\(id_c\)</span>的存储位置, 如果这时候<span class="math inline">\(id_c\)</span>还没产出, N1就会往GCS注册一个回调, 当产出时通知 -> N2完成计算后把结果存储到本地ObjectStore, 把<span class="math inline">\(id_c\)</span>的meta信息添加到GCS, 这时候就会触发之前注册的回调 -> GCS通知N1数据可用,并且发送位置信息 -> N1向N2发送RPC请求拉取<span class="math inline">\(id_c\)</span>, 整个过程完成.</p>
<img src="https://img2023.cnblogs.com/blog/1439743/202506/1439743-20250623145155962-657345727.png" alt="image-20250623110602975" style="zoom: 67%">
<h2 id="使用方法">使用方法</h2>
<ol>
<li>task模式, 也就是上面说的worker, 适合无状态的逻辑执行</li>
</ol>
<pre><code class="language-python">@ray.remote#定义无状态分布式任务
def add(x, y):
return x + y
# 异步提交任务,立即返回 future 对象(对象引用)
future = add.remote(1, 2)
results = ray.get() #走上一章节说的get模式从remote拉取结果
</code></pre>
<ol start="2">
<li>Actor模式, 能够维护状态并封装方法。适合需要有状态的场景,如参数服务器、计数器. 特点是同一个Actor的方法调用按顺序执行,保证状态一致性, 而不同Actor实例之间可以并行执行</li>
</ol>
<pre><code class="language-python">@ray.remote# 使用 @ray.remote 将类转换为分布式 Actor, 实例在其生命周期内可以维持状态
class Counter:
def __init__(self):
self.value = 0
def increment(self):
self.value += 1
return self.value
def get_value(self):
return self.value
counter = Counter.remote()# 创建 Actor 实例, 存储在GCS里
future1 = counter.increment.remote()# 第一次增加
future2 = counter.increment.remote()# 第二次增加
future3 = counter.get_value.remote()# 获取当前值
print(ray.get())# 输出:
print(ray.get(future3))# 输出: 2
</code></pre>
<ol start="3">
<li>主要接口:</li>
</ol>
<table>
<thead>
<tr>
<th>接口</th>
<th>功能</th>
</tr>
</thead>
<tbody>
<tr>
<td>ray.put()</td>
<td>通过对象存储实现跨节点数据访问, put到remote的同时在GCS注册, 返回对象引用</td>
</tr>
<tr>
<td>ray.get()</td>
<td>同上, 功能变为数据拉取</td>
</tr>
<tr>
<td>ray.exceptions & @ray.remote(max_retries=3)</td>
<td>异常处理与重试, 搭配使用提升容错</td>
</tr>
<tr>
<td>@ray.remote(num_gpus=1)<br>def gpu_task():</td>
<td>静态资源配置</td>
</tr>
<tr>
<td>future = task.options(num_cpus=2, num_gpus=1).remote()</td>
<td>动态资源配置</td>
</tr>
<tr>
<td>ray.kill(Actor)</td>
<td>强制终止 Actor 实例</td>
</tr>
<tr>
<td>refs = <br>ready_refs, remaining_refs = ray.wait(refs, num_returns=len(refs))</td>
<td>等待异步提交的任务完成</td>
</tr>
</tbody>
</table>
<h2 id="参考">参考</h2>
<p>ray论文: https://arxiv.org/abs/1712.05889</p><br><br>
来源:https://www.cnblogs.com/sunstrikes/p/18944340
頁:
[1]