图书馆新生专栏网站建设,砀山网站建设,建网站哪便宜,仿淘宝网站制作线程池c实现
概述
线程池#xff08;Thread Pool#xff09;是一种并发编程的设计模式#xff0c;它用于管理和重复使用线程#xff0c;以提高程序的性能和资源利用率。线程池通过维护一组预先创建的线程#xff0c;这些线程可以在需要时被重复使用#xff0c;而不是为…线程池c实现
概述
线程池Thread Pool是一种并发编程的设计模式它用于管理和重复使用线程以提高程序的性能和资源利用率。线程池通过维护一组预先创建的线程这些线程可以在需要时被重复使用而不是为每个任务都创建一个新的线程。
线程池通常包含以下主要组件
任务队列Task Queue 用于存储待执行的任务。当有新的任务到达时它被放入任务队列中等待执行。工作线程Worker Threads 一组预先创建的线程它们在整个程序的生命周期内一直存在。这些线程从任务队列中获取任务并执行任务的操作。线程池管理器Thread Pool Manager 负责管理线程池的创建、销毁和任务分配等工作。它维护线程池中的线程和任务队列并协调它们的工作。
线程池的优势包括
降低线程创建和销毁的开销 创建和销毁线程是相对昂贵的操作。线程池通过重复使用线程减少了这些开销。控制并发度 可以限制线程的数量防止系统过载。这有助于更好地管理系统资源。提高响应速度 当有任务到达时线程池中的工作线程可以立即处理而不需要等待新线程的创建。简化线程管理 线程池将线程的管理抽象出来开发者只需关注任务的提交和处理。
预备知识
并发线程基础: 包括线程、进程、同步、互斥、锁等操作系统基础知识: 关于线程和进程管理的方面内存管理: 确保在多线程环境下正确地分配和释放内存消费者和生产者模型
实现
这里我没有按照上述的三部分来说明, 我觉得分为以下四个部分更容易理解线程池的架构
任务队列
任务结构体 struct Task 任务队列类 class TaskQueue 线程池类 工作线程 管理者线程
1. 任务队列
这里主要就是利用c的封装, 把任务队列封装为一个类, 因为任务队列自身要完成添加任务, 取出任务等行为, 所以我们把任务队列封装为一个类TaskQueue, 这个类也将作为线程池类的类成员
因为任务队列里面放的肯定是任务, 这个任务是什么类型呢? 我们把这个任务也封装一下, 封装为任务结构体Task, 那这个结构体里有哪些成员呢? 熟悉c语言的人应该知道回调函数的基本形式, 大多是由一个**函数指针和一个函数参数也就是通用指针组成**, 也就是:
void(*func)(void* arg);
void* arg;这声明了一个函数指针 func该指针指向一个接受 void* 类型的参数通常是一个指向某种数据的指针并返回 void 的函数。
所以我们的任务结构体可以确定, 在定义一个任务队列 class TaskQueue:
成员:
queue队列存放任务Task锁, 用来在添加任务和取出任务时加锁
方法:
构造, 析构去任务, 放任务返回任务队列中的任务个数
具体的实现代码如下:
声明 TaskQueue.h
// TaskQueue.h#ifndef TEST_TASKQUEUE_H
#define TEST_TASKQUEUE_H
#include queue
using namespace std;
#include pthread.husing callback void(*) (void* arg);struct Task{callback func;// void(*function)(void* arg);void* arg;// 构造函数Task(){func nullptr;arg nullptr;}// 有参构造Task(callback f, void* a): func(f), arg(a) {};
};class TaskQueue {
public:// 构造和析构TaskQueue();~TaskQueue();// 添加任务void addTask(Task task);void addTask(callback f, void* arg);// 取出任务Task takeTask();// 返回任务队列中的个数, 使用内联函数, 提高效率inline int getTaskNum(){return (int)m_Queue.size();}
private:pthread_mutex_t m_Mutex;queueTask m_Queue;
};#endif //TEST_TASKQUEUE_H实现 TaskQueue.cpp
// TaskQueue.cpp#include TaskQueue.h
#include pthread.hTaskQueue::TaskQueue() {pthread_mutex_init(this-m_Mutex, nullptr);
}TaskQueue::~TaskQueue() {pthread_mutex_destroy(this-m_Mutex);
}void TaskQueue::addTask(Task task) {pthread_mutex_lock(this-m_Mutex);this-m_Queue.push(task);pthread_mutex_unlock(this-m_Mutex);
}void TaskQueue::addTask(callback f, void *arg) {pthread_mutex_lock(this-m_Mutex);this-m_Queue.push(Task(f, arg));pthread_mutex_unlock(this-m_Mutex);
}Task TaskQueue::takeTask() {Task task;pthread_mutex_lock(this-m_Mutex);task this-m_Queue.front();this-m_Queue.pop();pthread_mutex_unlock(this-m_Mutex);return task;
}2. ThreadPool类
这个类就是线程池类, 实现线程池的各种操作, 其中线程池的创建和析构较为麻烦一些, 剩下的行数都比较简单
类成员:
任务队列, TaskQueue类管理者线程 * 1工作线程 * n各种线程数量锁, 所整个线程池条件变量, 任务队列为空时阻塞工作线程, 非空时工作线程解除阻塞是否要销毁线程池 bool类型
函数:
构造, 析构添加任务获取活着的, 忙的线程个数线程退出函数
先看声明文件:
声明ThreadPool.h
// ThreadPool.h#ifndef TEST_THREADPOOL_H
#define TEST_THREADPOOL_H
#include TaskQueue.hclass ThreadPool {
public:// 构造函数ThreadPool(int min, int max);~ThreadPool();// 添加任务void addTask(Task task);// 获取忙线程个数int getBusyNum();// 获取活着的线程个数int getLiveNum();private:// 为什么要设置为静态函数呢// 这里也可以不将其设置为静态的, 可以把worker函数和manager函数变为全局函数// 具体原因可以看构造函数中创建线程的部分static void* worker(void* arg);static void* manager(void* arg);void threadExit();private:TaskQueue* taskQueue; // 任务队列pthread_t managerId; // 管理者线程IDpthread_t* workIDs; // 工作的线程ID 多个int m_Min; // 最少线程数量int m_Max; // 最多线程数量int m_BusyNum; // 忙着的线程数量int m_LiveNum; // 存活的线程数量int m_ExitNum; // 要销毁的线程数量 (线程较多, 任务少的时候)pthread_mutex_t m_MutexPool; // 锁整个线程池pthread_cond_t m_NotEmpty; // 任务队列是不是空了bool m_ShutDown false; // 是否要销毁线程池, 销毁 - 1, 不销毁 - 0
};#endif //TEST_THREADPOOL_H然后我们一个一个的看ThreadPool里面的函数
首先是构造函数, 也就是我们创建线程池的时候要完成哪些操作:
构造函数
ThreadPool(int min, int max);函数的参数时线程池最少线程数量和最多线程数量, 意思就是就算没有任务, 也要有min个线程就绪, 任务再多, 也只能有max个线程
步骤:
创建一个TaskQueue实例为工作者线程开辟堆内存将工作线程的线程id初始化为0初始化成员中的各种线程数量和线程池是否关闭的bool值判断锁和条件变量是否初始化成功(同时也初始化了锁和条件变量)创建管理者线程和工作线程
来看一下构造函数的具体实现:
// 构造函数
ThreadPool::ThreadPool(int min, int max) {do {this-taskQueue new TaskQueue;if(taskQueue nullptr){cout new taskQueue failed... endl;break;}workIDs new pthread_t[max];if (workIDs nullptr) {printf(new threadIDs failed...\n);break;}// 将工作线程的id都初始化为0memset(workIDs, 0, sizeof(pthread_t) * max);m_Min min;m_Max max;m_BusyNum 0;m_LiveNum min;m_ExitNum 0;// 判断锁和条件变量是否初始化成功if (pthread_mutex_init(m_MutexPool, nullptr) ! 0 ||pthread_cond_init(m_NotEmpty, nullptr) ! 0) {printf(mutex or condition init failed...\n);break;}m_ShutDown false;// 创建线程// 管理者线程pthread_create(managerId, nullptr, manager, this);// 工作线程for (int i 0; i min; i) {pthread_create(workIDs[i], nullptr, worker, this);}return;} while (0);// do while 外面这些代码都是出了问题才会走到的delete []workIDs;delete taskQueue;return;
}这里的do…while(0) 结构主要是在好几次初始化的过程中有可能失败, 失败的话每个条件判断语句中都要释放一遍堆内存, 干脆就放在do…while循环中, 如果程序都正常运行, 在do中的最后就return了, 走不到do…while的外面
在创建管理者线程和工作线程的时候, 我们把this指针传给了manager函数和worker函数, 所以我们再来看这两个函数
工作线程 worker
虽然这一部分我在大纲中归为第三大部分, 但其实worker函数和manager函数也是ThreadPool类的一部分, 只不过是静态成员函数(静态的原因后面再说)
看一下工作线程的处理流程
判断任务队列是否为空, 空: 阻塞判断ThreadPool是否要关闭了取出任务开始工作释放资源
具体实现:
void *ThreadPool::worker(void *arg) {// 类型转换ThreadPool* pool static_castThreadPool*(arg);// 要一直检查队列里的内容while(true){pthread_mutex_lock(pool-m_MutexPool);// 判断任务队列是否为空while(pool-taskQueue-getTaskNum() 0 !pool-m_ShutDown){// 阻塞工作线程 条件变量pthread_cond_wait(pool-m_NotEmpty, pool-m_MutexPool);// 判断是否有要销毁的线程if(pool-m_ExitNum 0){--pool-m_ExitNum;if(pool-m_LiveNum pool-m_Min){--pool-m_LiveNum;pthread_mutex_unlock(pool-m_MutexPool);pool-threadExit();}}}// 判断线程池是否要关闭if(pool-m_ShutDown){// 先解锁后退出pthread_mutex_unlock(pool-m_MutexPool);pool-threadExit();}// 取任务Task task pool-taskQueue-takeTask();// busy线程1pool-m_BusyNum;// 解锁pthread_mutex_unlock(pool-m_MutexPool);// 开始工作cout thread pthread_self() start working...\n;// 任务处理task.func(task.arg);delete task.arg;task.arg nullptr;// 处理结束cout thread pthread_self() end working...\n;pthread_mutex_lock(pool-m_MutexPool);--pool-m_BusyNum;pthread_mutex_unlock(pool-m_MutexPool);}return nullptr;
}类型转换说明: 因为传进来的是一个void* 类型的指针, 我们要把它转换为ThreadPool*类型离开操作
while(true)说明: 工作线程要一直检测任务队列中是否有任务, 只要有任务就要处理, 没有的话就阻塞
管理者线程 manager
管理者的任务主要是按照线程池工作线程的数量和任务数量相应的创建和销毁线程
按照频率检测线程数量
把需要的线程数量取出来按规则创建和销毁
具体实现代码:
void *ThreadPool::manager(void *arg) {// 类型转换ThreadPool* pool static_castThreadPool*(arg);while(!pool-m_ShutDown){// 按频率3s检测一次sleep(3);// 取出当前任务队列的任务个数 和 当前线程数 和 繁忙的线程数pthread_mutex_lock(pool-m_MutexPool);int queueSize pool-taskQueue-getTaskNum();int liveNum pool-m_LiveNum;int busyNum pool-m_BusyNum;pthread_mutex_unlock(pool-m_MutexPool);// 按相应规则创建和销毁线程// 创建线程// 规则: 任务数量 线程数量 线程数量 maxconst int NUMBER 2;if(queueSize liveNum liveNum pool-m_Max){// 创建线程// 在threadIDs数组中找一个可用的空间存放新创建的id// 遍历整个threadID数组, 看哪些可用pthread_mutex_lock(pool-m_MutexPool);int count 0;for(int i 0; i pool-m_Max pool-m_LiveNum pool-m_Max count NUMBER; i){if(pool-workIDs[i] 0){ // 空间可用pthread_create(pool-workIDs[i], nullptr, worker, pool);count;pool-m_LiveNum;}}pthread_mutex_unlock(pool-m_MutexPool);}// 销毁线程// 规则: 忙的线程*2 存活的线程数 存活的线程最小线程数if(busyNum * 2 liveNum liveNum pool-m_Min){pthread_mutex_lock(pool-m_MutexPool);pool-m_ExitNum NUMBER;// 让线程自杀for (int i 0; i NUMBER pool-m_LiveNum pool-m_Min; i) {pthread_cond_signal(pool-m_NotEmpty);}pthread_mutex_unlock(pool-m_MutexPool);}}return nullptr;
}具体的规则已经在代码中指出, 这个规则是可以自己规定的
解释一下让线程自杀的逻辑:
让线程自杀之后, 会调用pthread_cond_signal唤醒阻塞的worker线程, 就是上一部分中第10行代码, 这时exitNum不为0, 就会执行第12行的逻辑, 也就是worker线程的这一部分:
// 阻塞工作线程 条件变量
pthread_cond_wait(pool-m_NotEmpty, pool-m_MutexPool);
// 判断是否有要销毁的线程
if(pool-m_ExitNum 0){--pool-m_ExitNum;if(pool-m_LiveNum pool-m_Min){--pool-m_LiveNum;pthread_mutex_unlock(pool-m_MutexPool);pool-threadExit();}
}其余各种锁的逻辑应该都能看懂, 不做过多解释
添加任务
这部分的逻辑相对简单一些
直接看代码:
void ThreadPool::addTask(Task task) {if(m_ShutDown) return;// 添加任务不需要加锁任务队列中有锁this-taskQueue-addTask(task);// 唤醒工作线程pthread_cond_signal(this-m_NotEmpty);
}获取忙的线程, 活着的线程个数
int ThreadPool::getBusyNum() {int busyNum 0;pthread_mutex_lock(m_MutexPool);busyNum m_BusyNum;pthread_mutex_unlock(m_MutexPool);return busyNum;
}int ThreadPool::getLiveNum() {int liveNum 0;pthread_mutex_lock(m_MutexPool);liveNum m_LiveNum;pthread_mutex_unlock(m_MutexPool);return liveNum;
}线程退出函数
细心的可以看出其他部分代码, 线程退出是使用的pool-threadExit();, 而不是直接pthread_exit(NULL)
原因主要是, 在线程退出后, 我们还需要把这个线程在workerID中的线程id重置为0, 所以不是单纯的调用pthread_exit(NULL)这么简单, 看代码:
void ThreadPool::threadExit() {pthread_t pid pthread_self();for(int i 0; i this-m_Max; i){if(this-workIDs[i] pid){cout threadExit() function: thread to_string(pthread_self()) exiting... endl;this-workIDs[i] 0;break;}}pthread_exit(NULL);
}运行
所有代码已经准备完毕, 给大家看一下整体的结构
一共5个文件, 2个.h 对应2个 .cpp, 还有一个main文件 TaskQueue.h文件的结构如下: TaskQueue.cpp文件的结构: ThreadPool.h文件的结构: Thread Pool.cpp文件的结构: 把完整的ThreadPool.cpp代码放在下面:
实现ThreadPool.cpp
// ThreadPool.cpp#include ThreadPool.h
#include pthread.h
#include cstring
#include unistd.hThreadPool::ThreadPool(int min, int max) {do {this-taskQueue new TaskQueue;if(taskQueue nullptr){cout new taskQueue failed... endl;break;}workIDs new pthread_t[max];if (workIDs nullptr) {printf(new threadIDs failed...\n);break;}// 将工作线程的id都初始化为0memset(workIDs, 0, sizeof(pthread_t) * max);m_Min min;m_Max max;m_BusyNum 0;m_LiveNum min;m_ExitNum 0;// 判断锁和条件变量是否初始化成功if (pthread_mutex_init(m_MutexPool, nullptr) ! 0 ||pthread_cond_init(m_NotEmpty, nullptr) ! 0) {printf(mutex or condition init failed...\n);break;}m_ShutDown false;// 创建线程// 管理者线程// 将manager和worker函数设置为静态函数的进一步解释// 在使用pthread_create函数创建线程时该函数的第三个参数是一个函数指针指向线程所要执行的函数。// 在C中非静态成员函数需要一个对象实例才能被调用而pthread_create函数只接受普通的函数指针// 因此在这种情况下通常会将非静态成员函数转换为静态成员函数或全局函数。//// 静态成员函数和全局函数没有与特定对象实例相关联因此可以直接使用函数指针传递给pthread_create// 而不需要担心对象的实例。这是因为静态成员函数和全局函数不依赖于特定对象的状态它们只能访问静态成员或全局变量。pthread_create(managerId, nullptr, manager, this);// 把this传给manager函数:// 因为manager函数是静态函数, 只能访问类的静态成员变量// 要想访问类的非静态成员变量(函数), 必须把类的实例对象传进去// 工作线程for (int i 0; i min; i) {pthread_create(workIDs[i], nullptr, worker, this);}return;} while (0);// do while 外面这些代码都是出了问题才会走到的delete []workIDs;delete taskQueue;return;
}ThreadPool::~ThreadPool() {m_ShutDown true;// 阻塞回收管理者线程pthread_join(managerId, NULL);// 唤醒消费者线程for(int i 0; i m_LiveNum; i){pthread_cond_signal(m_NotEmpty);}// 删除堆内存if(taskQueue) delete taskQueue;if(workIDs) delete []workIDs;pthread_mutex_destroy(m_MutexPool);pthread_cond_destroy(m_NotEmpty);}void ThreadPool::addTask(Task task) {if(m_ShutDown) return;// 添加任务不需要加锁任务队列中有锁this-taskQueue-addTask(task);// 唤醒工作线程pthread_cond_signal(this-m_NotEmpty);
}int ThreadPool::getBusyNum() {int busyNum 0;pthread_mutex_lock(m_MutexPool);busyNum m_BusyNum;pthread_mutex_unlock(m_MutexPool);return busyNum;
}int ThreadPool::getLiveNum() {int liveNum 0;pthread_mutex_lock(m_MutexPool);liveNum m_LiveNum;pthread_mutex_unlock(m_MutexPool);return liveNum;
}void *ThreadPool::worker(void *arg) {// 类型转换ThreadPool* pool static_castThreadPool*(arg);// 要一直检查队列里的内容while(true){pthread_mutex_lock(pool-m_MutexPool);// 判断任务队列是否为空while(pool-taskQueue-getTaskNum() 0 !pool-m_ShutDown){// 阻塞工作线程 条件变量pthread_cond_wait(pool-m_NotEmpty, pool-m_MutexPool);// 判断是否有要销毁的线程if(pool-m_ExitNum 0){--pool-m_ExitNum;if(pool-m_LiveNum pool-m_Min){--pool-m_LiveNum;pthread_mutex_unlock(pool-m_MutexPool);pool-threadExit();}}}// 判断线程池是否要关闭if(pool-m_ShutDown){// 先解锁后退出pthread_mutex_unlock(pool-m_MutexPool);pool-threadExit();}// 取任务Task task pool-taskQueue-takeTask();// busy线程1pool-m_BusyNum;// 解锁pthread_mutex_unlock(pool-m_MutexPool);// 开始工作cout thread pthread_self() start working...\n;// 任务处理task.func(task.arg);delete task.arg;task.arg nullptr;// 处理结束cout thread pthread_self() end working...\n;pthread_mutex_lock(pool-m_MutexPool);--pool-m_BusyNum;pthread_mutex_unlock(pool-m_MutexPool);}return nullptr;
}void *ThreadPool::manager(void *arg) {// 类型转换ThreadPool* pool static_castThreadPool*(arg);while(!pool-m_ShutDown){// 按频率3s检测一次sleep(3);// 取出当前任务队列的任务个数 和 当前线程数 和 繁忙的线程数pthread_mutex_lock(pool-m_MutexPool);int queueSize pool-taskQueue-getTaskNum();int liveNum pool-m_LiveNum;int busyNum pool-m_BusyNum;pthread_mutex_unlock(pool-m_MutexPool);// 按相应规则创建和销毁线程// 创建线程// 规则: 任务数量 线程数量 线程数量 maxconst int NUMBER 2;if(queueSize liveNum liveNum pool-m_Max){// 创建线程// 在threadIDs数组中找一个可用的空间存放新创建的id// 遍历整个threadID数组, 看哪些可用pthread_mutex_lock(pool-m_MutexPool);int count 0;for(int i 0; i pool-m_Max pool-m_LiveNum pool-m_Max count NUMBER; i){if(pool-workIDs[i] 0){ // 空间可用pthread_create(pool-workIDs[i], nullptr, worker, pool);count;pool-m_LiveNum;}}pthread_mutex_unlock(pool-m_MutexPool);}// 销毁线程// 规则: 忙的线程*2 存活的线程数 存活的线程最小线程数if(busyNum * 2 liveNum liveNum pool-m_Min){pthread_mutex_lock(pool-m_MutexPool);pool-m_ExitNum NUMBER;// 让线程自杀for (int i 0; i NUMBER pool-m_LiveNum pool-m_Min; i) {pthread_cond_signal(pool-m_NotEmpty);}pthread_mutex_unlock(pool-m_MutexPool);}}return nullptr;
}void ThreadPool::threadExit() {pthread_t pid pthread_self();for(int i 0; i this-m_Max; i){if(this-workIDs[i] pid){cout threadExit() function: thread to_string(pthread_self()) exiting... endl;this-workIDs[i] 0;break;}}pthread_exit(NULL);
}代码里也即是了为什么要把worker和manager函数设置为静态成员函数, 以及为什么要把this指针传给回调函数, 如果不传的话, 静态成员函数是无法访问类中的其他非静态成员变量的
最后看一下运行文件main.cpp
// main.cpp#include TaskQueue.h
#include iostream
#include TaskQueue.cpp
#include ThreadPool.h
#include ThreadPool.cpp
using namespace std;void func(void* arg){int num *(int*)arg;cout thread pthread_self() is working, num num endl;sleep(1);
}int main(){ThreadPool pool(3, 10);for (int i 0; i 100; i) {int* num new int(i 100);pool.addTask(Task(func, num));}sleep(20);return 0;
}这里主要是写了一个数数的任务函数
运行效果如下图: 但是不知道为什么有的地方会出现bug, 大家有想法的也可以告诉我, 我改正一下