华丽月色 發表於 2025-12-28 09:24:44

C++实现生产者与消费者模式方式

<div id="navCategory"><h5 class="catalogue">目录</h5><ul class="first_class_ul"><li>多线程工作池</li><li>线程安全保障</li><li>任务分发逻辑</li><li>总结</li></ul></div><p class="maodian"></p><h2>多线程工作池</h2>
<ul><li>创建<code>workerCount</code>个工作线程(示例中为 3 个),每个线程执行相同的<code>workerLoop</code>逻辑。</li><li>线程通过<code>condition_variable</code>竞争任务队列中的任务,确保任务被均匀分发。</li></ul>
<p class="maodian"></p><h2>线程安全保障</h2>
<ul><li>任务队列的读写仍通过<code>std::mutex</code>保护,避免多线程竞争导致的数据错乱。</li><li><code>cv.notify_one()</code>每次唤醒一个线程处理任务,<code>cv.notify_all()</code>在停止时唤醒所有线程退出。</li></ul>
<p class="maodian"></p><h2>任务分发逻辑</h2>
<ul><li>生产者(主线程)提交任务时,通过<code>notify_one()</code>唤醒空闲线程,避免线程忙等。</li><li>多个工作线程同时消费任务,提升任务处理效率(尤其适合 CPU/IO 密集型任务)。</li></ul>
<div class="jb51code"><pre class="brush:cpp;">#include &lt;iostream&gt;
#include &lt;mutex&gt;
#include &lt;condition_variable&gt;
#include &lt;queue&gt;
#include &lt;functional&gt;
#include &lt;chrono&gt;
#include &lt;thread&gt;
#include &lt;vector&gt;

// 任务队列类型
using Task = std::function&lt;void()&gt;;

int main() {
    std::mutex mtx;
    std::condition_variable cv;
    std::queue&lt;Task&gt; taskQueue;
    bool stop = false; // 退出标志
    const size_t workerCount = 3; // 工作线程数量
    std::vector&lt;std::thread&gt; workers; // 工作线程列表

    // ========== 工作线程循环:多线程消费任务 ==========
    auto workerLoop = [&amp;](int workerId) {
      while (true) {
            Task task;

            // 加锁,获取任务或检测退出信号
            {
                std::unique_lock&lt;std::mutex&gt; lock(mtx);
               
                // 等待条件:有任务 或 需要停止
                cv.wait(lock, [&amp;]() {
                  return !taskQueue.empty() || stop;
                });

                // 若停止且任务队列为空,退出循环
                if (stop &amp;&amp; taskQueue.empty()) {
                  std::cout &lt;&lt; "[线程" &lt;&lt; workerId &lt;&lt; "] 退出工作循环..." &lt;&lt; std::endl;
                  break;
                }

                // 取出队列头部任务(多线程竞争,确保线程安全)
                task = std::move(taskQueue.front());
                taskQueue.pop();
                std::cout &lt;&lt; "[线程" &lt;&lt; workerId &lt;&lt; "] 取出任务,准备执行..." &lt;&lt; std::endl;
            } // 解锁,避免执行任务时持有锁

            // 执行任务
            if (task) {
                task();
            }
      }
    };

    // ========== 创建多个工作线程 ==========
    for (int i = 0; i &lt; workerCount; ++i) {
      workers.emplace_back(workerLoop, i);
    }

    // ========== 模拟提交任务(生产者逻辑) ==========
    auto submitTask = [&amp;](Task task) {
      std::lock_guard&lt;std::mutex&gt; lock(mtx);
      taskQueue.push(std::move(task));
      std::cout &lt;&lt; "提交任务,当前队列大小:" &lt;&lt; taskQueue.size() &lt;&lt; std::endl;
      cv.notify_one(); // 唤醒一个等待的工作线程
    };

    // 批量提交10个任务
    for (int i = 0; i &lt; 10; ++i) {
      submitTask(() {
            std::cout &lt;&lt; "执行任务" &lt;&lt; i &lt;&lt; ":线程ID=" &lt;&lt; std::this_thread::get_id() &lt;&lt; std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(200)); // 模拟任务耗时
      });
    }

    // 等待所有任务执行(可选,也可通过队列状态判断)
    std::this_thread::sleep_for(std::chrono::seconds(3));

    // ========== 停止所有工作线程 ==========
    {
      std::lock_guard&lt;std::mutex&gt; lock(mtx);
      stop = true;
      cv.notify_all(); // 唤醒所有等待的线程,确保全部退出
      std::cout &lt;&lt; "\n通知所有线程停止..." &lt;&lt; std::endl;
    }

    // 等待所有工作线程结束
    for (auto&amp; worker : workers) {
      if (worker.joinable()) {
            worker.join();
      }
    }

    std::cout &lt;&lt; "程序结束" &lt;&lt; std::endl;
    return 0;
}</pre></div>
<p class="maodian"></p><h2>总结</h2>
<p>以上为个人经验,希望能给大家一个参考,也希望大家多多支持琼殿技术社区。</p>
                           
                            <div class="art_xg">
                              <b>您可能感兴趣的文章:</b><ul><li>C++实现简单的生产者-消费者队列详解</li></ul>
                            </div>

                        </div>
                        <!--endmain-->
頁: [1]
查看完整版本: C++实现生产者与消费者模式方式