溯源 發表於 2025-9-23 09:03:48

Rust 中的 Tokio 线程同步机制详解

<div id="navCategory"><h5 class="catalogue">目录</h5><ul class="first_class_ul"><li>Rust 中的 Tokio 线程同步机制</li><li>1. Mutex</li><li>2. RwLock</li><li>3. Barrier</li><li>4. Semaphore</li><li>5. Notify</li><li>6. oneshot 和 mpsc 通道</li><li>oneshot</li><li>mpsc</li><li>7. watch 通道</li><li>总结</li></ul></div><p class="maodian"></p><h3>Rust 中的 Tokio 线程同步机制</h3>
<p>在并发编程中,线程同步是一个重要的概念,用于确保多个线程在访问共享资源时能够正确地协调。Tokio 是一个强大的异步运行时库,为 Rust 提供了多种线程同步机制。以下是一些常见的同步机制:</p>
<ul><li><strong>Mutex</strong></li><li><strong>RwLock</strong></li><li><strong>Barrier</strong></li><li><strong>Semaphore</strong></li><li><strong>Notify</strong></li><li><strong>oneshot 和 mpsc 通道</strong></li><li><strong>watch 通道</strong></li></ul>
<p class="maodian"></p><h3>1. Mutex</h3>
<p><code>Mutex</code>(互斥锁)是最常见的同步原语之一,用于保护共享数据。它确保同一时间只有一个线程能够访问数据,从而避免竞争条件。</p>
<div class="jb51code"><pre class="brush:plain;">use tokio::sync::Mutex;
use std::sync::Arc;

#
async fn main() {
    let data = Arc::new(Mutex::new(0));

    let mut handles = vec![];
    for _ in 0..10 {
      let data = data.clone();
      let handle = tokio::spawn(async move {
            let mut lock = data.lock().await;
            *lock += 1;
      });
      handles.push(handle);
    }

    for handle in handles {
      handle.await.unwrap();
    }

    println!("Result: {}", *data.lock().await);
}</pre></div>
<p class="maodian"></p><h3>2. RwLock</h3>
<p><code>RwLock</code>(读写锁)允许多线程同时读取数据,但只允许一个线程写入数据。它比&nbsp;<code>Mutex</code>&nbsp;更加灵活,因为在读取多于写入的场景下,它能提高性能。功能上,他是读写互斥、写写互斥、读读兼容。</p>
<div class="jb51code"><pre class="brush:plain;">use tokio::sync::RwLock;
use std::sync::Arc;

#
async fn main() {
    let data = Arc::new(RwLock::new(0));

    let read_data = data.clone();
    let read_handle = tokio::spawn(async move {
      let lock = read_data.read().await;
      println!("Read: {}", *lock);
    });

    let write_data = data.clone();
    let write_handle = tokio::spawn(async move {
      let mut lock = write_data.write().await;
      *lock += 1;
      println!("Write: {}", *lock);
    });

    read_handle.await.unwrap();
    write_handle.await.unwrap();
}</pre></div>
<p class="maodian"></p><h3>3. Barrier</h3>
<p><code>Barrier</code>&nbsp;是一种同步机制,允许多个线程在某个点上进行同步。当线程到达屏障时,它们会等待直到所有线程都到达,然后一起继续执行。</p>
<div class="jb51code"><pre class="brush:plain;">use tokio::sync::Barrier;
use std::sync::Arc;

#
async fn main() {
    let barrier = Arc::new(Barrier::new(3));

    let mut handles = vec![];
    for i in 0..3 {
      let barrier = barrier.clone();
      let handle = tokio::spawn(async move {
            println!("Before wait: {}", i);
            barrier.wait().await;
            println!("After wait: {}", i);
      });
      handles.push(handle);
    }

    for handle in handles {
      handle.await.unwrap();
    }
}</pre></div>
<p class="maodian"></p><h3>4. Semaphore</h3>
<p><code>Semaphore</code>(信号量)是一种用于控制对资源访问的同步原语。它允许多个线程访问资源,但有一个最大并发数限制。</p>
<div class="jb51code"><pre class="brush:plain;">#
async fn test_sem() {
    let semaphore = Arc::new(Semaphore::new(3));

    let mut handles = vec![];
    for i in 0..5 {
      let semaphore = semaphore.clone();
      let handle = tokio::spawn(async move {
            let permit = semaphore.acquire().await.unwrap();
            let now = Local::now();
            println!("Got permit: {} at {:?}", i, now);
            println!(
                "Semaphore available permits before sleep: {}",
                semaphore.available_permits()
            );
            sleep(Duration::from_secs(5)).await;
            drop(permit);
            println!(
                "Semaphore available permits after sleep: {}",
                semaphore.available_permits()
            );
      });
      handles.push(handle);
    }

    for handle in handles {
      handle.await.unwrap();
    }
}</pre></div>
<p><strong>最终的结果如下</strong></p>
<blockquote><p>Got permit: 0 at 2024-08-08T21:03:04.374666+08:00<br />Semaphore available permits before sleep: 2<br />Got permit: 1 at 2024-08-08T21:03:04.375527800+08:00<br />Semaphore available permits before sleep: 1<br />Got permit: 2 at 2024-08-08T21:03:04.375563+08:00<br />Semaphore available permits before sleep: 0<br />Semaphore available permits after sleep: 0<br />Semaphore available permits after sleep: 0<br />Semaphore available permits after sleep: 1<br />Got permit: 3 at 2024-08-08T21:03:09.376722800+08:00<br />Semaphore available permits before sleep: 1<br />Got permit: 4 at 2024-08-08T21:03:09.376779200+08:00<br />Semaphore available permits before sleep: 1<br />Semaphore available permits after sleep: 2<br />Semaphore available permits after sleep: 3</p></blockquote>
<p class="maodian"></p><h3>5. Notify</h3>
<p><code>Notify</code>&nbsp;是一种用于线程间通知的简单机制。它允许一个线程通知其他线程某些事件的发生。</p>
<div class="jb51code"><pre class="brush:plain;">use tokio::sync::Notify;
use std::sync::Arc;

