住在月球上 發表於 2025-9-23 08:47:50

使用 Rust 实现的基础的List 和 Watch 机制示例流程

<div id="navCategory"><h5 class="catalogue">目录</h5><ul class="first_class_ul"><li>使用 Rust 实现的基础的List 和 Watch 机制</li><ul class="second_class_ul"><li>介绍</li><li>目标</li><li>理解问题</li><li>使用场景</li></ul><li>分析问题</li><ul class="second_class_ul"><li>关键问题</li><li>技术选型</li><li>设计代码结构</li><li>组件设计</li><li>基本原理</li><li>websocket路由入口</li><li>websocket连接处理</li><li>消息推送机制</li><li>客户端</li><li>使用验证</li><li>不足分析</li><li>示例流程</li><li>总结</li></ul></ul></div><p class="maodian"></p><h2>使用 Rust 实现的基础的List 和 Watch 机制</h2>
<p class="maodian"></p><h3>介绍</h3>
<p>在日常的开发过程中,有一个很重要的任务是能够通过Rust语言实现K8s中的各种生态组件,在这个过程中,既需要能过够了解K8S的工作原理也需要能够知道rust的语言特性。因此,在这个过程中有很多值得探讨的知识点。</p>
<p>在这里,第一步,我们将探索如何使用 Rust 实现一个类似于 Kubernetes 的&nbsp;<code>list</code>&nbsp;和&nbsp;<code>watch</code>&nbsp;机制。我们将通过 WebSocket 实现实时的消息推送,并使用一些关键的 Rust 异步编程模型来处理事件和连接管理。</p>
<p>我们首先默认大家能够了解rust语言的基本特性。下文中,将针对rust的知识点展开进行探讨。</p>
<p class="maodian"></p><h3>目标</h3>
<ul><li>理解 WebSocket 连接的建立和管理。</li><li>学习如何通过 WebSocket 推送消息。</li><li>掌握消息缓存和处理的实现方式。</li><li>了解如何使用 Rust 实现一个高效的事件分发系统。</li><li>理解K8S中的数据一致性保障方法</li><li>了解本机制的不足,以及后续如何进行改进</li></ul>
<p class="maodian"></p><h3>理解问题</h3>
<p>什么是&nbsp;<code>list</code>&nbsp;和&nbsp;<code>watch</code>?</p>
<ul><li><strong>List</strong>:列出当前所有资源的状态。</li><li><strong>Watch</strong>:实时监控资源的变化,一旦有资源变化,就会立即通知客户端。</li></ul>
<p class="maodian"></p><h3>使用场景</h3>
<ul><li><strong>自动化运维</strong>:实时监控系统资源状态,触发自动化运维操作。</li><li><strong>应用监控</strong>:实时获取应用状态,及时处理异常,在很多的系统设计场景中,能够减少耦合。</li><li><strong>K8S中的相应设计</strong>:K8S中,对相应资源的通知的基础即为list and watch机制。本人在学习K8S源码的第一步就是学习这一套设计架构。</li></ul>
<p class="maodian"></p><h2>分析问题</h2>
<p>\当然,通过简单的代码仅仅通过http进行主动连接也可实现这个功能。但在目前阶段,我们希望能够设计一个高效的、稳定的、可扩展的list and watch体系,因此我们需要考虑以下几个关键问题。</p>
<p class="maodian"></p><h3>关键问题</h3>
<ul><li><strong>如何建立和管理 我们服务器和客户端的连接?通过什么方式进行?</strong></li><li><strong>如何实现高效的消息推送机制?</strong></li><li><strong>如何处理消息缓存和订阅管理?</strong></li></ul>
<p class="maodian"></p><h3>技术选型</h3>
<ul><li><strong>语言</strong>:Rust</li><li><strong>Web 框架</strong>:warp框架</li><li><strong>WebSocket</strong>实现和框架:tokio-tungstenite、warp</li><li><strong>异步编程</strong>:tokio、管道机制</li></ul>
<p class="maodian"></p><h3>设计代码结构</h3>
<p>针对以上这个需求,结合目前kunos-system的需求我们阐释如下</p>
<ul><li><strong>有以下几个资源,Node、Task(Task是一个shell命令、镜像运行命令的载体)、Job(Task的上层资源,一个Job包含多个Task,类似于K8s中的replicaset)我们需要对这几个资源的状态进行推送。</strong></li><li><strong>能够在服务器建立起来一个watch and list服务器,能够推送各种事件</strong></li><li><strong>能够</strong></li></ul>
<p class="maodian"></p><h3>组件设计</h3>
<ul><li><strong>Broker</strong>:管理 WebSocket 订阅者和事件分发。
<div class="jb51code"><pre class="brush:plain;">pub struct Broker&lt;R: Resource + Clone + Serialize + Send + Sync+ 'static&gt; {
    // 下游的订阅者列表,用于发送websocket信息
    subscribers: Arc&lt;RwLock&lt;HashMap&lt;Topic, HashMap&lt;Uuid, WsSender&gt;&gt;&gt;&gt;,
    // 事件的缓冲流
    event_sender: UnboundedSender&lt;(Topic, WatchEvent&lt;R&gt;)&gt;,
}</pre></div></li><li><strong>Watcher</strong>:对不同资源类型进行管理和操作。<div class="jb51code"><pre class="brush:plain;">pub struct Watcher {
    // 为不同的事件建立不同的broker
    pub node_broker: Arc&lt;Broker&lt;Node&gt;&gt;,
    pub task_broker: Arc&lt;Broker&lt;Task&gt;&gt;,
    pub job_broker: Arc&lt;Broker&lt;Job&gt;&gt;,
    pub exec_broker: Arc&lt;Broker&lt;TaskExecRequest&gt;&gt;,
}</pre></div></li><li><strong>WebSocket 客户端</strong>:与服务器交互,接收实时事件。</li></ul>
<p class="maodian"></p><h3>基本原理</h3>
<p class="maodian"></p><h3>websocket路由入口</h3>
<div class="jb51code"><pre class="brush:plain;">let node_subscribe = warp::path!("watch" / "node").and(warp::ws()).map(
    move |ws: warp::ws::Ws| {
      let node_broker_clone = Arc::clone(&amp;node_broker_clone);
      ws.on_upgrade(move |socket| async move {
            node_broker_clone.subscribe("node".to_string(), socket).await;
      })
    },
);</pre></div>
<p>1.&nbsp;<code>warp::path!(&quot;watch&quot; / &quot;node&quot;)</code></p>
<p>*这部分代码定义了一个路径过滤器,用于匹配路径 /watch/node&nbsp;<code>的 HTTP 请求。</code>warp::path!<code>是 Warp 框架提供的一个宏,用于简化路径定义。这里的</code>&quot;watch&quot; / &quot;node&quot;<code>表示请求路径必须是</code>/watch/node` 才能匹配这个过滤器。</p>
<p>2.&nbsp;<code>.and(warp::ws())</code></p>
<p><strong>这一部分代码将路径过滤器与 WebSocket 协议过滤器组合起来。</strong><code>warp::ws()</code>&nbsp;过滤器会匹配 WebSocket 握手请求并提取一个&nbsp;<code>warp::ws::Ws</code>&nbsp;类型,表示 WebSocket 配置。这表示我们的这个路径将为一个websocket接口。</p>
<ul><li><code>warp::ws()</code>&nbsp;过滤器用于匹配并提取 WebSocket 握手请求,确保该请求是 WebSocket 协议请求。</li></ul>
<p>3.&nbsp;<code>.map(move |ws: warp::ws::Ws| { ... })</code></p>
<p><code>.map</code>&nbsp;方法用于将前面的过滤器组合结果映射到一个新的处理逻辑中。这里的&nbsp;<code>move |ws: warp::ws::Ws| { ... }</code>&nbsp;是一个闭包,用于处理 WebSocket 请求。</p>
<ul><li><code>move</code>&nbsp;关键字确保闭包捕获其环境中的所有变量的所有权,因为这些变量将在异步操作中使用。</li><li><code>ws: warp::ws::Ws</code>&nbsp;参数是从前面的&nbsp;<code>warp::ws()</code>&nbsp;过滤器中提取的 WebSocket 配置。</li></ul>
<p>4.&nbsp;<code>ws.on_upgrade(move |socket| async move { ... })</code></p>
<p><code>ws.on_upgrade</code>&nbsp;方法用于将 WebSocket 协议升级请求处理为 WebSocket 连接。它接受一个闭包作为参数,当 WebSocket 握手成功后,这个闭包会被调用。在官方定义中,这个方法主要用于自定义一个函数对建立后的websocket连接进行一定的操作,因此我们在这里将建立连接后一切操作,比如保持连接,发送信息等。</p>
<div class="jb51code"><pre class="brush:plain;">/// Finish the upgrade, passing a function to handle the `WebSocket`.
///
/// The passed function must return a `Future`.
pub fn on_upgrade&lt;F, U&gt;(self, func: F) -&gt; impl Reply
where
    F: FnOnce(WebSocket) -&gt; U + Send + 'static,
    U: Future&lt;Output = ()&gt; + Send + 'static,
{
    WsReply {
      ws: self,
      on_upgrade: func,
    }
}</pre></div>
<ul><li><code>move |socket| async move { ... }</code>&nbsp;是一个异步闭包,它将在 WebSocket 连接成功升级后执行。</li><li><code>socket</code>&nbsp;参数表示已经升级的 WebSocket 连接。</li></ul>
<p>5.&nbsp;<code>node_broker_clone.subscribe(&quot;node&quot;.to_string(), socket).await;</code></p>
<p>在异步闭包内部,调用 node_broker_clone&nbsp;<code>的</code>subscribe` 方法,将新的 WebSocket 连接订阅到节点(node)主题中。后续我们将展开讲解</p>
<ul><li><code>&quot;node&quot;.to_string()</code>&nbsp;将节点主题名称转换为字符串。</li><li><code>socket</code>&nbsp;参数表示当前的 WebSocket 连接。</li><li><code>await</code>&nbsp;关键字等待异步订阅操作完成。</li></ul>
<p class="maodian"></p><h3>websocket连接处理</h3>
<p>上面说到,我们通过&nbsp;<code>ws.on_upgrade(move |socket| async move { ... })</code>这个方法在连接建立之后进行处理,其中可以知道,我们处理的方法如下所示。</p>
<div class="jb51code"><pre class="brush:plain;">pub async fn subscribe(&amp;self, topic: Topic, socket: warp::ws::WebSocket) {
      let (ws_sender, mut ws_receiver) = socket.split();
      let (tx, mut rx) = mpsc::unbounded_channel::&lt;Message&gt;();
      let subscriber_id = Uuid::new_v4();

      {
            let mut subs = self.subscribers.write().await;
            subs.entry(topic.clone()).or_default().insert(subscriber_id, tx);
      }

      let subscribers = Arc::clone(&amp;self.subscribers);
      tokio::task::spawn(async move {
            while let Some(result) = ws_receiver.next().await {
                match result {
                  Ok(message) =&gt; {
                        // 处理有效的消息
                        if message.is_text() {
                            println!(
                              "Received message from client: {}",
                              message.to_str().unwrap()
                            );
                        }
                  }
                  Err(e) =&gt; {
                        // 处理错误
                        eprintln!("WebSocket error: {:?}", e);
                        break;
                  }
                }
            }
            println!("WebSocket connection closed");
            subscribers.write().await.get_mut(&amp;topic).map(|subscribers| subscribers.remove(&amp;subscriber_id));
      });

      tokio::task::spawn(async move {
            let mut sender = ws_sender;

            while let Some(msg) = rx.recv().await {
                let _ = sender.send(msg).await;
            }
      });
    }</pre></div>
