C++实现生产者与消费者模式方式
<div id="navCategory"><h5 class="catalogue">目录</h5><ul class="first_class_ul"><li>多线程工作池</li><li>线程安全保障</li><li>任务分发逻辑</li><li>总结</li></ul></div><p class="maodian"></p><h2>多线程工作池</h2><ul><li>创建<code>workerCount</code>个工作线程(示例中为 3 个),每个线程执行相同的<code>workerLoop</code>逻辑。</li><li>线程通过<code>condition_variable</code>竞争任务队列中的任务,确保任务被均匀分发。</li></ul>
<p class="maodian"></p><h2>线程安全保障</h2>
<ul><li>任务队列的读写仍通过<code>std::mutex</code>保护,避免多线程竞争导致的数据错乱。</li><li><code>cv.notify_one()</code>每次唤醒一个线程处理任务,<code>cv.notify_all()</code>在停止时唤醒所有线程退出。</li></ul>
<p class="maodian"></p><h2>任务分发逻辑</h2>
<ul><li>生产者(主线程)提交任务时,通过<code>notify_one()</code>唤醒空闲线程,避免线程忙等。</li><li>多个工作线程同时消费任务,提升任务处理效率(尤其适合 CPU/IO 密集型任务)。</li></ul>
<div class="jb51code"><pre class="brush:cpp;">#include <iostream>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>
#include <chrono>
#include <thread>
#include <vector>
// 任务队列类型
using Task = std::function<void()>;
int main() {
std::mutex mtx;
std::condition_variable cv;
std::queue<Task> taskQueue;
bool stop = false; // 退出标志
const size_t workerCount = 3; // 工作线程数量
std::vector<std::thread> workers; // 工作线程列表
// ========== 工作线程循环:多线程消费任务 ==========
auto workerLoop = [&](int workerId) {
while (true) {
Task task;
// 加锁,获取任务或检测退出信号
{
std::unique_lock<std::mutex> lock(mtx);
// 等待条件:有任务 或 需要停止
cv.wait(lock, [&]() {
return !taskQueue.empty() || stop;
});
// 若停止且任务队列为空,退出循环
if (stop && taskQueue.empty()) {
std::cout << "[线程" << workerId << "] 退出工作循环..." << std::endl;
break;
}
// 取出队列头部任务(多线程竞争,确保线程安全)
task = std::move(taskQueue.front());
taskQueue.pop();
std::cout << "[线程" << workerId << "] 取出任务,准备执行..." << std::endl;
} // 解锁,避免执行任务时持有锁
// 执行任务
if (task) {
task();
}
}
};
// ========== 创建多个工作线程 ==========
for (int i = 0; i < workerCount; ++i) {
workers.emplace_back(workerLoop, i);
}
// ========== 模拟提交任务(生产者逻辑) ==========
auto submitTask = [&](Task task) {
std::lock_guard<std::mutex> lock(mtx);
taskQueue.push(std::move(task));
std::cout << "提交任务,当前队列大小:" << taskQueue.size() << std::endl;
cv.notify_one(); // 唤醒一个等待的工作线程
};
// 批量提交10个任务
for (int i = 0; i < 10; ++i) {
submitTask(() {
std::cout << "执行任务" << i << ":线程ID=" << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // 模拟任务耗时
});
}
// 等待所有任务执行(可选,也可通过队列状态判断)
std::this_thread::sleep_for(std::chrono::seconds(3));
// ========== 停止所有工作线程 ==========
{
std::lock_guard<std::mutex> lock(mtx);
stop = true;
cv.notify_all(); // 唤醒所有等待的线程,确保全部退出
std::cout << "\n通知所有线程停止..." << std::endl;
}
// 等待所有工作线程结束
for (auto& worker : workers) {
if (worker.joinable()) {
worker.join();
}
}
std::cout << "程序结束" << std::endl;
return 0;
}</pre></div>
<p class="maodian"></p><h2>总结</h2>
<p>以上为个人经验,希望能给大家一个参考,也希望大家多多支持琼殿技术社区。</p>
<div class="art_xg">
<b>您可能感兴趣的文章:</b><ul><li>C++实现简单的生产者-消费者队列详解</li></ul>
</div>
</div>
<!--endmain-->
頁:
[1]