#
async fn main() {
    let notify = Arc::new(Notify::new());
    let notify_clone = notify.clone();

    let handle = tokio::spawn(async move {
      notify_clone.notified().await;
      println!("Received notification");
    });

    notify.notify_one();
    handle.await.unwrap();
}</pre></div>
<p class="maodian"></p><h3>6. oneshot 和 mpsc 通道</h3>
<p><code>oneshot</code>&nbsp;通道用于一次性发送消息,而&nbsp;<code>mpsc</code>&nbsp;通道则允许多个生产者发送消息到一个消费者。一般地onshot用于异常通知、启动分析等功能。mpsc用于实现异步消息同步</p>
<p class="maodian"></p><h3>oneshot</h3>
<div class="jb51code"><pre class="brush:plain;">use tokio::sync::oneshot;

#
async fn main() {
    let (tx, rx) = oneshot::channel();

    tokio::spawn(async move {
      tx.send("Hello, world!").unwrap();
    });

    let message = rx.await.unwrap();
    println!("Received: {}", message);
}</pre></div>
<p class="maodian"></p><h3>mpsc</h3>
<div class="jb51code"><pre class="brush:plain;">use tokio::sync::mpsc;

#
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);

    tokio::spawn(async move {
      tx.send("Hello, world!").await.unwrap();
    });

    while let Some(message) = rx.recv().await {
      println!("Received: {}", message);
    }
}</pre></div>
<p class="maodian"></p><h3>7. watch 通道</h3>
<p><code>watch</code>&nbsp;通道用于发送和接收共享状态的更新。它允许多个消费者监听状态的变化。</p>
<div class="jb51code"><pre class="brush:plain;">use tokio::sync::watch;

#
async fn main() {
    let (tx, mut rx) = watch::channel("initial");

    tokio::spawn(async move {
      tx.send("updated").unwrap();
    });

    while rx.changed().await.is_ok() {
      println!("Received: {}", *rx.borrow());
    }
}</pre></div>
<p>​<strong>watch通道</strong>​:</p>
<ul><li><strong>用于广播状态更新,一个生产者更新状态,多个消费者获取最新状态。</strong></li><li><strong>适合配置变更、状态同步等场景。</strong></li></ul>
<p>​<strong>mpsc通道</strong>​:</p>
<ul><li><strong>用于传递消息队列,多个生产者发送消息,一个消费者逐条处理。</strong></li><li><strong>适合任务队列、事件驱动等场景。</strong></li></ul>
<p class="maodian"></p><h3>总结</h3>
<p><strong>Rust 中的 Tokio 提供了丰富的线程同步机制,可以根据具体需求选择合适的同步原语。常用的同步机制包括:</strong></p>
<ul><li><code>Mutex</code>:互斥锁,保护共享数据。</li><li><code>RwLock</code>:读写锁,允许并发读,写时独占。</li><li><code>Barrier</code>:屏障,同步多个线程在某一点。</li><li><code>Semaphore</code>:信号量,控制并发访问资源。</li><li><code>Notify</code>:通知机制,用于线程间通知。</li><li><code>oneshot</code>&nbsp;和&nbsp;<code>mpsc</code>&nbsp;通道:消息传递机制。</li><li><code>watch</code>&nbsp;通道:状态更新机制。</li></ul>
<p><strong>通过这些同步机制,可以在 Rust 中编写高效、安全的并发程序。</strong></p>
<p>到此这篇关于Rust 中的 Tokio 线程同步机制的文章就介绍到这了,更多相关rust tokio线程同步内容请搜索琼殿技术社区以前的文章或继续浏览下面的相关文章希望大家以后多多支持琼殿技术社区!</p>
                           
                            <div class="art_xg">
                              <b>您可能感兴趣的文章:</b><ul><li>Rust语言从入门到精通之Tokio的Channel深入理解</li><li>Rust中多线程 Web 服务器的项目实战</li><li>Rust&nbsp;中单线程&nbsp;Web&nbsp;服务器的实现</li><li>浅析Rust多线程中如何安全的使用变量</li><li>Rust&nbsp;多线程编程的实现</li><li>rust 创建多线程web server的详细过程</li></ul>
                            </div>

                        </div>
                        <!--endmain-->
頁: [1]
查看完整版本: Rust 中的 Tokio 线程同步机制详解