半米日暮 發表於 2025-2-26 08:46:09

Rust并发编程之使用消息传递进行线程间数据共享方式

<div id="navCategory"><h5 class="catalogue">目录</h5><ul class="first_class_ul"><li>一、通道(Channel)的基本概念</li><li>二、创建并使用通道</li><ul class="second_class_ul"><li>1. 基础用法</li><li>2. 在子线程中发送消息</li><li>3. 通道与所有权</li><li>4. 发送多个消息</li><li>5. 多个发送端(Multiple Producer)</li></ul><li>三、总结</li><ul class="second_class_ul"></ul></ul></div><p class="maodian"></p><h2>一、通道(Channel)的基本概念</h2>
<p>一个通道可以想象成一条单向水道或河流:有一个 <strong>发送端(transmitter)</strong> 和一个 <strong>接收端(receiver)</strong>。发送端好比河流上游,负责把&ldquo;橡皮鸭&rdquo;丢进水里;接收端在河流下游,收到这只&ldquo;橡皮鸭&rdquo;。在编程中,线程之间的通信即是这样&mdash;&mdash;把数据发到通道的一端,另外一个(或多个)线程在通道的另一端接收。</p>
<p>Rust 通过 <code>std::sync::mpsc</code>(Multiple Producer, Single Consumer)来提供通道功能:</p>
<ul><li><strong>Multiple Producer</strong>:可以有多个发送端同时发送数据;</li><li><strong>Single Consumer</strong>:但只能有一个接收端来接收数据。</li></ul>
<p>通过克隆发送端可以允许多个线程一起发送数据给同一个接收端。</p>
<p class="maodian"></p><h2>二、创建并使用通道</h2>
<p class="maodian"></p><h3>1. 基础用法</h3>
<p>创建一个 <code>mpsc::channel</code></p>
<div class="jb51code"><pre class="brush:bash;">use std::sync::mpsc;
use std::thread;