<p><strong>websocket连接处理</strong>&nbsp;<code>let (ws_sender, mut ws_receiver) = socket.split();</code>这里使用原生的代码,将已经建立起来的socket进行分割,因为websocket是双向连接,因此获得针对这个socket的发送端(ws_sender)和接收端(ws_receiver)。</p>
<p><strong>建立连接并保存</strong></p>
<div class="jb51code"><pre class="brush:plain;">let (tx, mut rx) = mpsc::unbounded_channel::&lt;Message&gt;();
let subscriber_id = Uuid::new_v4();

{
    let mut subs = self.subscribers.write().await;
    subs.entry(topic.clone()).or_default().insert(subscriber_id, tx);
}</pre></div>
<p>在这里,我们建立了个一个管道,并将subscriber的信息进行保存,这里的&nbsp;<code>mpsc::unbounded_channel::&lt;Message&gt;();</code>类似于golang中的channel,他会生成一个发送者、一个接收者,当往发送者发送消息的时候,接收者会受到该消息并进行一定处理。因此我们将subscriber的发送者(tx)保存至内存里。</p>
<p><strong>建立消息发送机制</strong></p>
<div class="jb51code"><pre class="brush:plain;">tokio::task::spawn(async move {
            let mut sender = ws_sender;

            while let Some(msg) = rx.recv().await {
                let _ = sender.send(msg).await;
            }
      });</pre></div>
