查看: 65|回覆: 0

[教程] C++实现生产者与消费者模式方式

[複製鏈接]

6

主題

0

回帖

0

積分

热心网友

金币
0
閲讀權限
220
精華
0
威望
0
贡献
0
在線時間
0 小時
註冊時間
2010-10-25
發表於 2025-12-28 09:24:44 | 顯示全部樓層 |閲讀模式

多线程工作池

  • 创建workerCount个工作线程(示例中为 3 个),每个线程执行相同的workerLoop逻辑。
  • 线程通过condition_variable竞争任务队列中的任务,确保任务被均匀分发。

线程安全保障

  • 任务队列的读写仍通过std::mutex保护,避免多线程竞争导致的数据错乱。
  • cv.notify_one()每次唤醒一个线程处理任务,cv.notify_all()在停止时唤醒所有线程退出。

任务分发逻辑

  • 生产者(主线程)提交任务时,通过notify_one()唤醒空闲线程,避免线程忙等。
  • 多个工作线程同时消费任务,提升任务处理效率(尤其适合 CPU/IO 密集型任务)。
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>
#include <chrono>
#include <thread>
#include <vector>

// 任务队列类型
using Task = std::function<void()>;

int main() {
    std::mutex mtx;
    std::condition_variable cv;
    std::queue<Task> taskQueue;
    bool stop = false; // 退出标志
    const size_t workerCount = 3; // 工作线程数量
    std::vector<std::thread> workers; // 工作线程列表

    // ========== 工作线程循环:多线程消费任务 ==========
    auto workerLoop = [&](int workerId) {
        while (true) {
            Task task;

            // 加锁,获取任务或检测退出信号
            {
                std::unique_lock<std::mutex> lock(mtx);
                
                // 等待条件:有任务 或 需要停止
                cv.wait(lock, [&]() {
                    return !taskQueue.empty() || stop;
                });

                // 若停止且任务队列为空,退出循环
                if (stop && taskQueue.empty()) {
                    std::cout << "[线程" << workerId << "] 退出工作循环..." << std::endl;
                    break;
                }

                // 取出队列头部任务(多线程竞争,确保线程安全)
                task = std::move(taskQueue.front());
                taskQueue.pop();
                std::cout << "[线程" << workerId << "] 取出任务,准备执行..." << std::endl;
            } // 解锁,避免执行任务时持有锁

            // 执行任务
            if (task) {
                task();
            }
        }
    };

    // ========== 创建多个工作线程 ==========
    for (int i = 0; i < workerCount; ++i) {
        workers.emplace_back(workerLoop, i);
    }

    // ========== 模拟提交任务(生产者逻辑) ==========
    auto submitTask = [&](Task task) {
        std::lock_guard<std::mutex> lock(mtx);
        taskQueue.push(std::move(task));
        std::cout << "提交任务,当前队列大小:" << taskQueue.size() << std::endl;
        cv.notify_one(); // 唤醒一个等待的工作线程
    };

    // 批量提交10个任务
    for (int i = 0; i < 10; ++i) {
        submitTask(() {
            std::cout << "执行任务" << i << ":线程ID=" << std::this_thread::get_id() << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(200)); // 模拟任务耗时
        });
    }

    // 等待所有任务执行(可选,也可通过队列状态判断)
    std::this_thread::sleep_for(std::chrono::seconds(3));

    // ========== 停止所有工作线程 ==========
    {
        std::lock_guard<std::mutex> lock(mtx);
        stop = true;
        cv.notify_all(); // 唤醒所有等待的线程,确保全部退出
        std::cout << "\n通知所有线程停止..." << std::endl;
    }

    // 等待所有工作线程结束
    for (auto& worker : workers) {
        if (worker.joinable()) {
            worker.join();
        }
    }

    std::cout << "程序结束" << std::endl;
    return 0;
}

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持琼殿技术社区。

您可能感兴趣的文章:
  • C++实现简单的生产者-消费者队列详解
回覆

使用道具 舉報

您需要登錄後才可以回帖 登錄 | 立即注册

本版積分規則

相关侵权、举报、投诉及建议等,请发 E-mail:qiongdian@foxmail.com

Powered by Discuz! X5.0 © 2001-2026 Discuz! Team.

在本版发帖返回顶部