fn main() {
    // 创建一个通道
    let (tx, rx) = mpsc::channel();

    // 这里的 tx 是 transmitter(发送端),rx 是 receiver(接收端)。
    // 我们先不发送任何数据,因此代码暂时无法编译,
    // 因为编译器不知道通道要发送什么类型的数据。
}</pre></div>
<p><code>mpsc::channel()</code> 函数会返回一个元组 <code>(tx, rx)</code>,分别代表发送端和接收端。在之后,我们会看到如何把 <code>tx</code> 移动到不同线程去发送消息,<code>rx</code> 则留在当前线程用于接收消息。</p>
<p class="maodian"></p><h3>2. 在子线程中发送消息</h3>
<p>下面的例子中,我们在主线程创建了通道,然后把发送端 <code>tx</code> 移动(<code>move</code>)到子线程中,子线程通过 <code>tx.send()</code> 发送一条字符串&ldquo;hi&rdquo;给主线程。</p>
<div class="jb51code"><pre class="brush:bash;">use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
      let val = String::from("hi");
      tx.send(val).unwrap();
      // 如果接收端已关闭,则 send 会返回错误,这里直接 unwrap 处理
    });

    // 在主线程接收消息
    let received = rx.recv().unwrap();
    // recv 会阻塞主线程,直到收到一条消息或者发送端被关闭
    println!("Got: {}", received);
}</pre></div>
<p>运行后,主线程会打印:</p>
<blockquote><p>Got: hi</p></blockquote>
<p>这表示主线程成功地收到了子线程通过通道发送的字符串。</p>
<p class="maodian"></p><h3>3. 通道与所有权</h3>
<p>当我们调用 <code>tx.send(val)</code> 时,<code>send</code> 方法会<strong>获取 <code>val</code> 的所有权</strong>。这样做能够避免在另一个线程修改数据后,我们在原线程又使用这段数据的潜在风险。</p>
<p>例如,下面这段示例代码(示意)试图在发送之后继续使用 <code>val</code>,就会导致编译错误:</p>
<div class="jb51code"><pre class="brush:bash;">use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    let val = String::from("hello");

    thread::spawn(move || {
      tx.send(val).unwrap();
      // 发送后 val 的所有权已转移到通道
      // println!("val is: {}", val); // 编译错误: val 所有权已经被移动
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}</pre></div>
<p>由于所有权已经转移,主线程可以安全地接收并处理这条消息,而子线程也不会再访问已经移出的数据。这种严格的所有权规则能有效避免数据竞争和其他并发错误。</p>
<p class="maodian"></p><h3>4. 发送多个消息</h3>
<p>我们可以让子线程发送不止一条消息。下面的例子让子线程依次发送多条字符串,并在发送之间加上 <code>sleep</code> 用来模拟耗时操作:</p>
<div class="jb51code"><pre class="brush:bash;">use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
      let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
      ];

      for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
      }
    });

    // 主线程中,通过 iter 的方式持续接收消息
    for received in rx {
      println!("Got: {}", received);
    }
}</pre></div>
<p>由于接收端可以被当做迭代器来使用,<strong>当所有发送端都关闭</strong>时,<code>for</code> 循环会自动结束。这段程序会像下面这样依次打印每条消息:</p>
<blockquote><p>Got: hi<br />Got: from<br />Got: the<br />Got: thread</p></blockquote>
<p class="maodian"></p><h3>5. 多个发送端(Multiple Producer)</h3>
<p><code>mpsc</code> 的含义之一就是 Multiple Producer。如果我们希望有<strong>多个不同的子线程</strong>来发送消息给同一个接收端,只需要克隆发送端即可。如下:</p>
<div class="jb51code"><pre class="brush:bash;">use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
      let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("first thread"),
      ];

      for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
      }
    });

    thread::spawn(move || {
      let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
      ];

      for val in vals {
            tx.send(val).unwrap(); // 这里使用原先的 tx
            thread::sleep(Duration::from_secs(1));
      }
    });

    for received in rx {
      println!("Got: {}", received);
    }
}</pre></div>
<ul><li>第一个子线程使用 <code>tx1</code>;</li><li>第二个子线程使用原本的 <code>tx</code>;</li></ul>
<p>所有发送过来的数据都将流向同一个 <code>rx</code>(接收端)。运行结果每次可能都不一样,因为线程的调度顺序不可预测,这也正是并发编程&ldquo;有趣&rdquo;且需要谨慎之处。</p>
<p class="maodian"></p><h2>三、总结</h2>
<ol><li><strong>创建通道</strong>:使用 <code>mpsc::channel()</code> 创建通道,获得 <code>(tx, rx)</code>(发送端和接收端)。</li><li><strong>发送数据</strong>:<code>tx.send(data)</code> 会转移 <code>data</code> 的所有权,若接收端已关闭,<code>send</code> 会返回错误。</li><li><strong>接收数据</strong>:<code>rx.recv()</code> 会阻塞等待数据;<code>rx.try_recv()</code> 则不会阻塞,可用于非阻塞检查。也可将 <code>rx</code> 当做迭代器使用,以便持续接收数据,直到通道被关闭。</li><li><strong>所有权规则保障安全</strong>:发送端在 <code>send</code> 时会移动数据的所有权,避免了多线程中对同一数据的潜在不安全访问。</li><li><strong>多发送端</strong>:通过克隆发送端(<code>tx.clone()</code>),多个线程可以各自发送数据到同一个接收端,从而实现复杂的多生产者单消费者架构。</li></ol>
<p>Rust 的通道借助所有权系统,帮助我们轻松规避了许多并发陷阱。通过消息传递的思路,不再需要小心翼翼地管理锁和共享数据,编程思路也往往更加清晰简洁。在实际项目中,若需要多个线程之间相互通信,不妨考虑一下通道(channel)方案,也许能带来更加优雅和可靠的并发架构。</p>
<p>以上为个人经验,希望能给大家一个参考,也希望大家多多支持琼殿技术社区。</p>
                           
                            <div class="art_xg">
                              <b>您可能感兴趣的文章:</b><ul><li>浅析Rust多线程中如何安全的使用变量</li><li>Rust&nbsp;多线程编程的实现</li><li>Rust使用Channel实现跨线程传递数据</li><li>rust 创建多线程web server的详细过程</li><li>Rust多线程Web服务器搭建过程</li><li>Rust如何使用线程同时运行代码</li></ul>
                            </div>

                        </div>
                        <!--endmain-->
頁: [1]
查看完整版本: Rust并发编程之使用消息传递进行线程间数据共享方式