做研学的企业网站,崂山区建设局网站,玉石电商网站建设方案,移动互联网的终点和归宿是什么线程同步和生消模型 前言正式开始再次用黄牛抢票来讲解线程同步的思想通过条件变量来实现线程同步条件变量接口介绍初始化和销毁pthread_cond_waitsignal和broadcast 生产消费者模型三种关系用基本工程师思维再次理解基于生产消费者模型的阻塞队列版本一版本二多生多消 利用RAI… 线程同步和生消模型 前言正式开始再次用黄牛抢票来讲解线程同步的思想通过条件变量来实现线程同步条件变量接口介绍初始化和销毁pthread_cond_waitsignal和broadcast 生产消费者模型三种关系用基本工程师思维再次理解基于生产消费者模型的阻塞队列版本一版本二多生多消 利用RAII来对锁进行优化 前言
本篇线程同步的内容是完全基于线程互斥来讲的如果屏幕前的你对于线程互斥还不是很了解的话可以看看我上一篇博客【Linux】详解线程第二篇——用黄牛抢陈奕迅演唱会门票的例子来讲解【 线程互斥与锁 】
正式开始
上篇线程互斥中重点讲了互斥锁虽然解决了多线程并发导致的临界资源不安全的问题但是还存在一个比较重要的问题访问临界资源合理性的问题。
再次用黄牛抢票来讲解线程同步的思想
再举一下我上篇博客中黄牛抢票的例子。
上一篇博客的例子中只有黄牛和票这两个元素对应的就是线程和临界资源既然互斥锁已经讲了那么就能多一个锁这个元素了也就可以理解为多了一个售票员。所以这里一共三个主要元素黄牛线程、票临界资源、售票员锁。
前面博客中我未加usleep对黄牛买票进行限制的例子中出现了一个黄牛将所有票抢完的例子也就是说整个流程中只有一个线程对临界资源进行了访问其他的线程虽然想要访问临界资源但是都没有访问到这种情况不能说有错误只能说设计的不合理会造成其他线程的饥饿问题一个人把所有的活全包了其他人挣不到钱就没饭吃了虽然这个例子有点极端。但是确实是一个问题。
还有一个问题假如此刻票被抢完了但是票卖完后隔一段时间还会再次补票但是无法确定票补充的时间会在随机时刻进行补发临界资源未准备就绪但有可能随意时刻准备好。这样的话所有的黄牛都想抢票但是都不知道什么时候会补票于是所有的黄牛无时无刻都在问售票员票是否补发这样的话就会很浪费所有黄牛和售票员的时间也即浪费所有线程向锁申请资源的时间导致运行效率下降。虽然没做错但是不合理。
上面这两个问题都是访问临界资源的合理性的问题。而引入线程同步就是为了解决这个问题。
想一想我们现实生活中是怎样售票的当票卖完后我们不需要一直询问售票员是否有票只要等待通知即可比如说12306补到票了会通知你当你进行补票的时候是会排队的也就是为什么你点击补票会显示当前人数是多还是中等还是少的信息。如果你补的比较早那么你排队就会靠前一点同理补得比较晚就会排的靠后一点。这里最重要的一点就是排队使得整个流程有了一定的顺序。也就是让所有的线程按照一定的顺序去访问临界资源。这里排队可以解决一部分问题也就是不断询问是否有票的问题。
那么还有一个黄牛将所有的票抢完的问题。再拿12306来说我们每个人只能买一张票如果想买多张就要多给一个乘员信息比如说你给你自己买了票还想给你朋友买票的话得要你朋友的身份证号码等信息。也就票和人是一对一的。那这里搞得极端一点规定每次让黄牛只能买一张票买完票后禁止继续买票再加上上面的队列如果卖完票的黄牛还想再买票就必须去队尾重新排队购买。
上面其实已经把线程同步的思想讲解出来了不过都是以黄牛的角度来说的这里改口成线程来总结一下
所有的线程想要访问临界资源时都必须排队。访问完临界资源的线程若想要继续对临界资源进行访问就必须跑到队尾等待其前面的线程访问完才能轮到当前线程。
这样让访问临界资源的线程按照一定的顺序进行临界资源的访问就是线程同步最重要的思想。
那么如何实现线程同步呢
通过条件变量来实现线程同步
我们在申请临界资源前先要做临界资源是否存在的检测而检测也是需要访问临界资源的那么对临界资源的检测也是一定需要在加锁和解锁之间的。
我把前面博客中的的例子改一改加上补票机制
#include iostream
using namespace std;#include pthread.h
#include unistd.h
#include ctime#include string// 线程数据包含线程名字也就是黄牛名字还有线程对应的互斥锁
class ThreadData
{
public:ThreadData(const string name, pthread_mutex_t* pmtx):_name(name),_pmtx(pmtx){}public:string _name;pthread_mutex_t* _pmtx;
};// 黄牛人数
#define THREAD_NUM 5// 陈奕迅演唱会的票数tickets
int tickets 1000;// 黄牛抢票执行的方法
void* getTicket(void* arg)
{ThreadData* ptd (ThreadData*)arg;// 每个黄牛疯狂抢ticketswhile(1){pthread_mutex_lock(ptd-_pmtx);if(tickets 0){usleep(rand() % 2000);tickets--;printf(%s抢到票了, 此时ticket num ::%d\n, ptd-_name.c_str(), tickets);// 当对临界资源访问完后就解锁pthread_mutex_unlock(ptd-_pmtx);}else{// 当对临界资源访问完后就解锁这里是当tickets 0的情况也要解锁pthread_mutex_unlock(ptd-_pmtx);printf(%s没抢到票, 票抢空了\n, ptd-_name.c_str());// 这里不再让黄牛抢不到票就退出而是继续检查是否有票}// 抢到或没抢到票都执行一下后续动作这里直接用usleep替代usleep(rand() % 5000);}// 记得要释放掉线程数据不然内存泄漏delete ptd;return nullptr;
}int main()
{// 局部锁pthread_mutex_t mtx;// 默认给空就行pthread_mutex_init(mtx, nullptr);// 种一颗随机数种子srand((unsigned int)time(nullptr) ^ getpid() ^ 0x24233342);// 假设此时有三个黄牛进行抢票pthread_t tid[THREAD_NUM];for(int i 0; i THREAD_NUM; i){string tmp;tmp to_string(i 1) 号黄牛;ThreadData* ptd new ThreadData(tmp, mtx);// 每个黄牛去抢票pthread_create(tid i, nullptr, getTicket, (void*)ptd);}// 补票机制while(1){if(tickets 0){cout 票抢光了准备补票中... endl;sleep(rand() % 10);tickets 500;break;}sleep(1);}// 等待每个黄牛抢完票后退出for(int i 0; i THREAD_NUM; i){pthread_join(tid[i], nullptr);}pthread_mutex_destroy(mtx);return 0;
}运行 我截了三张图一张是第一次票抢光之后准备补票 第二张是票补上了之后
第三张是补票抢完之后
这里可以发现当票抢光之后5个线程一直在查询当前是否有票正如开始所说的那样。
这里的代码会在 if(tickets 0) 前进行加锁因为tickets 0就访问了临界资源没票之后会补票但是线程不会退出而是一直在查询是否补票这样效率就很低一直循环着申请锁和解锁虽然没错但是不合理。故这种常规方式检测条件也就注定了其必须频繁的申请加锁和解锁。
但是我们可以改一改
不要让线程再频繁的去自己检测临界资源是否准备就绪如果未准备就绪就让当前线程等待也就是进入S状态。当临界资源就绪的时候再唤醒想要访问临界资源的线程。
这样效率就会大大提高。
想要实现这个功能的话可以通过条件变量来实现。
条件变量接口介绍
条件conditionpthread库中用起来条件变量的接口都是中间加上cond。
初始化和销毁
定义一个条件变量和定义一个锁一样也分全局和局部初始化也是全局可以直接用宏局部要用init
全局初始化的宏
局部初始化的时候第一个参数就是条件变量的地址第二个参数给空让它以默认方式初始化就行。 还是以pthread开头的接口返回值绝大部分都是正确返回0错误返回错误码。
destroy就是销毁条件变量没啥好讲的。
pthread_cond_wait 就是传一个条件变量指针然后一个锁为啥要传一个锁后面再说。只要调用了这个函数的线程就会进入阻塞状态也就是从运行队列进入了等待队列中。这里就相当于是黄牛开始排队了而且注意这里的队伍是在等待cond这个条件准备就绪也就是说这里的队伍是专门为cond开辟的一个队不同于普通的等待队列。
这里就相当于是前面的if判断资源是否存在了但是是直接让线程进入等待队列中。
还有一个timewait多了一个参数前两个参数和wait一样第三个参数是一个时间让线程等待时当第三个参数一到会自动醒来。
signal和broadcast 上面的pthread_cond_wait是让线程进入到与cond相关的等待队列中当signal被调用时就会有一个线程出队就相当于是等待资源准备就绪了此时就会唤醒一个线程。不过这里前提是signal的参数中的cond要和wait的参数中的cond指向是一样的。不匹配就无法唤醒wait对应cond的队列中的线程。broadcast广播的意思当调用这个函数时会将cond对应等待队列中的所有线程都唤醒此时所有的线程会按照顺序出队。
下面我写一个全新的例子来演示一下这里的cond用法
// 线程个数
const static int THREAD_NUM 4;// 不同线程的执行方法
/**********************************************************************************/
void Thread_1_Func(const string name)
{cout name is doing 加密工作 endl;
}void Thread_2_Func(const string name)
{cout name is doing 持久化工作 endl;
}void Thread_3_Func(const string name)
{cout name is doing 查询工作 endl;
}void Thread_4_Func(const string name)
{cout name is doing 管理工作 endl;
}
/**********************************************************************************/// 函数指针指向线程所要执行的函数
typedef void(*pfunc)(const string name);// 每个线程最有用的数据
class ThreadData
{
public:ThreadData(const string name, pthread_mutex_t* pmtx, pthread_cond_t* pcond, pfunc pf):_name(name),_pmtx(pmtx),_pcond(pcond),_pf(pf){}public:string _name; // 线程名pthread_mutex_t* _pmtx; // 锁pthread_cond_t* _pcond; // 条件变量pfunc _pf; // 回调函数
};// 判断线程是否退出
static bool quit false;// pthread_create的回调方法
void* ThreadRoutine(void* arg)
{ThreadData* ptd (ThreadData*)arg;while(!quit){// 访问临界资源前上锁pthread_mutex_lock(ptd-_pmtx);// 相当于if判断此时线程直接阻塞pthread_cond_wait(ptd-_pcond, ptd-_pmtx);if(!quit){// 去调用线程对应的方法ptd-_pf(ptd-_name);}else{// 退出cout ptd-_name quit endl; }// 访问完后解锁pthread_mutex_unlock(ptd-_pmtx);}// 记得释放传来的对象不然内存泄漏了delete ptd;ptd nullptr;return nullptr;
}int main()
{// 局部条件变量pthread_cond_t cond;pthread_cond_init(cond, nullptr);// 局部锁pthread_mutex_t mtx;pthread_mutex_init(mtx, nullptr);// 各个函数执行的方法pfunc funcs[THREAD_NUM] { Thread_1_Func, Thread_2_Func, Thread_3_Func, Thread_4_Func};// 多个线程pthread_t tids[THREAD_NUM];// 创建THREAD_NUM个线程for(int i 0; i THREAD_NUM; i){ThreadData* tmp new ThreadData(to_string(i 1) 号线程, mtx, cond, funcs[i]);pthread_create(tids i, nullptr, ThreadRoutine, (void*)tmp);}cout prepare to do the jobs endl;sleep(1);cout start doing jobs endl;// 发signal让等待队列中的线程执行其方法int count 0;while(count ! 8){pthread_cond_signal(cond);count;sleep(1);}quit true;// quit改为true时其他线程已经在等待队列中了//得让各个线程都执行其一次方法才会循环上去判断quit改变了pthread_cond_broadcast(cond);cout jobs done endl;for(int i 0; i THREAD_NUM; i){pthread_join(tids[i], nullptr);}return 0;
}上面的代码中各线程执行pthread_cond_wait就会进入cond对应的等待队列中当main线程执行一次pthread_cond_signal就会唤醒一个进程。
运行起来
这里用的是signal来唤醒各个线程的还可以用broadcast 这样一次就会唤醒一批线程。
运行
生产消费者模型
带着两个问题来讲一讲生产消费者模型
条件满足的时候我们再唤醒指定的线程但是我们怎么知道条件是否满足互斥所在线程同步中的意义以及为何将pthread_cond_wait_wait放在加锁和解锁之间
学习生产消费者模型可以帮助我们解决第一个问题。 在编写生产消费者模型的某种场景的代码时可以帮助我们理解互斥锁的意义。
下面来举一个生活中的生产消费者模型的例子。
现在有一个超市屏幕前的你是一名消费者。
超市中的商品并不是由超市直接提供的而是由供应商提供的超市本质上是一个商品缓冲区。如图 上图中有【两种角色消费者和生产者。一个交易场所。】
我们买东西时不会去供应商处买而是去超市这样能够提高效率。假如说消费者想要买一包方便面下面都以方便面来说如果直接去供应商那里买的话供应商还要开机器给你做一包这样对于供应商来说成本太高了一包就得开一次机器电费都比那一包方便面贵没这个必要。直接一次性生产一大堆然后提供给超市这样想要买方便面的消费者就去超市。这样就不会浪费那么多的人力和物力去一包一包的生产方便面。
有了超市供货商只负责生产不用为消费者准备东西。消费者只需要去超市不用再跑到供货商处这样逻辑上就实现了解耦。
三种关系
来说说各角色间的关系。
生产者和生产者
假设图中的供应商都生产的是同种资源比如说都生产的是方便面不过品牌不一样这家是康师傅的这家是今麦郎的……那么各个生产者之间就是竞争关系用计算机术语来说的话生产者和生产者之间是互斥关系。
消费者和消费者
如果说疫情前快要封城了所有的居民都要去超市抢购物资此时超市的方便面已经快被抢空了极端点就剩一包了那么有很多的居民都想要这一包方便面居民也就是消费者此时的消费者和消费者之间就是竞争关系都去竞争那一包方便面还是用计算机术语来说。在资源很少而需要同种资源的消费者很多的情况下消费者之间是存在互斥关系的。 上面这两种互斥还可以这样来解释 超市的空间是一个共享资源比如说某一货架不能让供应商全部都抢着去摆放资源这样物品会放乱同种不同类的方便面都摆放在一起这样会造成混乱。也不能让顾客去某一空间抢着争夺资源这样可能会导致消费者本来想拿康师傅但拿成了今麦郎。所以必须要保证生产者生产的过程是安全的消费者消费的过程也要是安全的。 故生产者之间是互斥关系消费者之间也是互斥关系。 生产者和消费者
生产者生产时不能让消费者消费不然数据未传输完毕时部分数据已经被消费者拿走且消费者不会再次消费这样就会导致生产者和消费者数据不一致问题。比如说一包方便面只造了不到一半就被消费者拿走此时生产者仍然在生产而且从开始到结束不会停止知道生产完整一包方便面才停止所以生产者所知道的信息是其会生产一整包方便面但是消费者把一半拿走了消费者得到的信息是其只拿到了半包方便面此时就可以说二者的数据不一致。所以说生产者和消费者之间存在互斥关系。
再比如说过年期间超市生意非常好所有的居民都忙着进年货当超市中的某一种商品被抢空时就要通知对应的生产者去生产如果未通知就会出现本篇刚开始将的抢票问题会不断有消费者询问是否有商品被补充。同样当超市商品已摆放满了也要通知生产者停止生产。故当缺资源的时候通知生产者生产当资源补充上来时就通知消费者消费当资源盈余时就通知生产者停止生产同时刺激消费者消费。即同步。所以生产者和消费者之间也存在同步关系。 所以说生产者和消费者之间存在同步和互斥两种关系。 那么我们写代码的时候怎么编写一个生产消费者模型呢 只需要掌握住三个原则即可。
一个交易场所。两个角色——生产者和消费者。三种关系生生(互斥)、消消(互斥)、生消(同步和互斥)。
不过第三点有个特例当只有一个生产者 和 一个消费者时就只需要维护生消这一种关系这一点后面会再谈这里先暂不考虑。
通过锁和条件变量来体现出三种关系。
用基本工程师思维再次理解
代码中我们要用线程来体现出生产者和消费者也就是要给线程进行角色划分。
超市是用某种数据结构来表示的缓冲区。
商品即某些数据。
超市里是否有新增货物生产者最清楚。因为生产者一成功生产就会新增货物。 超市里剩余多少空间让生产者生产消费者最清楚。因为消费者每次消费都会新增空余空间。
所以这里就可以解决最初说的第一个问题条件满足的时候我们再唤醒指定的线程但是我们怎么知道条件是否满足
当生产者生产商品后就可立马通知消费者来消费消费者将商品拿走后就可通知生产者去生产。故可以让生产者线程和消费者线程互相同步完成对应的生产消费者模型。这句话中的通知就是唤醒。加粗的字样就是对这个问题的解答。
再来看一个图
这里要强调一点生产和消费的过程不仅仅是从生产者生产到仓库再让消费者拿走。重要的点不在这而是生产者从获取到数据开始生产和消费者拿掉数据开始处理的两个过程中间传递数据的过程耗时是非常短的至于为什么等会写代码的时候就知道了。
我前面讲进程间通信的时候说过进程间通信的本质是让两个进程看到同一块空间。 校正一下进程间通信的前提是让两个进程看到同一块空间进程间通信的本质就是生产消费者模型。比如说管道自带同步和互斥的属性正常情况下一端写一端读当管道为空的时候读端会阻塞当管道满的时候写端会阻塞。这里就和生产消费者模型很像写端读入数据后就让读端去读读端读好了数据后就让写端去写。当写端关闭时读端就相当于读到了文件末尾read会返回零当读端关闭的时候写端直接出错进程就退出了。 【注】如果我这里讲的内容你不太会的话可以看看我这篇博客【Linux】进程间通信匿名管道、命名管道、共享内存等包含代码示例
基于生产消费者模型的阻塞队列
下面就来写写生产消费者模型的代码。
我会写两个版本第一个版本细节比较少第二个版本会基于第一个版本稍微进行一点优化。
版本一
先来说说大致思路 一个交易场所前面超市的那个例子说了超市就是用某个数据结构表示的缓冲区这里我就以队列来表示这个缓冲区不过我不会再自己手搓一个队列直接用STL库的那个如果有同学不懂STL的队列可以看看这篇【C】STL栈和队列基本功能介绍、题目练习和模拟实现容器适配器。 两种角色生产者和消费者的表示这里我就先来简单一点的一个生产者的线程和一个消费者的线程。后面的那个例子再来多个生产者和消费者。 3种关系中生生、消消、生消都要保持两两互斥生消还要有一个同步的关系。我们可以用一个类来实现其中可以用一个锁来表示所有的互斥用两个条件变量来表示生消的同步这个类还可以将第一点中的队列包含在在内。
先把这个类简单给出来 templateclass T
class CPqueue
{
public:// 构造CPqueue(int capacity): _capacity(capacity){// 互斥锁、条件变量都要用接口来初始化pthread_mutex_init(_mtx, nullptr);pthread_cond_init(_full, nullptr);pthread_cond_init(_empty, nullptr);}private:// 用队列这个数据结构来表示std::queueT _q;// 超市中能够存放的最大容量int _capacity;// 判断超市是否已经放满了pthread_cond_t _full;// 判断超市是否是空的pthread_cond_t _empty;// 互斥锁用来两两互斥pthread_mutex_t _mtx;
};上面的两个条件变量一个是判断当前队列中存放的数据满了没有一个是判断当前队列中是否空了。 如果满了就得让生产者线程阻塞不能继续生产了并通知消费者来消费等接到生产信号了再去生产 如果空了就得让消费者线程阻塞不能继续消费了并通知生产者去生产等接到消费信号了再去消费。 而这里对应的生产的动作就是往队列中push数据消费的动作就是从队列中pop数据。
所以要提供两个接口一个是让生产者push的一个是让消费者pop的。
下面的代码是在CPqueue中的
// 判断队列是否为空
bool IsEmpty()
{return _q.size() 0;
}// 判断队列是否已满
bool IsFull()
{return _q.size() _capacity;
}void PushData(const T data)
{// 进来先上锁pthread_mutex_lock(_mtx);/* 上了锁之后先判断临界资源是否准备就绪也就是队列是否满了*/if(IsFull()) pthread_cond_wait(_full, _mtx);// 到此处就说明队列不满就可以push数据了_q.push(data);// push完了先发送让消费者消费的信号pthread_cond_signal(_empty);// 解锁pthread_mutex_unlock(_mtx);
}void PopData(T data)
{// 先上锁pthread_mutex_lock(_mtx);/* 上锁后先判断临界资源是否准备就绪也就是队列是否为空*/if(IsEmpty()) pthread_cond_wait(_empty, _mtx);// 到此处说明队列不为空就可以pop了data _q.front(); // 先拿数据再pop_q.pop();// pop完了发送让生产者生产的信号pthread_cond_signal(_full);// 解锁pthread_mutex_unlock(_mtx);
}这里就是同步和互斥的逻辑。包含了一个交易场所和三种关系中的生消同步和互斥。
下面来写生产者和消费者的线程
// 生产者执行的方法
void* Productor(void* arg)
{CPqueueint* pq (CPqueueint*)arg;int data 10;while(1){std::cout productor send data :: data std::endl;pq-PushData(data);data;sleep(1);}
}// 消费者执行的方法
void* Consumer(void* arg)
{CPqueueint* pq (CPqueueint*)arg;while(1){int data 0;pq-PopData(data);std::cout consumer get data :: data std::endl std::endl;}
}int main()
{CPqueueint* pq new CPqueueint(6);// 消费者线程pthread_t consumerThread;// 生产者线程pthread_t productorThread;// 消费者线程初始化pthread_create(consumerThread, nullptr, Consumer, (void*)pq);// 生产者线程初始化pthread_create(productorThread, nullptr, Productor, (void*)pq);// 等待从线程退出pthread_join(consumerThread, nullptr);pthread_join(productorThread, nullptr);delete pq;return 0;
}运行起来效果就是这样的
还可以控制消费者的消费时间点比如说队列满了再让消费者消费
运行 同样的还可以队列装一半了再退出这里就不演示了。
这里可以说一下pthread_cond_wait的作用了 pthread_cond_wait是在临界区中的来一个问题说如果线程等待了会持有锁等待吗 答案是不会的pthread_cond_wait的第二个参数是一把锁的指针意义在于当pthread_cond_wait调用成功后传入pthread_cond_wait的锁会被自动解开所以不用担心线程在pthread_cond_wait的时候会带锁wait。 线程阻塞并恢复之后从哪里阻塞就会从哪里唤醒也就是pthread_cond_wait这里当线程被唤醒时pthread_cond_wait会自动帮助线程申请其本来调用pthread_cond_wait成功时解开的那把锁因为县城被唤醒时仍然在临界区中。
不过这里唤醒有多种情况就先说一个生产者和一个消费者的。 比如说我这里代码写的是先唤醒对方线程再解锁
当另一方醒来时wait会自动申请锁但此时锁是被当前方的线程占用的所以另一方又会在锁上面进行阻塞等待申请锁时若锁被占用则当前线程就阻塞等待锁资源等到当前方解锁后另一方就会自动拿到锁。
先解锁后唤醒时比如这样 此时就是另一方被唤醒时锁未被占用那么就会直接得到锁。
这里是单生产者和单消费者的还有多生产者和多消费者的情况同理不过是在锁上等待的线程会更多这里就不再多讲了。
需要注意的是唤醒和解锁的先后顺序是都可以的只要发生了生成——消费这一行为即可。 不过我个人更推荐先唤醒后解锁。
前面说了生产消费者模型能够提高工作效率。如果单从生产 - 消费的角度来看问题的话其实这里并没有什么效率上的提高。因为这一步只是进行了简单的拷贝而已真正的效率提高在生产者和消费者可同时工作这一点上。生产者生产完后消费者去取数据并执行后续操作在这个过程中生产者还可以继续接受生产的任务比如网络发来的数据 / 标准输入的数据接收好就送到缓冲区超市中重在二者可以并发执行。就像餐馆里面的大厨和服务员一样大厨做好饭后不需要亲自将饭送到餐桌上而是让端盘的服务员去送服务员送的同时大厨仍可继续做饭服务员送完菜后也可 处理顾客的其他要求拿酒、盛米饭等。这样服务员和大厨间的工作是互不影响的效率就高了。
真正的时间消耗在生产者获取到数据过程 和 消费者获取到数据后的后续处理过程中间的拷贝耗时相对来说是非常短的当消费者的后续处理动作很耗时时可以搞多个消费者线程并发执行该动作总线程个数尽量不要超过CPU核数相等最好这样效率会更高。
版本二
pthread_cond_wait是一个函数只要是一个函数就可能会调用失败拿push来说
if(IsFull()) pthread_cond_wait();pthread_cond_wait是有返回值的是一个int调用失败了返回的是一个错误码。
如果这里pthread_cond_wait调用失败了那就可能导致队列是满的但是代码仍向下执行了就会超出规定的队列容量STL的queue中会自动扩容我们写的那个capacity只是我们自己规定的如果出现不合法的行为也是不好检测的那么这样就不合理了。pthread_cond_wait还可能存在伪唤醒的情况意思就是条件变量_full/_empty并不满足条件但是当前线程被唤醒了。比如若其他线程误操作发送了信号就会导致当前线程跳出其所阻塞的队列中并进行后续操作也是不合理的。
所以要把if改一改换成while
这样就算调用失败了或者伪唤醒了都会再上去判断一次IsFull()此时如果队列还是满的就会再次调用pthread_cond_wait若还失败了又会上去再判断……知道真正被唤醒并且队列不是满的了就会跳出while循环这里push是这样pop也是同理的
故当跳出while后后面的代码是一定能正确的进行生产和消费的。
还有这里的阻塞队列不用关心哪一个线程先执行。 如果是生产者先执行的话就直接生产即可。 如果是消费者先执行的话队列为空会阻塞住此时就会还会让生产者来生产。 所以逻辑上一定是能让生产者先往队列中生产者东西的。
注意我上面代码中用了模版而且上面演示的类型是int这里我要改一改了改成一个自定义类型。简单实现一个计算器其成员变量为两个值一个function包装器如果有屏幕前的你对包装器不了解的话可以看这篇【C】C11中比较重要的内容介绍
// 对function包装器重命名一下
typedef std::functionint(int, int) func;class Caculator
{
public:Caculator(){}Caculator(int x, int y, func fun): _x(x), _y(y), _fun(fun){}int operator()(){return _fun(_x, _y);}public:int _x;int _y;func _fun;
};阻塞队列的代码不变。线程执行方法变一下
// 存放各个函数的做法和接口
std::vectorstd::pairchar, func kv;// 这里搞了4个计算功能加减乘除
const int KVSIZE 4;// 生产者执行方法
void* Productor(void* arg)
{CPqueueCaculator* pq (CPqueueCaculator*)arg;while(1){// 随机分配给消费者任务int task rand() % KVSIZE;func fun kv[task].second;int x rand() % 200;int y rand() % 500;std::cout productor send task :: task -- x (kv[task].first) y ? std::endl;pq-PushData(Caculator(x, y, fun));sleep(1);}
}// 消费者执行方法
void* Consumer(void* arg)
{CPqueueCaculator* pq (CPqueueCaculator*)arg;while(1){Caculator tmp;pq-PopData(tmp);std::cout consumer get task :: tmp() std::endl std::endl;}
}int main()
{srand((unsigned int)time(nullptr) ^ getpid() ^ 0x323424);func MyAdd [](int x, int y){ return x y; };func MySub [](int x, int y){ return x - y; };func MyMul [](int x, int y){ return x * y; };func MyDiv [](int x, int y){ return x / y; };// 这里用到了lambda表达式如果屏幕前的你不懂可以看看我刚刚给的那个链接kv.push_back(std::pairchar, func(, MyAdd));kv.push_back(std::pairchar, func(-, MySub));kv.push_back(std::pairchar, func(*, MyMul));kv.push_back(std::pairchar, func(/, MyDiv));CPqueueCaculator* pq new CPqueueCaculator(4);// 消费者线程pthread_t consumerThread;// 生产者线程pthread_t productorThread;// 消费者线程初始化pthread_create(consumerThread, nullptr, Consumer, (void*)pq);// 生产者线程初始化pthread_create(productorThread, nullptr, Productor, (void*)pq);pthread_join(consumerThread, nullptr);pthread_join(productorThread, nullptr);delete pq;return 0;
}再加上阻塞队列运行起来就是这样
这里除法有一丁点问题我就不改了各位想改的自己动手试试改正浮点数就行。
多生多消
这里搞简单点就两个生产者、两个消费者
其它代码就只用改消费者打印换行一次所有从线程打印一下线程id就行。
运行
可以看到有两个消费者和两个生产者成功。
这里就和线程池有点像了不过我不打算在这篇讲线程池下一篇再详谈。
利用RAII来对锁进行优化
RAII学过智能指针的同学应该知道是啥如果你不懂看这篇【C】智能指针。
这里写一个类专门用来管理锁资源
class LockGuard
{
public:LockGuard(pthread_mutex_t* pmtx):_pmtx(pmtx){pthread_mutex_lock(_pmtx);}~LockGuard(){pthread_mutex_unlock(_pmtx);}public:pthread_mutex_t* _pmtx;
};再加锁的时候就不需要调用pthread库中的函数了直接定义一个局部的对象就行定义时自定调用构造就会进行加锁析构就会调用解锁。
void PushData(const T data)
{// 直接让对象来管理锁LockGuard LG(_mtx); // 构造加锁/* 上了锁之后先判断临界资源是否准备就绪也就是队列是否满了*/while(IsFull()) pthread_cond_wait(_full, _mtx);// 到此处就说明队列不满就可以push数据了_q.push(data);// 发送消费者消费的信号pthread_cond_signal(_empty);} // 析构解锁void PopData(T data)
{// 直接让对象来管理锁LockGuard LG(_mtx);// 构造加锁/* 上锁后先判断临界资源是否准备就绪也就是队列是否为空*/while(IsEmpty()) pthread_cond_wait(_empty, _mtx);// 到此处说明队列不为空就可以pop了data _q.front(); // 先拿数据再pop_q.pop();// pop完了发送让生产者生产的信号pthread_cond_signal(_full);} // 析构解锁运行就是
和前面没啥区别。不过RAII的思想放到这里非常的妙。 这里也就是RALL风格的加锁。
这篇就讲到这下一篇细说信号量等知识。
到此结束。。。