wordpress前台登陆界面,谷歌seo优化公司,商务网站建设实训心得体会,海南三亚做网站Linux生产者消费者模型 Linux生产者消费者模型详解生产者消费者模型生产者消费者模型的概念生产者消费者模型的特点生产者消费者模型优点 基于BlockingQueue的生产者消费者模型基于阻塞队列的生产者消费者模型模拟实现基于阻塞队列的生产消费模型基础实现生产者消费者步调调整条… Linux生产者消费者模型 Linux生产者消费者模型详解生产者消费者模型生产者消费者模型的概念生产者消费者模型的特点生产者消费者模型优点 基于BlockingQueue的生产者消费者模型基于阻塞队列的生产者消费者模型模拟实现基于阻塞队列的生产消费模型基础实现生产者消费者步调调整条件唤醒优化基于计算任务的扩展 总结 Linux生产者消费者模型详解 生产者消费者模型
生产者消费者模型的概念
生产者消费者模型通过一个容器解决生产者与消费者的强耦合问题。
通信方式生产者不直接与消费者交互而是将数据放入容器消费者从容器取数据。容器作用缓冲区解耦生产者与消费者平衡双方处理能力。
生产者消费者模型的特点
生产者消费者模型是多线程同步与互斥的经典场景具有以下特点
三种关系 生产者与生产者互斥竞争容器访问。消费者与消费者互斥竞争容器访问。生产者与消费者互斥共享容器同步生产消费顺序。 两种角色生产者与消费者线程或进程。一个交易场所内存缓冲区如队列。
互斥原因容器是临界资源需用互斥锁保护多线程竞争访问。 同步原因
容器满时生产者需等待避免生产失败。容器空时消费者需等待避免消费失败。同步确保有序访问防止饥饿提高效率。
注意互斥保证数据正确性同步实现线程协作。
生产者消费者模型优点
解耦生产者与消费者独立运行通过容器间接交互。支持并发生产者生产时消费者可同时消费。支持忙闲不均容器缓冲数据平衡处理速度差异。
对比函数调用紧耦合生产者消费者模型是松耦合设计生产者无需等待消费者处理。 基于BlockingQueue的生产者消费者模型
基于阻塞队列的生产者消费者模型
在多线程编程中**阻塞队列Blocking Queue**是实现生产者消费者模型的常用数据结构。
与普通队列的区别 队列空时取元素操作阻塞直到有数据。队列满时放元素操作阻塞直到有空间。 应用场景类似管道通信。
模拟实现基于阻塞队列的生产消费模型
基础实现
以单生产者、单消费者为例使用C queue 实现阻塞队列
BlockQueue.hpp
#pragma once
#include iostream
#include pthread.h
#include queue#define NUM 5templateclass T
class BlockQueue {
private:bool IsFull() { return _q.size() _cap; }bool IsEmpty() { return _q.empty(); }
public:BlockQueue(int cap NUM) : _cap(cap) {pthread_mutex_init(_mutex, nullptr);pthread_cond_init(_full, nullptr);pthread_cond_init(_empty, nullptr);}~BlockQueue() {pthread_mutex_destroy(_mutex);pthread_cond_destroy(_full);pthread_cond_destroy(_empty);}void Push(const T data) {pthread_mutex_lock(_mutex);while (IsFull()) {pthread_cond_wait(_full, _mutex); // 队列满等待}_q.push(data);pthread_mutex_unlock(_mutex);pthread_cond_signal(_empty); // 唤醒消费者}void Pop(T data) {pthread_mutex_lock(_mutex);while (IsEmpty()) {pthread_cond_wait(_empty, _mutex); // 队列空等待}data _q.front();_q.pop();pthread_mutex_unlock(_mutex);pthread_cond_signal(_full); // 唤醒生产者}
private:std::queueT _q; // 阻塞队列int _cap; // 容量上限pthread_mutex_t _mutex; // 互斥锁pthread_cond_t _full; // 满条件变量pthread_cond_t _empty; // 空条件变量
};main.cpp
#include BlockQueue.hpp
#include unistd.hvoid* Producer(void* arg) {BlockQueueint* bq (BlockQueueint*)arg;while (true) {sleep(1);int data rand() % 100 1;bq-Push(data);std::cout Producer: data std::endl;}return nullptr;
}
void* Consumer(void* arg) {BlockQueueint* bq (BlockQueueint*)arg;while (true) {sleep(1);int data;bq-Pop(data);std::cout Consumer: data std::endl;}return nullptr;
}
int main() {srand((unsigned int)time(nullptr));pthread_t producer, consumer;BlockQueueint* bq new BlockQueueint;pthread_create(producer, nullptr, Producer, bq);pthread_create(consumer, nullptr, Consumer, bq);pthread_join(producer, nullptr);pthread_join(consumer, nullptr);delete bq;return 0;
}说明
单生产者单消费者无需维护生产者间或消费者间的互斥。互斥_mutex 保护队列。同步_full 和 _empty 条件变量控制生产消费顺序。条件判断用 while 防止伪唤醒。运行结果生产者与消费者步调一致每秒交替生产消费。
生产者消费者步调调整 生产快消费慢 void* Producer(void* arg) {BlockQueueint* bq (BlockQueueint*)arg;while (true) {int data rand() % 100 1;bq-Push(data);std::cout Producer: data std::endl;}
}
void* Consumer(void* arg) {BlockQueueint* bq (BlockQueueint*)arg;while (true) {sleep(1);int data;bq-Pop(data);std::cout Consumer: data std::endl;}
}生产者快速填满队列后等待消费者消费一个后唤醒生产者后续步调一致。 生产慢消费快 void* Producer(void* arg) {BlockQueueint* bq (BlockQueueint*)arg;while (true) {sleep(1);int data rand() % 100 1;bq-Push(data);std::cout Producer: data std::endl;}
}
void* Consumer(void* arg) {BlockQueueint* bq (BlockQueueint*)arg;while (true) {int data;bq-Pop(data);std::cout Consumer: data std::endl;}
}消费者初始等待生产者生产消费后继续等待步调随生产者。
条件唤醒优化
调整唤醒条件例如队列数据量超一半时唤醒消费者小于一半时唤醒生产者
void Push(const T data) {pthread_mutex_lock(_mutex);while (IsFull()) {pthread_cond_wait(_full, _mutex);}_q.push(data);if (_q.size() _cap / 2) {pthread_cond_signal(_empty); // 超一半唤醒消费者}pthread_mutex_unlock(_mutex);
}
void Pop(T data) {pthread_mutex_lock(_mutex);while (IsEmpty()) {pthread_cond_wait(_empty, _mutex);}data _q.front();_q.pop();if (_q.size() _cap / 2) {pthread_cond_signal(_full); // 少于一半唤醒生产者}pthread_mutex_unlock(_mutex);
}效果生产者快速填满队列后等待消费者消费至一半以下才唤醒生产者。
基于计算任务的扩展
将队列存储类型改为任务类扩展功能
Task.hpp
#pragma once
#include iostreamclass Task {
public:Task(int x 0, int y 0, char op 0) : _x(x), _y(y), _op(op) {}void Run() {int result 0;switch (_op) {case : result _x _y; break;case -: result _x - _y; break;case *: result _x * _y; break;case /: if (_y 0) { std::cout Warning: div zero! std::endl; result -1; }else { result _x / _y; } break;case %: if (_y 0) { std::cout Warning: mod zero! std::endl; result -1; }else { result _x % _y; } break;default: std::cout error operation! std::endl; break;}std::cout _x _op _y result std::endl;}
private:int _x, _y;char _op;
};main.cpp
#include BlockQueue.hpp
#include Task.hppvoid* Producer(void* arg) {BlockQueueTask* bq (BlockQueueTask*)arg;const char* ops -*/%;while (true) {int x rand() % 100;int y rand() % 100;char op ops[rand() % 5];Task t(x, y, op);bq-Push(t);std::cout Producer task done std::endl;}return nullptr;
}
void* Consumer(void* arg) {BlockQueueTask* bq (BlockQueueTask*)arg;while (true) {sleep(1);Task t;bq-Pop(t);t.Run();}return nullptr;
}
int main() {srand((unsigned int)time(nullptr));pthread_t producer, consumer;BlockQueueTask* bq new BlockQueueTask;pthread_create(producer, nullptr, Producer, bq);pthread_create(consumer, nullptr, Consumer, bq);pthread_join(producer, nullptr);pthread_join(consumer, nullptr);delete bq;return 0;
}功能生产者生成计算任务消费者执行计算并输出结果。扩展性通过定义不同 Task 类实现多样化任务处理。 总结
模型核心通过容器解耦生产者与消费者支持并发与忙闲不均。实现关键阻塞队列结合互斥锁与条件变量确保互斥与同步。灵活性可调整步调、唤醒条件或扩展为复杂任务处理。