<p>这个就是很简单了,通过如果rx收到了消息,则向websocket的subscriber进行发送。该任务是以新协程任务的方式启动的,在后台持续运行</p>
<p><strong>建立websocket连接保活机制</strong></p>
<div class="jb51code"><pre class="brush:plain;">let subscribers = Arc::clone(&amp;self.subscribers);
    tokio::task::spawn(async move {
      while let Some(result) = ws_receiver.next().await {
            match result {
                Ok(message) =&gt; {
                  // 处理有效的消息
                  if message.is_text() {
                        println!(
                            "Received message from client: {}",
                            message.to_str().unwrap()
                        );
                  }
                }
                Err(e) =&gt; {
                  // 处理错误
                  eprintln!("WebSocket error: {:?}", e);
                  break;
                }
            }
      }
      println!("WebSocket connection closed");
      subscribers.write().await.get_mut(&amp;topic).map(|subscribers| subscribers.remove(&amp;subscriber_id));
    });</pre></div>
<p>这里我们仍然在后台启动一个守护协程,用于保活websocket连接,一旦发生了连接失效,则注销消息发送机制,删除subscribers缓存中的订阅者。</p>
<p class="maodian"></p><h3>消息推送机制</h3>
<p><strong>事件推送</strong>事件推送时候将允许调用相关事件的推送地址,向推送端发送消息。</p>
<div class="jb51code"><pre class="brush:plain;">pub async fn produce_node_event(&amp;self, event: WatchEvent&lt;Node&gt;) {
      self.node_broker.produce("node".to_string(), event).await;
    }
    pub async fn produce_task_event(&amp;self, event: WatchEvent&lt;Task&gt;) {
      self.task_broker.produce("task".to_string(), event).await;
    }
    pub async fn produce_job_event(&amp;self, event: WatchEvent&lt;Job&gt;) {
      self.job_broker.produce("job".to_string(), event).await;
    }</pre></div>
