做债的网站,黑色赚钱的网站,如何做 行业社交类网站,wordpress手机端兼容目录
前言#xff1a;
一、什么是生产者消费者模型
#xff08;一#xff09;概念
#xff08;二#xff09;生产者消费者之间的关系
#xff08;三#xff09;生产者消费者模型特点
#xff08;四#xff09;生产者消费者模型的优点
二、基于阻塞队列实现生产…目录
前言
一、什么是生产者消费者模型
一概念
二生产者消费者之间的关系
三生产者消费者模型特点
四生产者消费者模型的优点
二、基于阻塞队列实现生产者消费者模型
一阻塞队列
二单生产单消费模型
三多生产多消费模型 三、POSIX信号量
一信号量相关操作
五、基于环形队列实现生产者消费者模型
一环形队列
二单生产单消费模型
三多生产多消费模型 前言
生产者消费者模型Producer-consumer problem在软件开发中有着广泛的应用特别是在并发编程和多线程环境中。通过使用这个模型可以更加有效地管理资源、提高系统的吞吐量和响应速度并降低因并发访问导致的冲突和错误。
一、什么是生产者消费者模型
一概念
生产者消费者模型是操作系统中一种重要的模型主要描述的是等待和通知的机制。其是通过一个容器来解决生产者与消费者的强耦合关系生产者与消费者之间不直接进行通讯而是利用容器来进行通讯。
在这个模型中生产者负责生产数据或物品并将其放入一个共享的缓冲区或仓库中消费者则从该缓冲区或仓库中取出数据或物品进行消费。
这个模型的关键在于平衡生产者和消费者的处理能力确保数据或物品的流畅生产和消费。 我们再借助一个例子进行深刻理解
超市的工作模式
超市从工厂进货工厂需要向超市提供商品顾客在超市选购超市需要向顾客提供商品
超市盈利的关键在于 平衡顾客与工厂间的供需关系
简单来说就是要做到 顾客可以在超市买到想要购买的商品工厂也能同超市完成足量的需求订单满足条件后超市就可以盈利了超市盈利的同时可以给供给双方带来便利。
顾客不需要跑到工厂购买商品工厂也不需要将商品配送到顾客手中
通过引入超市这个中介解决了生产者与消费者间的强耦合关系。
得益于 超市 做缓冲区整个 生产消费 的过程十分高效即便顾客没有在超市中找到想要的商品也可借助超市之手向工厂进行反映从而生产对应的商品即 允许生产消费步调不一。
现实中的 超市工作模式 就是一个生动形象的 生产者消费者模型
顾客 - 消费者工厂 - 生产者超市 - 交易场所容器
生产者消费者模型的本质忙闲不均
其中的 交易场所 是进行 生产消费 的容器起到了平衡生产和消费速率的作用。通常是一种特定的 缓冲区常见的有 阻塞队列 和 环形队列。
超市不可能只面向一个顾客及一个工厂交易场所 也是如此会被多个 生产者消费者多个线程 看到也就是说 交易场所 注定是一个共享资源在多线程环境中需要保证 共享资源被多线程并发访问时的安全。 二生产者消费者之间的关系
在模型中生产者和生产者之间、消费者和消费者之间以及生产者和消费者之间都存在互斥关系或者同步关系。 在生产者消费者模型中无论是生产者还是消费者他们都需要面向共享的缓冲区或仓库进行操作。当多个生产者或消费者试图同时访问和操作这个共享资源时为了避免数据冲突或不一致就需要使用互斥机制来确保在任意时刻只有一个生产者或消费者能够访问该资源。同时生产者和消费者之间还需要保持同步以确保当缓冲区满时生产者停止生产并等待当缓冲区空时消费者停止消费并等待。 回归现实中多个工厂供应同一种商品时为了抢占更多的市场总会通过一些促销手段来排除竞品比如经典的 泡面巨头 康师傅与统一 的大战市场超市中的货架位置是有限的在工厂竞争之下势必有一家工厂失去市场因此可以得出 生产者与生产者之前需要维持 互斥 关系。
生产者与生产者互斥 对于生产者之间他们可能会同时尝试将产品放入缓冲区。如果缓冲区容量有限多个生产者之间的竞争就会导致互斥关系的产生。 张三和李四在超市偶遇俩人同时看中了 某个新品剃须刀但最近超市货源紧张这个商品仅有一份张三李四互不谦让都在奋力争夺这个商品显然当商品只有一份时 消费者与消费者之间也需要维护 互斥 关系。
消费者与消费者互斥 同样对于消费者之间当缓冲区中的产品数量有限时多个消费者可能会同时尝试从中取出产品这也产生了互斥关系。 某天张三又来到了超市打算购买他最喜欢的 老坛酸菜牛肉面但好巧不巧超市的最后一桶 老坛酸菜牛肉面 已经售出张三只能通知超市进行备货超市老板记下了这个需求张三失落的回了家刚到家张三的肚子就饿的咕咕叫十分想念 老坛酸菜牛肉面于是火速赶往超市看看超市是否有货答案是没有。张三是一个执着的人总是反复跑到超市查看是否有货导致张三这一天什么事也干不成只想着自己的 老坛酸菜牛肉面其实张三不必这样做只需要在第一次告诉超市老板自己的需求并添加老板的联系方式让老板在商品备货完成后通知张三前来购买将商品信息同步给消费者这样可以避免张三陷入循环。同理对于工厂来说超市老板也应该添加工厂负责人的联系方式将商品信息同步给生产者也就是说 生产者与消费者之间存在 同步 关系除此之外在超市备货期间张三是不能来购买的即 生产者与消费者之间还存在 互斥 关系。
生产者与消费者同步、互斥 首先互斥关系主要源于生产者和消费者都需要访问共享的资源即缓冲区。其次同步关系则是基于生产者和消费者之间的依赖关系。生产者生产产品并将其放入缓冲区消费者从缓冲区取出产品进行消费。这意味着消费者必须等待生产者生产并放入产品后才能进行消费即生产者必须先于消费者进行。这种先后顺序的依赖关系构成了同步关系。 注意 生产者与消费者之间的互斥 关系不是必备的目的是为了让 生产、消费 之间存在顺序。
生产者消费者模型 是一个存在 生产者、消费者、交易场所 三个条件以及不同角色间的 同步、互斥 关系的高效模型。
三生产者消费者模型特点
生产者消费者模型 的最根本特点是 321原则 3 种关系 - 生产者与生产者互斥 - 消费者与消费者互斥 - 生产者与消费者互斥与同步 2 种角色 - 生产者 - 消费者 1 个交易场所 - 通常是一个特定的缓冲区阻塞队列、环形队列 注321 原则并非众所周知的概念仅供辅助记忆 生产者消费者模型 的特点。
任何 生产者消费者模型 都离不开这些必备特点 生产者与消费者间的同步关系 生产者不断生产交易场所堆满商品后需要通知消费者进行消费。消费者不断消费交易场所为空时需要通知生产者进行生产。 通知线程需要用到条件变量即维护 同步 关系。 其实之前在中学习到的 Linux 进程间通信 中的 管道 本质上就是一个天然的 生产者消费者模型因为它允许多个进程同时访问并且不会出现问题意味着它维护好了 互斥、同步 关系当写端写满管道时无法再写通知读端进行读取当管道为空时无法读取通知写端写入数据。
四生产者消费者模型的优点 解耦合通过引入缓冲区作为中介生产者消费者模型降低了生产者和消费者之间的耦合度。这使得生产者和消费者可以独立地进行设计和优化提高了系统的灵活性和可扩展性。并发性生产者消费者模型允许生产者和消费者作为两个独立的并发主体运行互不干扰。这种并发性可以充分利用系统资源提高系统的吞吐量和效率。平衡性当生产者和消费者的处理速度不一致时缓冲区可以起到平衡作用。如果生产者速度快于消费者缓冲区可以暂存多余的产品反之如果消费者速度快于生产者缓冲区可以提供产品以供消费。这种平衡性确保了系统的稳定性和可靠性。可扩展性生产者消费者模型可以方便地扩展生产者和消费者的数量。当需要增加生产或消费能力时只需添加相应的生产者或消费者即可无需对整个系统进行大规模修改。简化编程通过使用生产者消费者模型程序员可以更加专注于生产者和消费者的核心逻辑而无需过多关注它们之间的同步和互斥问题。这有助于简化编程工作降低出错的可能性。 生产者消费者模型 为何高效
生产者、消费者 可以在同一个交易场所中进行操作。生产者在生产时无需关注消费者的状态只需关注交易场所中是否有空闲位置。消费者在消费时无需关注生产者的状态只需关注交易场所中是否有就绪数据。可以根据不同的策略调整生产者于与消费者间的协同关系。
生产者消费者模型 可以根据供需关系灵活调整策略做到 忙闲不均。
生产者消费者模型通过解耦合、并发性、平衡性、可扩展性和简化编程等优点提高了系统的性能、稳定性和可靠性降低了开发和维护的复杂度。这使得它在多线程编程、并发控制、资源管理等领域得到了广泛应用。
二、基于阻塞队列实现生产者消费者模型
一阻塞队列
阻塞队列 Blocking Queue 是一种特殊的队列作为队列家族的一员它具备 先进先出 FIFO 的基本特性与普通队列不同的是 阻塞队列 的大小是固定的也就说它存在 容量 的概念。
阻塞队列可以为空也可以为满
将其带入 「生产者消费者模型」 中入队 就是 生产商品而 出队 则是 消费商品
阻塞队列为满时无法入队 - 无法生产阻塞阻塞队列为空时无法出队 - 无法消费阻塞 二单生产单消费模型
首先来实现最简单的 单生产单消费者模型首先搭好 阻塞队列类 的框架 创建 BlockingQueue.hpp 头文件 #pragma once#include queue
#include mutex
#include pthread.h
#include iostream#define BQ_SIZE 5templateclass T
class MyBlockQueue
{
public:MyBlockQueue(size_t cap BQ_SIZE):_cap(cap){// 初始化锁与条件变量pthread_mutex_init(_mtx, nullptr);pthread_cond_init(_cond, nullptr);}~MyBlockQueue(){// 销毁锁与条件变量pthread_mutex_destroy(_mtx);pthread_cond_destroy(_cond);}// 生产数据入队void push(const T indata);// 消费数据出队void pop(T *outdata);private:// 判断队列是否为满bool Isfull();// 判断队列是否为空bool Isempty();
private:std::queueT _queue;size_t _cap; // 阻塞队列的容量pthread_mutex_t _mtx;pthread_cond_t _cond;
}
如何判断阻塞队列是否为空判断 queue 是否为空 如何判断阻塞队列是否为满判断 queue 的大小是否为 _cap
使用 互斥锁 条件变量 实现互斥与同步获得工具框架后接下来搭建 生产与消费 的代码因为是 单生产、单消费只需要手动创建两个线程即可
#include unistd.h
#include iostream
#include BlockingQueue.hppvoid *Producer(void *args)
{BlockingQueueint *bq static_castBlockingQueueint*(args);while(true){// 1. 生产商品(通过某种渠道获取数据)// ...// 2. 将物品push到阻塞队列// bq-push(data);}pthread_exit((void*)0);
}void *Consumer(void *args)
{BlockingQueueint *bq static_castBlockingQueueint*(args);while(true){// 1. 从阻塞队列获取商品// b1-pop(data);// 2. 消费商品(结合某种具体业务进行处理)// ...}pthread_exit((void)*0);
}int main()
{BlockingQueueint *bq new BlockingQueueint;pthread_t pro, con;// 生产者消费者线程pthread_create(pro, nullptr, Producer, bq);pthread_create(con, nullptr, Consumer, bq);pthread_join(pro, nullptr);pthread_join(con, nullptr);delete bq;return 0;
}
注意
生产者、消费者需要看到同一个阻塞队列可以通过线程的回调函数参数进行传递其他具体实现细节仍需填充 以上就是 「生产者消费者模型」 所需要的大体框架具体细节实现可以接着往下看不过在这之前需要先理解 为什么生产、为什么消费
数据就像能量一样不会凭空产生也不会凭空消失因此生产者线程在生产 商品数据 时一定是从某种渠道获取的比如客户发出的 HTTP请求、程序猿发出的 SQL 语句、涉及复杂运算的计算任务等总之 生产者需要先获取数据才能将其放入阻塞队列中等待处理。
同理消费者线程在获取 商品数据 后也需要结合业务逻辑做出不同的动作比如根据 HTTP 请求进行响应、返回 SQL 查询结果、返回计算结果等一句话总结生产者生产商品、消费者消费商品都是需要时间的并非单纯地对阻塞队列进行操作。
这是一个十分重要的概念它能帮助我们正确看待生产者、消费者的作用这是被大多数教材忽略的重要概念。 补充细节 BlockQueue 的成员变量问题互斥锁、条件变量如何分配 在 生产者消费者模型 中有 满、空 两个条件这两个条件是 绝对互斥 的不可能同时满足 生产者关心 是否为满消费者 关心是否为空两者关注的点不一样也就是说不能只使用一个条件变量来控制两个条件而是需要 一个生产者条件变量、一个消费者条件变量 MyBlockQueue(size_t cap BQ_SIZE):_cap(cap){// 初始化锁与条件变量pthread_mutex_init(_mtx, nullptr);pthread_cond_init(_pro_cond, nullptr);pthread_cond_init(_con_cond, nullptr);}~MyBlockQueue(){// 销毁锁与条件变量pthread_mutex_destroy(_mtx);pthread_cond_destroy(_pro_cond);pthread_cond_destroy(_con_cond);}std::queueT _queue;size_t _cap; // 阻塞队列的容量pthread_mutex_t _mtx;pthread_cond_t _pro_cond;pthread_cond_t _con_cond;
创建两个条件变量是阻塞队列的精髓之一。
条件变量需要两个锁是否也需要两把呢答案是不需要因为无论是生产者 还是 消费者 它们需要看到同一个阻塞队列因此使用一把互斥锁进行保护就行了。 BlockQueue 的 push、pop 函数简单实现 首先来看看 push函数的简单实现push函数的功能就是将传入的数据 indata 添加到 阻塞队列 中因为 阻塞队列 是一个 临界资源在访问前必须加锁 // 生产数据入队void push(const T indata){// 加锁解锁pthread_mutex_lock(mtx);_queue.push(indata);pthread_mutex_unlock(mtx);}
这样写只是对 普通临界资源 的访问但这里是 阻塞队列插入数据的前提是 有空间不然是要被 阻塞 的所以我们在进行数据插入前应该先判断条件是否满足不满足就得 阻塞等待条件满足。 // 判断队列是否为满bool Isfull(){return _queue.size() _cap;}// 生产数据入队void push(const T indata){// 加锁解锁pthread_mutex_lock(mtx);// 容量满了就等待if(Isfull()){pthread_cond_wait(_pro_cond, mtx);}_queue.push(indata);pthread_mutex_unlock(mtx);}当条件不满足时生产者线程进入等待状态
这里可以解释一下为什么需要给 pthread_cond_wait 函数传入互斥锁
首先要明白判断条件是否满足是在临界区内进行的也就是说当前线程持有锁。当条件不满足时当前线程进入条件等待状态也就意味着它现在无法向后运行将锁释放。此时其他线程就得不到锁资源了程序就会进入了死锁状态。解决方法就是 将锁资源传递给 pthread_cond_wait 函数使其拥有 释放锁、获取锁 的能力这样就能保证不会出现 死锁。
这就是 同步 能解决 死锁 问题的关键因为它可以主动让出 锁资源。
过了一段时间当条件满足时消费者已经消费数据了代码从 pthread_cond_wait 函数之后继续运行生产者可以正常进行生产可以确保一定有空位一切看起来似乎很和谐但此时存在一个致命问题如果是消费者先阻塞阻塞队列为空消费不了数据生产者正常进行生产当生产满后生产者也进入了阻塞状态此时就尴尬了彼此都陷入了阻塞等待状态。
造成此问题的根本原因是生产者在生产结束后没有唤醒消费者让其进行正常消费所以在 生产完成 后需要唤醒消费者进行消费 // 生产数据入队void push(const T indata){// 加锁解锁pthread_mutex_lock(mtx);// 容量满了就等待if(Isfull()){pthread_cond_wait(_pro_cond, mtx);}_queue.push(indata);// 可以加一些策略比如生产了一半就唤醒消费者pthread_cond_signal(_con_cond);pthread_mutex_unlock(mtx);}
注意 生产者唤醒的是消费者也就是需要传递 _con_cond 当消费者没有 wait 等待生产者仍然进行唤醒时是否会出现问题 答案是不会唤醒一个没有 wait 的线程是不会有影响的同时因为唤醒线程这个操作不需要加锁保护本身就持有锁资源句柄pthread_cond_signal 函数可以放到 pthread_mutex_unlock 语句之后。 有了 push 的实现经验后pop 的实现就很简单了照葫芦画瓢简单实现如下 // 判断队列是否为空bool Isempty(){return _queue.empty();}// 消费数据出队void pop(T *outdata){// 加锁解锁pthread_mutex_lock(_mtx);// 容量为空就等待if(Isempty()){pthread_cond_wait(_con_cond, _mtx);}*outdata _queue.front();_queue.pop();// 可以加一些策略比如消费了一半就唤醒生产者pthread_cond_signal(_pro_cond);pthread_mutex_unlock(_mtx);}注意 消费者唤醒的是生产者需要传递 _pro_cond
单生产、单消费场景中的 push、pop 可以这样写其他场景就需要稍微进行修改。 cp.cc 的使用填充 因为这里没有具体的业务场景所以我们就使用 生成一个随机数 作为待插入的数据打印数字 作为获取数据后的操作
#include iostream
#include unistd.h
#include time.h
#include BlockingQueue.hpp
using namespace std;void *Producer(void *args)
{MyBlockingQueueint *bq static_castMyBlockingQueueint*(args);while(true){// 1. 生产商品(通过某种渠道获取数据)int num rand() % 10;// 2. 将物品push到阻塞队列bq-push(num);std::cout Producer 生产了一个数据: num std::endl std::endl;}pthread_exit((void*)0);
}void *Consumer(void *args)
{MyBlockingQueueint *bq static_castMyBlockingQueueint*(args);while(true){// 1. 从阻塞队列获取商品int num 0;bq-pop(num);// 2. 消费商品(结合某种具体业务进行处理)std::cout Consumer 消费了一个数据: num std::endl std::endl;}pthread_exit((void*)0);
}int main()
{MyBlockingQueueint *bq new MyBlockingQueueint();// 随机数种子srand((size_t)time(nullptr));pthread_t pro, con;// 生产者消费者线程pthread_create(pro, nullptr, Producer, bq);pthread_create(con, nullptr, Consumer, bq);pthread_join(pro, nullptr);pthread_join(con, nullptr);delete bq;return 0;
}
此时可以编译并运行程序可以看到 生产者疯狂生产消费者疯狂消费 这样不容易观察到 阻塞队列 的特点我们可以通过 睡眠 的方式模拟效果
策略1消费者每隔一秒消费一次生产者疯狂生产
应该观察到的现象是 生产者很快就把阻塞队列填满了只能阻塞等待1 秒之后消费者进行消费消费结束后唤醒生产者两者进入同步状态生产者生产一个数据、消费者消费一个数据
void *Consumer(void *args)
{MyBlockingQueueint *bq static_castMyBlockingQueueint*(args);while(true){// 消费者每隔一秒进行一次消费sleep(1);// 1. 从阻塞队列获取商品int num 0;bq-pop(num);// 2. 消费商品(结合某种具体业务进行处理)std::cout Consumer 消费了一个数据: num std::endl std::endl;}pthread_exit((void*)0);
} 策略2生产者每隔一秒生产一次消费者不断消费
预期结果为 刚开始阻塞队列为空消费者无法进行消费只能阻塞等待一秒后生产者生产了一个数据并立即通知消费者进行消费两者协同工作消费者消费的就是生产者刚刚生产的数据
void *Producer(void *args)
{MyBlockingQueueint *bq static_castMyBlockingQueueint*(args);while(true){// 生产者每隔一秒进行一次生产sleep(1);// 1. 生产商品(通过某种渠道获取数据)int num rand() % 10;// 2. 将物品push到阻塞队列bq-push(num);std::cout Producer 生产了一个数据: num std::endl std::endl;}pthread_exit((void*)0);
}
发现打印时会串行因为屏幕也是文件也是共享资源。 所以我们在Producer和Consumer的打印加上锁即可
// 类内再加一个get函数
pthread_mutex_t* getMutex() { return _mtx; }void *Producer(void *args)
{// ...pthread_mutex_lock(bq-getMutex());std::cout Producer 生产了一个数据: num std::endl; std::cout std::endl; pthread_mutex_unlock(bq-getMutex()); // ...
}void *Consumer(void *args)
{// ...pthread_mutex_lock(bq-getMutex());std::cout Consumer 消费了一个对象并获得结果: ret std::endl;std::cout std::endl;pthread_mutex_unlock(bq-getMutex());// ...
}
生产者生产一个数据就立马被消费者消费了 两种策略都符合预期证明当前的 生产者消费者模型 是可用的单生产单消费场景中。
三多生产多消费模型
在上面的 单生产者单消费者模型中存在一些细节问题
细节1只有当条件满足时才能进行 生产/消费
之前单纯使用一个 if 进行判断过于草率
理由如下
pthread_cond_wait 函数可能调用失败误唤醒、伪唤醒此时如果是 if 就会向后继续运行导致在条件不满足的时候进行了 生产/消费。在多线程场景中可能会使用 pthread_cond_broadcast 唤醒所有等待线程如果在只生产了一个数据的情况下唤醒所有线程会导致只有一个线程进行了合法操作其他线程都是非法操作了。 关于当前代码使用 if 判读在多线程环境中广播 pthread_cond_broadcast 的理解 这就好比食堂里有很多人等待出餐当阿姨仅做好一份饭后就通知所有同学过来取餐直接导致其他同学白跑一趟带入程序中直接影响就是 生产者/消费者 在 队列满/队列空 的情况下仍然进行了 数据生产/数据消费。 所以需要把条件判断改成 while直到条件满足后才向后运行 // 生产数据入队void push(const T indata){// ...// 容量满了就等待while(Isfull()){pthread_cond_wait(_pro_cond, _mtx);}// ...}// 消费数据出队void pop(T *outdata){// ...// 容量为空就等待while(Isempty()){pthread_cond_wait(_con_cond, _mtx);}// ...} 细节2生产者消费者模型的高效体现在 解耦
生产、消费 的过程是加锁的、串行化执行可能有的人无法 get 到 生产者消费者模型 的高效这是因为没有对 生产者消费者模型 进行一个全面的理解。
单纯的向队列中放数据、从队列中取数据本身效率就很高但 生产者从某种渠道获取数据、消费者获取数据后进行某种业务处理这是效率比较低的操作生产者消费者模型 做到了这两点。 1. 消费者在进行业务处理时生产者可以直接向队列中 push 数据 比如 消费者 在获取到数据后需要进行某种高强度的运算当然这个操作与 生产者 是没有任何关系的得益于 阻塞队列 作为缓冲区生产者 可以在 消费者 进行运算时 push 数据避免生产者等待消费者处理完才能继续生产的情况。。
这就好比你买了一桶泡面回家吃厂商并不需要关心你吃完没有直接正常向超市供货就行了 2. 生产者在进行数据生产时消费者可以直接向队列中 pop 数据 同上消费者 不需要关心 生产者 的状态只要 阻塞队列 中还有数据正常 pop 获取就行了避免空闲等待也就是说你在超市购物时无需关心工厂的生产情况因为这与你无关
一句话总结生产者不必关心消费者的消费情况消费者也不需要关心生产者的生产情况
而这就是生产者消费者模型高效的体现也是对模型的全面理解。 这有点像 冯·诺依曼体系结构 中的 内存扮演着中间商的角色使得 CPU 能和 外设 协同高效工作。 细节3阻塞队列中不止能放 int还能放对象 创建 Task.hpp 头文件 #pragma once#include stringtemplateclass T
class Task
{
public:Task(T x 0, T y 0, char op ):_x(x), _y(y), _op(op), _result(0), _err(0){}// 重载运算操作void operator()(){// 加减乘除switch(_op){case : _result _x _y;break;case -: _result _x - _y;break;case *: _result _x * _y;break;case /: _result _x / _y;break;case %:if(_y 0) _err -2;else _result _x % _y;break;default:_err -3;break;}}// 获取计算结果std::string getResult(){std::string ret std::to_string(_x) _op std::to_string(_y);if(_err){ret error ;// 判断是 / 错误还是 % 错误if(_err -1) ret [-1] \t / 0 引发了错误;if(_err -2) ret [-2] \t % 0 引发了错误;else ret [-3] \t 不合法的操作符只能为 [-*/%];}else{ret std::to_string(_result);}return ret;}
private:T _x;T _y;char _op; // 运算符T _result; // 结果int _err; // 错误标识
};得到这样一个任务类后就可以更改 cp.cc 中生产者、消费者线程的处理逻辑了
这里就简单修改随机获取两个数和一个运算符并计算出结果
#include iostream
#include unistd.h
#include time.h
#include BlockingQueue.hpp
#include Task.hppvoid *Producer(void *args)
{MyBlockingQueueTaskint *bq static_castMyBlockingQueueTaskint*(args);// 运算符集std::string opers -*/%;while(true){// 生产者每隔一秒进行一次生产sleep(1); // 1. 生产商品(通过某种渠道获取数据)// 随机获取两个数可以改为输入int x rand() % 100;int y rand() % 100;// 随机获取一种运算符char ops[] {, -, *, /, %};char op opers[rand() % opers.size()];// 2.将商品推送至阻塞队列中// 创建匿名对象并 push 到阻塞队列中bq-push(Taskint(x, y, op));std::cout Producer 生产了: x y op 构成的对象 std::endl;std::cout ---------------------------- std::endl; }pthread_exit((void*)0);
}void *Consumer(void *args)
{MyBlockingQueueTaskint *bq static_castMyBlockingQueueTaskint*(args);while(true){// 1.从阻塞队列中获取商品Taskint task;bq-pop(task);// 进行业务处理task();std::string ret task.getResult();// 2. 消费商品(结合某种具体业务进行处理)std::cout Consumer 消费了一个对象并获得结果: ret std::endl;std::cout std::endl;}pthread_exit((void*)0);
}int main()
{MyBlockingQueueTaskint *bq new MyBlockingQueueTaskint;// 随机数种子srand((size_t)time(nullptr));pthread_t pro, con;// 生产者消费者线程pthread_create(pro, nullptr, Producer, bq);pthread_create(con, nullptr, Consumer, bq);pthread_join(pro, nullptr);pthread_join(con, nullptr);delete bq;return 0;
}
为了避免打印到显示器时的格式错乱问题屏幕也是临界资源理论上也需要加锁保护这里让 生产者 每隔一秒生产一次进而控制 消费速度。 这里故意把 _y 指定为 0查看运算出错的情况 这里只是放了一个简单计算的任务我们实际还可以放入更复杂的任务比如 网络请求、SQL 查询、并行 IO。尤其是 IO使用 生产者消费者模型 可以大大提高效率包括后面的 多路转接也可以接入 生产者消费者模型 来提高效率。 现在可以尝试修改代码以适应 多生产多消费场景 了在原有代码的基础上直接多创建几个线程
int main()
{MyBlockingQueueTaskint *bq new MyBlockingQueueTaskint;// 随机数种子srand((size_t)time(nullptr));// 创建多个线程生产者、消费者pthread_t pro[2], con[3];for(int i 0; i 2; i)pthread_create(proi, nullptr, Producer, bq);for(int i 0; i 3; i)pthread_create(coni, nullptr, Consumer, bq);for(int i 0; i 2; i)pthread_join(pro[i], nullptr);for(int i 0; i 3; i)pthread_join(con[i], nullptr);delete bq;return 0;
}
运行结果如下可以看到确实有多个线程在运行运行结果也没有问题 为什么当前代码设计中不需要修改就能适用于 多生产多消费场景 呢
原因有两点
生产者、消费者都是在对同一个 _queue 操作用一把锁保护一个临界资源足够了。当前的 _queue 始终是被当作一个整体使用的无需再增加锁区分。
其实分别给 生产者和消费者 各配一把锁也是可以的但在当前代码设计中使用同一个 _queue 完全没有必要 三、POSIX信号量
互斥、同步 不只能通过 互斥锁、条件变量 实现还能通过 信号量 sem、互斥锁 实现出自 POSIX 标准。
「信号量」 的本质就是一个 计数器
申请到资源计数器 --P 操作释放完资源计数器 V 操作
信号量 的 PV 操作都是原子的假设将 信号量 的值设为 1用来表示 生产者消费者模型 中 阻塞队列 _queue 的使用情况
当 sem 值为 1 时线程可以进行 「生产 / 消费」sem --当 sem 值为 0 时线程无法进行 「生产 / 消费」只能阻塞等待
此时的 信号量 只有两种状态1、0可以实现类似 互斥锁 的效果即实现 线程互斥像这种只有两种状态的信号量称为 二元信号量。
信号量 不止可以用于 互斥它的主要目的是 描述临界资源中的资源数目比如我们可以把 阻塞队列 切割成 N 份初始化 信号量 的值为 N当某一份资源就绪时sem--资源被释放后sem如此一来可以像 条件变量 一样实现 同步
当 sem N 时阻塞队列已经空了消费者无法消费。当 sem 0 时阻塞队列已经满了生产者无法生产。
用来实现 互斥、同步 的信号量称为 多元信号量
综上所述在使用 多元信号量 访问资源时需要先申请 信号量只有申请成功了才能进行资源访问否则会进入阻塞等待即当前资源不可用 在实现 互斥、同步 时该如何选择结合业务场景进行分析如果待操作的共享资源是一个整体比较适合使用 互斥锁条件变量 的方案但如果共享资源是多份资源使用 信号量 就比较方便。 其实 信号量 的工作机制类似于 买电影票是一种 预订机制只要你买到票了即使你晚点到达电影院你的位置也始终可用买到票的本质是将对应的座位进行了预订。
对于 信号量 的第一层理解申请信号量实际是一种资源预订机制
如果将 信号量 实际带入我们之前写的 生产者消费者模型 代码中是不需要进行资源条件判断的因为 信号量 本身就已经是资源的计数器了。
对于 信号量 的第二层理解使用信号量时就已经把资源条件判断转化成了信号量的申请行为
比如可以直接这样写
// 生产数据入队
void Push(const T inData)
{// 申请信号量 P操作// ..._queue.push(inData);// ...// 释放信号量 V操作
}一信号量相关操作
有了之前 互斥锁、条件变量 的使用基础信号量 的接口学习是释放简单的依旧是只有四个接口初始化、销毁、申请、释放 初始化信号量 #include semaphore.hint sem_init(sem_t *sem, int pshared, unsigned int value);
参数1需要初始化的信号量sem_t 实际就是一个联合体里面包含了一个 char 数组以及一个 long int 成员
typedef union
{char __size[__SIZEOF_SEM_T];long int __align;
} sem_t;参数2表示当前信号量的共享状态传递 0 表示线程间共享传递 非0 表示进程间共享
参数3信号量的初始值可以设置为双元或多元信号量
返回值初始化成功返回 0失败返回 -1并设置错误码 销毁信号量 int sem_destroy(sem_t *sem);参数待销毁的信号量
返回值成功 0失败 -1 并设置错误码 申请信号量等待信号量主要使用 sem_wait int sem_wait(sem_t *sem);int sem_trywait(sem_t *sem);int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);参数表示从哪个信号量中申请
返回值成功 0失败 -1 并设置错误码
其他两种申请方式分别是尝试申请如果没有申请到资源就会放弃申请每隔一段时间进行申请即 timeout。 释放信号量发布信号量 int sem_post(sem_t *sem);参数将资源释放到哪个信号量中
返回值成功 0失败 -1 并设置错误码
这批接口参数、返回值含义基本都相同非常容易上手接下来直接用信号量实现 生产者消费者模型。
五、基于环形队列实现生产者消费者模型
一环形队列
生产者消费者模型 中的交易场所是可更换的不仅可以使用 阻塞队列还可以使用 环形队列所谓的 环形队列 并非 队列而是用数组模拟实现的 “队列” 并且它的 判空、判满 比较特殊。
如何让 环形队列 “转” 起来
可以通过取模的方式可以重复获取一段区间值确定下标
环形队列 如何判断当前为满、为空
策略一多开一个空间head、tail 位于同一块空间中时表示当前队列为空在进行插入、获取数据时都是对下一块空间中的数据进行操作因为多开了一块空间当待生产的数据落在 head 指向的空间时就表示已经满了
策略二参考阻塞队列搞一个计数器当计数器的值为 0 时表示当前为空当计数器的值为容量时表示队列为满
这两种策略都可以确保 环形队列 正确判空和判满至于这里肯定是选择策略二因为 信号量 本身就是一个天然的计数器。
在 环形队列 中生产者 和 消费者 关心的资源不一样生产者只关心是否有空间放数据消费者只关心是否能从空间中取到数据。
除非两者相遇其他情况下生产者、消费者可以并发运行同时访问环形队列。
两者错位时正常进行生产消费就好了但两者相遇时需要特殊处理也就是处理 空、满 两种情况这就是 环形队列 的运转模式。
这里可以引入一个例子来辅助理解 环形队列 的运转模式
假设存在一个大圆桌上面摆放了一圈空盘子可以往上面放苹果也可以取上面的苹果。张三和李四打算展开一场 苹果追逐赛张三作为 追逐方目标是移动并获取盘子中的苹果李四作为 被追逐方目标是往盘子中放苹果并向下一个空盘子移动
注意这里的移动指顺时针移动一次只能走一格不能跳格这是游戏核心规则。
游戏基本规则
当两者相遇且圆桌中没有苹果时被追逐方李四先跑对方张三阻塞当两者相遇且圆桌中全是苹果时追逐方张三先跑对方李四阻塞被追逐方李四不能套圈追逐方张三同时追逐方张三也不能超过被追逐方李四
现在游戏开始张三和李四处于同一块空间中起点此时两人处于一种特殊情况中不能同时进行 苹果拾取/苹果放置由于是刚开始作为 被追逐方 的李四理应先走否则两者就都阻塞了张三追上李四时的情况与刚开始的情况一致。
所以可以得出结论环形队列为空时生产者需要先生产数据消费者阻塞
李四先跑边跑边放苹果此时因为张三还没有追上李四所以张三也是边跑边拾取苹果两者展开了激烈的追逐赛高效率
在追逐过程中张三李四都能同时对圆桌中的格子进行操作这是非常高效的环形队列不为空、不为满时生产者、消费者可以同时进行并发操作
游戏进行到白热化阶段张三一不注意摔了一跤导致拾取苹果的速度不断减慢李四见状火力全开不断放置苹果很快张三就被李四追上了此时场上已经摆满了苹果规定一个盘子只能放置一个苹果李四无法在放置苹果只能阻塞等待张三进行苹果拾取
场上摆满苹果的情况对应着 环形队列为满的情况生产者不能再生产消费者需要进行消费
ok游戏到这里就可以结束了因为已经足够总结出 环形队列 的运作模式了
被追逐方李四 - 生产者追逐方张三 - 消费者大圆桌 - 环形队列空盘 - 无数据可生产苹果 - 有数据可消费
运作模式
环形队列为空时消费者阻塞只能由生产者进行生产生产完商品后消费者可以消费商品环形队列为满时生产者阻塞只能由消费者进行消费消费完商品后生产者可以生产商品其他情况生产者、消费者并发运行各干各的事互不影响
张三和李四也就只能在 满、空 时相遇了 将 环形队列 的运行模式带入 生产者消费者模型
可以使用 信号量 标识资源的使用情况但生产者和消费者关注的资源并不相同所以需要使用两个 信号量 来进行操作
生产者信号量标识当前有多少可用空间消费者信号量标识当前有多少数据
如果说搞两个 条件变量 是 阻塞队列 的精髓那么搞两个 信号量 就是 环形队列 的精髓显然刚开始的时候生产者信号量初始值为环形队列的大小消费者信号量初始值为 0
无论是生产者还是消费者只有申请到自己的 信号量 资源后才进行 生产 / 消费
比如上图中的 pro_sem 就表示 生产者还可以进行 3 次生产con_sem 表示 消费者还可以消费 5 次。
生产者、消费者对于 信号量 的申请可以这样理解
// 生产者
void Producer()
{// 申请信号量空位 - 1sem_wait(pro_sem);// 生产商品// ...// 释放信号量商品 1sem_post(con_sem);
}// 消费者
void Consumer()
{// 申请信号量商品 - 1sem_wait(con_sem);// 消费商品// ...// 释放信号量空位 1sem_post(pro_sem);
}生产者和消费者指向同一个位置时保证线程安全其他情况保证并发度。
二单生产单消费模型 创建 RingQueue.hpp 头文件 #pragma once#include vector
#include semaphore.h#define RQ_CAP 5templateclass T
class MyRingQueue
{
private:void P(sem_t *sem){sem_wait(sem);}void V(sem_t *sem){sem_post(sem);}
public:MyRingQueue(size_t cap RQ_CAP):_ring(cap), _cap(cap), _con_step(0), _pro_step(0){// 初始化信号量sem_init(_pro_sem, 0, _cap);sem_init(_con_sem, 0, 0);}~MyRingQueue(){// 销毁信号量sem_destroy(_pro_sem);sem_destroy(_con_sem);}// 生产商品void push(const T indata){// 申请信号量P(_pro_sem);// 生产_ring[_pro_step] indata;_pro_step % _cap;// 释放信号量V(_con_sem);}// 消费商品void pop(T *outdata){// 申请信号量P(_con_sem);// 消费*outdata _ring[_con_step];_con_step % _cap;// 释放信号量V(_pro_sem);}pthread_mutex_t* getMutex() { return _mtx;}
private:std::vectorT _ring; // 数组模拟环形队列size_t _cap; // 队列容量sem_t _pro_sem; // 生产者信号量sem_t _con_sem; // 消费者信号量size_t _pro_step; // 生产者下标size_t _con_step; // 消费者下标pthread_mutex_t _mtx;
};
细节
生产者的信号量初始值为 RQ_CAP消费者的信号量初始值为 0生产者、消费者的起始下标都为 0
在没有 互斥锁 的情况下除去打印时需要互斥锁是如何 确保生产者与消费者间的互斥关系的 通过两个信号量当两个信号量都不为 0 时双方可以并发操作这是 环形队列 最大的特点当 生产者信号量为 0 时生产者陷入阻塞等待等待消费者消费同理当 消费者信号量为 0 时消费者也会阻塞住在这里阻塞就是 互斥 的体现。当对方完成 生产 / 消费 后自己会解除阻塞状态而这就是 同步 创建 Main.cc 源文件 #include iostream
#include unistd.h
#include time.h
#include pthread.h
#include RingQueue.hppusing namespace std;void *Producer(void *args)
{MyRingQueueint *rq static_castMyRingQueueint*(args);while(true){// 每隔一秒生产sleep(1);// 1. 生产商品(通过某种渠道获取数据)int num rand() % 10 1; // 1-10的数// 2. 将商品push至队列中rq-push(num);pthread_mutex_lock(rq-getMutex());std::cout Producer 生产了一个数据: num std::endl;std::cout ------------------------ std::endl;pthread_mutex_unlock(rq-getMutex());}pthread_exit((void*)0);
}void *Consumer(void *args)
{MyRingQueueint *rq static_castMyRingQueueint*(args);while(true){// 每隔一秒消费// sleep(1);// 1. 从队列中获取数据int num;rq-pop(num);// 2. 消费商品(结合某种具体业务进行处理)pthread_mutex_lock(rq-getMutex());std::cout Consumer 消费了一个数据: num std::endl;std::cout ------------------------ std::endl;pthread_mutex_unlock(rq-getMutex());}
}int main()
{// 种子srand((size_t)time(nullptr));MyRingQueueint *rq new MyRingQueueint();// 创建两个线程 生产者/消费者pthread_t pro, con;pthread_create(pro, nullptr, Producer, rq);pthread_create(con, nullptr, Consumer, rq);pthread_join(pro, nullptr);pthread_join(con, nullptr);delete rq;return 0;
}
编译并运行程序。为了使结果更加清晰分别展示 生产者每隔一秒生产一次、消费者每隔一秒消费一次的结果。
生产者每隔一秒生产一次的场景 消费者每隔一秒消费一次 的场景 这里的运行结果与 阻塞队列 那边的一模一样证明当前的 生产者消费者模型 没有问题单生产单消费场景中 。
注如果想要提高并发度可以增大环形队列的容量。
三多生产多消费模型
环形队列 中可不止能放整数还能放 Task 任务我们可以把之前的 Task.hpp 引入重新测试 环形队列 创建 cp.cc在之前的基础上略微修改 #include iostream
#include string
#include unistd.h
#include time.h
#include pthread.h
#include RingQueue.hppusing namespace std;#include Task.hppvoid *Producer(void *args)
{MyRingQueueTaskint *rq static_castMyRingQueueTaskint*(args);// 运算符集string opers -*/%;while(true){// 生产者每隔一秒进行一次生产sleep(1); // 1. 生产商品(通过某种渠道获取数据)// 随机获取两个数(可以改为输入)int x rand() % 100;int y rand() % 100;// 随机获取一种运算符char ops[] {, -, *, /, %};char op opers[rand() % opers.size()];// 生产需要时间usleep(10000);// 2. 消费商品(结合某种具体业务进行处理)// 创建匿名对象并 push 入阻塞队列中rq-push(Taskint(x, y, op));pthread_mutex_lock(rq-getMutex());std::cout Producer 生产了: x y op 构成的对象 std::endl;std::cout ------------------------ std::endl;pthread_mutex_unlock(rq-getMutex());}pthread_exit((void*)0);
}void *Consumer(void *args)
{MyRingQueueTaskint *rq static_castMyRingQueueTaskint*(args);while(true){// 1. 从阻塞队列获取数据Taskint task;rq-pop(task);// 进行业务处理task();// 消费商品也需时间string ret task.getResult();// 2. 消费商品(结合某种具体业务进行处理)pthread_mutex_lock(rq-getMutex());std::cout Consumer 消费了一个对象并获得结果 ret std::endl;std::cout ------------------------ std::endl;pthread_mutex_unlock(rq-getMutex()); }pthread_exit((void*)0);
}int main()
{// 种子srand((size_t)time(nullptr));// MyRingQueueint *rq new MyRingQueueint();MyRingQueueTaskint *rq new MyRingQueueTaskint();// 创建两个线程 生产者/消费者pthread_t pro, con;pthread_create(pro, nullptr, Producer, rq);pthread_create(con, nullptr, Consumer, rq);pthread_join(pro, nullptr);pthread_join(con, nullptr);delete rq;return 0;
} 接下来可以实现 多生产多消费场景 中的 CP 模型了多生产多消费无非就是增加了 消费者与消费者、生产者与生产者 间的 互斥 关系加锁就行了现在问题是加几把锁
答案是 两把因为当前的 生产者和消费者 关注的资源不一样一个关注剩余空间另一个关注是否有商品一把锁是无法锁住两份不同资源的所以需要给 生产者、消费者 各配一把锁。
#pragma once#include vector
#include semaphore.h#define RQ_CAP 5templateclass T
class MyRingQueue
{
private:void P(sem_t *sem){sem_wait(sem);}void V(sem_t *sem){sem_post(sem);}void lock(pthread_mutex_t *lock){pthread_mutex_lock(lock);}void unlock(pthread_mutex_t *lock){pthread_mutex_unlock(lock);}
public:MyRingQueue(size_t cap RQ_CAP):_ring(cap), _cap(cap), _con_step(0), _pro_step(0){// 初始化信号量sem_init(_pro_sem, 0, _cap);sem_init(_con_sem, 0, 0);// 初始化互斥锁pthread_mutex_init(_pro_mtx, nullptr);pthread_mutex_init(_con_mtx, nullptr);}~MyRingQueue(){// 销毁信号量sem_destroy(_pro_sem);sem_destroy(_con_sem);// 销毁互斥锁pthread_mutex_destroy(_pro_mtx);pthread_mutex_destroy(_con_mtx);}// 生产商品void push(const T indata){// 申请信号量P(_pro_sem);lock(_pro_mtx);// 生产_ring[_pro_step] indata;_pro_step % _cap;unlock(_pro_mtx);// 释放信号量V(_con_sem);}// 消费商品void pop(T *outdata){// 申请信号量P(_con_sem);lock(_con_mtx);// 消费*outdata _ring[_con_step];_con_step % _cap;unlock(_con_mtx);// 释放信号量V(_pro_sem);}pthread_mutex_t* getMutex() { return _mtx;}
private:std::vectorT _ring; // 数组模拟环形队列size_t _cap; // 队列容量sem_t _pro_sem; // 生产者信号量sem_t _con_sem; // 消费者信号量size_t _pro_step; // 生产者下标size_t _con_step; // 消费者下标pthread_mutex_t _mtx;pthread_mutex_t _pro_mtx;// 生产者锁pthread_mutex_t _con_mtx;// 消费者锁
}; 细节 加锁行为放在信号量申请成功之后可以提高并发度。 在 环形队列 中可以在申请 信号量 前进行加锁也可以在申请 信号量 后进行加锁这里比较推荐的是 在申请 信号量 后加锁。 如何理解 这就好比一群学生在进行座位编排可以先放一个学生进入教室再给他确定座位也可以先给每个人确定好自己的座位一人一座然后排队进入教室对号入座即可。先申请信号量 相当于先确定座位避免进入教室加锁后还得选座位。加锁意味着串行化一定会降低效率但因为信号量的操作是原子的可以确保线程安全也就不需要加锁保护也就是可以并发申请信号量再串行化访问临界资源。 修改cp.cc int main()
{// 种子srand((size_t)time(nullptr));// MyRingQueueint *rq new MyRingQueueint();MyRingQueueTaskint *rq new MyRingQueueTaskint();// 多生产者多消费者pthread_t pro[4], con[3];for(int i 0; i 4; i)pthread_create(proi, nullptr, Producer, rq);for(int i 0; i 3; i)pthread_create(coni, nullptr, Consumer, rq);for(int i 0; i 4; i)pthread_join(pro[i], nullptr);for(int i 0; i 3; i)pthread_join(con[i], nullptr);delete rq;return 0;
} 可以看到一批线程正在运行 阻塞队列 效率已经够高了那么创造 环形队列 的意义在哪呢 首先要明白 生产者消费者模型 高效的地方从来都不是往缓冲区中放数据、从缓冲区中拿数据。
对缓冲区的操作对于计算机说就是小意思需要关注的点在于 获取数据和消费数据这是比较耗费时间的阻塞队列至多支持获取一次数据获取或一次数据消费在代码中的具体体现就是 所有线程都在使用一把锁并且每次只能 push、pop 一个数据而环形队列就不一样了生产者、消费者可以通过条件变量知晓数据获取、数据消费次数并且当生产者和消费者都是单线程时使用环形队列可以避免加锁提高并发性能因此效率十分高。
环形队列 中允许多个生产者线程一起进行数据获取也允许多个消费者线程一起进行数据消费简单任务处理感知不明显但复杂任务就不一样了这就有点像同时下载多份资源是可以提高效率的。
注意 一起操作并非同时操作任务开始时间有先后但都是在进行处理的。 环形队列 一定优于 阻塞队列 吗 答案是否定的存在即合理如果 环形队列 能完全碾压 阻塞队列那么早就不用学习 阻塞队列 了这两种都属于 生产者消费者模型 常见的交易场所有着各自的适用场景 。
特征阻塞队列互斥锁实现环形队列信号量实现)内部同步机制使用互斥锁或类似的锁机制来实现线程安全使用信号量来实现线程安全阻塞操作支持阻塞操作当队列为空或已满是线程可以等待也支持阻塞操作当队列为空或已满时线程可以等待数据覆盖通常不会覆盖已有元素新元素添加时需要等待队列有空间基于不同场景设计当队列为满时通常时会丢弃旧数据或者覆盖旧数据实现复杂度实现可能较为复杂需要处理锁的获取和释放实现相对简单需要管理信号量线程安全通过锁来保证线程安全容易引入死锁问题通过信号量来保证线程安全不易引入死锁问题添加和删除操作时间O(1) 在队列未满或非空时O(1) (常数时间除非队列已满或为空)应用场景数据的分发和收集多线程数据传递任务调度广播通知等循环缓存处理数据轮询循环任务调度等