<p>当收到消息的时候,不直接处理消息,而是将放入缓存队列中(一个消息无界流)</p>
<div class="jb51code"><pre class="brush:plain;">pub async fn produce(&amp;self, topic: Topic, event: WatchEvent&lt;R&gt;) {
      if let Err(e) = self.event_sender.send((topic.clone(), event.clone())) {
            eprintln!("Failed to send event: {}", e);
      }
    }</pre></div>
<p><strong>事件分发</strong>同样的。将启动一个协程,用于从和event_sender对应的event_receiver中获取消息,推送给订阅者。</p>
<div class="jb51code"><pre class="brush:plain;">fn start_event_dispatcher(broker: Arc&lt;Self&gt;, mut event_receiver: UnboundedReceiver&lt;(Topic, WatchEvent&lt;R&gt;)&gt;) {
      tokio::spawn(async move {
            while let Some((topic, event)) = event_receiver.recv().await {
                let event_json = serde_json::to_string(&amp;event).unwrap();
                let subscribers_list;
                {
                  let subscribers = broker.subscribers.read().await;
                  subscribers_list = subscribers.get(&amp;topic).cloned().unwrap_or_default();
                }
                let mut invalid_subscribers = vec![];
                for (id, ws_sender) in subscribers_list {
                  if ws_sender.send(warp::ws::Message::text(event_json.clone())).is_err() {
                        invalid_subscribers.push(id);
                  }
                }
                if !invalid_subscribers.is_empty() {
                  let mut subscribers = broker.subscribers.write().await;
                  if let Some(subscribers) = subscribers.get_mut(&amp;topic) {
                        for id in invalid_subscribers {
                            subscribers.remove(&amp;id);
                        }
                  }
                }
            }
      });
    }</pre></div>
<p><strong>获取订阅者的列表并依次发送</strong></p>
<p><strong>如果发现发送失败,则将这个订阅者从缓存中删除</strong></p>
<p class="maodian"></p><h3>客户端</h3>
<p>客户端的代码就是建立起来一个订阅者关注相关事件的动态。在相应的代码中,可以使用该方法。本方法最终返回的是一个无界流&nbsp;<code>Stream&lt;Item = WatchEvent&lt;R&gt;&gt;</code>,用于得到服务器推送过来的事件类型</p>
<div class="jb51code"><pre class="brush:plain;">pub async fn list_and_watch&lt;R&gt;(api_client: &amp;ApiClient, resource_name: &amp;str) -&gt; impl Stream&lt;Item = WatchEvent&lt;R&gt;&gt;
where
    R: Resource + Clone + DeserializeOwned + 'static + Send,
{
    // 先通过 HTTP 获取资源列表
    let initial_resources = get_resource_list::&lt;R&gt;(api_client).await;
    // 解析要连接WebSocket服务器的URL
    let url = Url::parse(&amp;*format!("{}/{}", api_client.watch_url, resource_name)).expect("Invalid URL");
    // 连接到WebSocket服务器
    println!("watch url is {}", url);
    let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
    println!("Watch client connected");
    let (mut write, read) = ws_stream.split();
    let (tx, rx) = mpsc::unbounded_channel();
    // 先发送初始资源列表
    match initial_resources {
      Ok(res) =&gt; tx.send(WatchEvent::Restarted(res)).unwrap(),
      Err(e) =&gt; eprintln!("list resource failed, {}", e),
    };
    // 将 WebSocket 读流转换为消息事件流
    tokio::spawn(async move {
      read.for_each(|message| async {
            match message {
                Ok(msg) =&gt; {
                  if msg.is_text() {
                        let text = msg.to_text().unwrap();
                        match serde_json::from_str::&lt;WatchEvent&lt;R&gt;&gt;(text) {
                            Ok(event) =&gt; {
                              tx.send(event).unwrap();
                            }
                            Err(e) =&gt; {
                              eprintln!("Failed to parse message: {:?}", e);
                            }
                        }
                  }
                }
                Err(e) =&gt; {
                  eprintln!("Error receiving message: {:?}", e);
                }
            }
      }).await;
    });
    // 保持 WebSocket 连接活跃
    tokio::spawn(async move {
      loop {
            if let Err(e) = write.send(WatchMessage::Text(String::new())).await {
                eprintln!("Error sending ping: {:?}", e);
                break;
            }
            tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
      }
    });
    tokio_stream::wrappers::UnboundedReceiverStream::new(rx)
}</pre></div>
<p class="maodian"></p><h3>使用验证</h3>
<p class="maodian"></p><h3>不足分析</h3>
<p>经过上面的介绍,我们可以看到这个基础的list and watch机制能够正确运行。但是,和K8S、ETCD中广泛使用的list and watch相比仍然缺少一个机制来保证list和watch的一致性。</p>
<p>请考虑这样一种情况我们的服务器中会源源不断地产生数据d1,d2,d3,...,dn。当我们使用list时候,能够感知到d1,d2,d3,此时我们完成list,开始建立watch。加入在开始建立watch这个阶段,即使可能是几毫秒的时间但服务器生成了d4,而在watch建立起来后,只能接收到d5,d6,...。这就导致了数据的遗失。</p>
<p>在 Kubernetes 中,<code>List</code>&nbsp;和&nbsp;<code>Watch</code>&nbsp;操作结合使用时,需要使用一个revision机制以确保资源的变更不会被遗漏。理解&nbsp;<code>List</code>&nbsp;和&nbsp;<code>Watch</code>&nbsp;操作时&nbsp;<code>revision</code>(即&nbsp;<code>resourceVersion</code>)的具体含义和管理方式对于保证一致性至关重要。revision的存在有着如下的意义:</p>
<ul><li><strong>数据版本控制</strong>:<code>revision</code>&nbsp;是 Etcd 的全局递增计数器,用于标识数据的当前版本。当进行数据的修改、更新操作时候,revision会+1</li><li><strong>一致性视图</strong>:确保返回的数据是一致的快照视图,表示在该&nbsp;<code>revision</code>&nbsp;之前的所有操作都已完成。</li></ul>
<p><code>revision</code>&nbsp;与&nbsp;<code>List</code>&nbsp;和&nbsp;<code>Watch</code>&nbsp;的关系</p>
<ul><li><strong>List 操作</strong>:
<ul><li>返回资源列表和当前的全局&nbsp;<code>revision</code>,作为&nbsp;<code>resourceVersion</code>。</li><li>确保获取到的资源是该&nbsp;<code>revision</code>&nbsp;时刻的一致视图。</li></ul></li><li><strong>Watch 操作</strong>:<ul><li>使用 List&nbsp;<code>操作返回的</code>resourceVersion` 作为起点。</li><li>从该 resourceVersion&nbsp;<code>开始监听资源的变化,确保在</code>List&nbsp;<code>和</code>Watch` 之间的变更不会丢失。</li></ul></li></ul>
<p><code>List</code>&nbsp;操作的&nbsp;<code>revision</code></p>
<p>当进行 List&nbsp;<code>操作时,Kubernetes API Server 从 Etcd 获取当前资源的状态及其</code>resourceVersion&nbsp;<code>。这个&nbsp;</code>resourceVersion&nbsp;<code>是 Etcd 当前的全局</code>revision&nbsp;<code>。它表示在此&nbsp;</code>revision&nbsp;<code>之前的所有操作都已经完成,并确保返回的数据是这个</code>revision` 时刻的一致视图。</p>
<p><code>Watch</code>&nbsp;操作的&nbsp;<code>revision</code></p>
<p><code>Watch</code>&nbsp;操作使用&nbsp;<code>List</code>&nbsp;操作返回的&nbsp;<code>resourceVersion</code>&nbsp;作为起点,从该版本开始监听资源的变化。这确保了从&nbsp;<code>List</code>&nbsp;到&nbsp;<code>Watch</code>&nbsp;之间的变更不会被遗漏。</p>
<p class="maodian"></p><h3>示例流程</h3>
<ul><li><strong>List 操作</strong>:
<ul><li><strong>API Server 从 Etcd 获取指定资源的当前状态。</strong></li><li>Etcd 返回包含所有资源对象的列表和一个全局 revision&nbsp;<code>,这个&nbsp;</code>revision&nbsp;<code>将作为</code>resourceVersion`。</li></ul></li><li><strong>Watch 操作</strong>:<ul><li>API Server 使用&nbsp;<code>List</code>&nbsp;操作返回的&nbsp;<code>resourceVersion</code>(revision) 作为起点,开始监听资源的变化。</li><li>Etcd 返回从指定 revision` 开始的所有变更事件。</li></ul></li></ul>
<p class="maodian"></p><h3>总结</h3>
<ul><li><code>revision</code>:标识数据版本,确保数据一致性。</li><li><code>List</code>&nbsp;和&nbsp;<code>Watch</code>:<code>List</code>&nbsp;获取资源和&nbsp;<code>revision</code>,<code>Watch</code>&nbsp;从该&nbsp;<code>revision</code>&nbsp;开始监听变化,确保变更的连续性和一致性。</li></ul>
<p>到此这篇关于使用 Rust 实现的基础的List 和 Watch 机制的文章就介绍到这了,更多相关Rust List 和 Watch 机制内容请搜索琼殿技术社区以前的文章或继续浏览下面的相关文章希望大家以后多多支持琼殿技术社区!</p>
                           
                            <div class="art_xg">
                              <b>您可能感兴趣的文章:</b><ul><li>Rust中用enum实现多参数Hook机制完整代码</li><li>详解Python和Rust中内存管理机制的实现与对比</li><li>Rust 所有权机制原理深入剖析</li><li>Rust指南之生命周期机制详解</li><li>详解Rust中的所有权机制</li></ul>
                            </div>

                        </div>
                        <!--endmain-->
頁: [1]
查看完整版本: 使用 Rust 实现的基础的List 和 Watch 机制示例流程