珠海十大网站建设公司,做财经直播网站,制作表白网页,外贸平台建设需要云服务器等云产品来学习Linux的同学可以移步/–腾讯云–/官网#xff0c;轻量型云服务器低至112元/年#xff0c;优惠多多。#xff08;联系我有折扣哦#xff09; 文章目录 1. 线程池1.1 线程池是什么1.2 为什么要有线程池1.3 线程池的应用场景1.4 线程池的任…需要云服务器等云产品来学习Linux的同学可以移步/–腾讯云–/官网轻量型云服务器低至112元/年优惠多多。联系我有折扣哦 文章目录 1. 线程池1.1 线程池是什么1.2 为什么要有线程池1.3 线程池的应用场景1.4 线程池的任务1.5 线程池的代码实现 2. 线程安全的单例模式3. STL、智能指针和线程安全4. 其他常见锁的了解5. 读者写者问题 1. 线程池
1.1 线程池是什么
线程池是一种线程的使用方式是一种池化技术在很早之前我们其实接触过一些池化技术的例子比如STL容器内部实现的alloctor就是一种内存的池化技术将要用的空间首先申请一大批然后再按照容器的需要从这个内存池中申请。还有在学习进程的时候我们也尝试写过一些进程池的代码一次创建一大批进程按照要求分别执行不同的任务。当然这里也可以创建线程池用线程来执行任务
1.2 为什么要有线程池
线程过多会带来调度开销进而影响缓存局部性和整体性能。而线程池维护着多个线程等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
1.3 线程池的应用场景
需要大量的线程来完成任务且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务使用线程池技术是非常合适的。因为单个任务小而任务数量巨大你可以想象一个热门网站的点击次数。 但对于长时间的任务比如一个Telnet连接请求线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。对性能要求苛刻的应用比如要求服务器迅速响应客户请求。接受突发性的大量请求但不至于使服务器因此产生大量线程的应用。突发性大量客户请求在没有线程池情况下将产生大量线程虽然理论上大部分操作系统线程数目最大值不是问题短时间内产生大量线程可能使内存到达极限出现错误。
1.4 线程池的任务
创建固定数量线程池循环从任务队列中获取任务对象获取到任务对象后执行任务对象中的任务接口
1.5 线程池的代码实现
/* ThreadPool.hpp 线程池的代码实现 */
#pragma once
#include LockGuard.hpp
#include Thread.hpp
#include vector
#include queue
#include string
#include iostreamconst int gnum 5; // 线程池中默认的线程个数template class T
class ThreadPool; // 线程池类的声明/* 线程数据类保存线程对应的内容包括线程池对象的指针和线程名 */
template class T
class ThreadData
{
public:ThreadData(ThreadPoolT *tp, const std::string n) : threadpool(tp), name(n){};public:ThreadPoolT *threadpool;std::string name;
};/* 线程池类的实现 */
template class T
class ThreadPool
{
public:static void *handleTask(void *args) // 线程需要执行的回调函数{ThreadDataT *td static_castThreadDataT *(args);while (true){T t; // 构建任务对象{LockGuard lockGuard(td-threadpool-mutex()); // 上锁while (td-threadpool-isQueueEmpty()){// 如果任务队列为空线程挂起等待队列中被填充任务td-threadpool-threadWait();}t td-threadpool-pop(); // 如果队列中有任务就拿出任务}// 任务在锁外执行std::cout 获取了一个任务 t.toTaskString() 并执行了任务结果是 t() std::endl;}delete td;return nullptr;}public: // 给handleTask调用的外部接口pthread_mutex_t *mutex() { return _mutex; }bool isQueueEmpty() { return _task_queue.empty(); }void threadWait() { pthread_cond_wait(_cond, _mutex); }T pop() // 获取线程池中任务队列里需要执行的下一个任务{T t _task_queue.front();_task_queue.pop();return t;}public: // 需要暴露给外部的接口ThreadPool(const int num gnum) // 构造函数初始化互斥量和条件变量构建指定个数的Thread对象{pthread_mutex_init(_mutex, nullptr);pthread_cond_init(_cond, nullptr);for (int i 0; i num; i){_threads.push_back(new Thread());}}void run() // 为所有线程对象创建真正的执行流并执行对应的回调函数{for (const auto thread : _threads){ThreadDataT *td new ThreadDataT(this, thread-GetTaskName()); // 构造handleTask的参数对象thread-start(handleTask, td); // 调用该线程的start函数创建新线程执行指定的handleTask任务std::cout thread-GetTaskName() start... std::endl;}}void push(const T in) // 将指定任务push到队列中{// 加锁LockGuard lockGuard(_mutex); // 自动加锁在当前代码段结束之后调用LockGuard的析构函数解锁_task_queue.push(in);pthread_cond_signal(_cond); // 发送信号表示此时task_queue中有值让消费者可以使用}~ThreadPool() // 析构函数销毁互斥量和条件变量delete所有thread对象指针自动调用thread对象的析构函数{pthread_mutex_destroy(_mutex);pthread_cond_destroy(_cond);for (auto thread : _threads){delete thread;}}private:std::vectorThread * _threads; // 保存所有线程对象的指针std::queueT _task_queue; // 需要被分配的任务队列pthread_mutex_t _mutex; // 任务队列需要被互斥的访问pthread_cond_t _cond; // 生产任务和消费任务之间需要进行同步
};构建和测试线程池用到的小组件
/* Task.hpp 线程对象C风格的线程库*/
#pragma once#include iostream
#include string
#include functional
#include pthread.h
#include cassertclass Thread
{
public:using func_t std::functionvoid *(void *); // 定义func_t类型static int number; // 线程编号按照一次运行时的调用次数计数
public:Thread(){char *buffer new char[64];name_ thread- std::to_string(number);}static void *start_routine(void *args){Thread *_this static_castThread *(args);void *ret _this-run(_this-args_);return ret;}void *run(void *arg){return func_(arg);}void start(func_t func, void *args){func_ func;args_ args;int n pthread_create(tid_, nullptr, start_routine, this);assert(n 0);(void)n;}void join(){int n pthread_join(tid_, nullptr);assert(n 0);(void)n;}std::string GetTaskName(){return name_;}~Thread() {}private:std::string name_; // 线程名pthread_t tid_; // 线程idfunc_t func_; // 线程调用的函数void *args_; // 线程调用函数的参数
};
int Thread::number 0;/* LockGuard.hpp RAII风格的互斥锁 */
#pragma once#include pthread.hclass Mutex
{
public:Mutex(pthread_mutex_t *lock_p nullptr) : _lock_p(lock_p) {} // 构造函数void lock() // 加锁{if (_lock_p)pthread_mutex_lock(_lock_p);}void unlock() // 解锁{if (_lock_p)pthread_mutex_unlock(_lock_p);}~Mutex() {} // 析构函数private:pthread_mutex_t *_lock_p;
};class LockGuard // RAII风格的锁的实现
{
public:LockGuard(pthread_mutex_t *mutex) : _mutex(mutex) // 构造函数{_mutex.lock(); // 在构造函数中加锁}~LockGuard() // 析构函数{_mutex.unlock(); // 在析构函数中解锁}private:Mutex _mutex;
};/* Task.hpp 测试用到的测试任务 */
#pragma once#include string
#include iostream
#include functionalstatic std::string oper -*/%;class CalTask
{
public:using func_t std::functionint(int, int, char);public:CalTask() {}CalTask(int x, int y, char op, func_t func): _x(x), _y(y), _op(op), _callback(func){}std::string operator()(){int result _callback(_x, _y, _op);char buffer[64];snprintf(buffer, sizeof buffer, %d %c %d %d, _x, _op, _y, result);return buffer;}std::string toTaskString(){char buffer[64];snprintf(buffer, sizeof buffer, %d %c %d ?, _x, _op, _y);return buffer;}private:int _x;int _y;char _op;func_t _callback;
};
class SaveTask
{typedef std::functionvoid(const std::string) func_t;public:SaveTask() {}SaveTask(const std::string msg, func_t func): _msg(msg), _func(func){}void operator()(){_func(_msg);}private:std::string _msg;func_t _func;
};
int mymath(int a, int b, char op)
{int ans 0;switch (op){case :ans a b;break;case -:ans a - b;break;case *:ans a * b;break;case /:{if (b 0){std::cerr div zero error! std::endl;ans -1;}elseans a / b;}break;case %:{if (b 0){std::cerr mod zero error! std::endl;ans -1;}elseans a % b;}break;default:break;}return ans;
}
void Save(const std::string msg)
{FILE *fp fopen(./log.txt, a);if (fp NULL){std::cerr open file error std::endl;return;}fputs(msg.c_str(), fp);fputs(\n, fp);fclose(fp);
}测试代码
/* main.cc */
#include ThreadPool.hpp
#include Task.hpp
#include string
#include memory
#include unistd.hint main()
{// 创建线程用于等待输入执行任务std::unique_ptrThreadPoolCalTask tp(new ThreadPoolCalTask());tp-run();// 主线程用于输入任务int x, y;char op;while (true){std::cout 请输入数据1# ;std::cin x;std::cout 请输入数据2# ;std::cin y;std::cout 请输入你要进行的运算#;std::cin op;CalTask t(x, y, op, mymath);tp-push(t);sleep(1);}return 0;
}运行结果 2. 线程安全的单例模式
关于设计模式和单例模式的讲解我们在C专栏中已经有过了解感兴趣的可以自行探究这里附上链接【C】特殊类设计
我们知道单例模式的设计有饿汉模式和懒汉模式两种饿汉模式的实现会拖慢启动时间所以这里我们采用懒汉模式来实现我们刚刚创建的线程池。
在一个进程中有一个线程池即可所以这里我们的线程池要改写成单例模式的我们使用懒汉模式来改写
/* 懒汉模式的实现 */
// 头文件 ...// .../* 线程池类的实现 */
template class T
class ThreadPool
{
public:// ...
public: // 给handleTask调用的外部接口// ...
public: // 需要暴露给外部的接口// ...static ThreadPoolT *getInstance(){if(nullptr tp){std::lock_guardstd::mutex lck(_singletonlock);if(nullptr tp){tp new ThreadPoolT ();}}return tp;}
private: // 单例模式需要私有化的接口ThreadPool(const int num gnum) // 构造函数初始化互斥量和条件变量构建指定个数的Thread对象{pthread_mutex_init(_mutex, nullptr);pthread_cond_init(_cond, nullptr);for (int i 0; i num; i){_threads.push_back(new Thread());}}//delete拷贝构造和析构函数ThreadPool(const ThreadPoolT ) delete;ThreadPoolT *operator(const ThreadPoolT ) delete;private:std::vectorThread * _threads; // 保存所有线程对象的指针std::queueT _task_queue; // 需要被分配的任务队列pthread_mutex_t _mutex; // 任务队列需要被互斥的访问pthread_cond_t _cond; // 生产任务和消费任务之间需要进行同步static ThreadPoolT *tp; // 静态成员存放ThreadPool指针static std::mutex _singletonlock; // 创建线程安全的单例对象要加的锁
};
templateclass T
ThreadPoolT *ThreadPoolT::tp nullptr;
templateclass T
std::mutex ThreadPoolT::_singletonlock;3. STL、智能指针和线程安全
STL中的容器是否是线程安全的? 不是。原因是, STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响。而且对于不同的容器, 加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶)。因此 STL 默认不是线程安全. 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全 智能指针是否是线程安全的? 对于 unique_ptr, 由于只是在当前代码块范围内生效, 因此不涉及线程安全问题对于 shared_ptr, 多个对象需要共用一个引用计数变量, 所以会存在线程安全问题. 但是标准库实现的时候考虑到了这个问题, 基于原子操作(CAS)的方式保证 shared_ptr 能够高效, 原子的操作引用计数 4. 其他常见锁的了解
**悲观锁**在每次取数据时总是担心数据会被其他线程修改所以会在取数据前先加锁读锁写锁行锁等当其他线程想要访问数据时被阻塞挂起。乐观锁每次取数据时候总是乐观的认为数据不会被其他线程修改因此不上锁。但是在更新数据前会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式版本号机制和CAS操作。CAS操作当需要更新数据时判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败失败则重试一般是一个自旋的过程即不断重试。自旋锁公平锁非公平锁
5. 读者写者问题
在编写多线程的时候有一种情况是十分常见的。那就是有些公共数据修改的机会比较少。相比较改写它们读的机会反而高的多。通常而言在读的过程中往往伴随着查找的操作中间耗时很长。给这种代码段加锁会极大地降低我们程序的效率。那么有没有一种方法可以专门处理这种多读少写的情况呢 有那就是读写锁 读写锁的分析读写锁一共是两个锁分别为读锁和写锁对应着读者和写者。当写者需要写数据时请求写锁然后再写数据此时读者不能读当写锁不存在时多个读者可以并发的访问同一数据提高了效率
读者写者问题和生产者消费者模型的本质区别就是消费者会取走数据而读者不会取走数据。
//初始化读写锁
pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);//销毁读写锁
pthread_rwlock_destroy(pthread_rwlock_t *rwlock);//读加锁
pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);//写加锁
pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);//解锁
pthread_rwlock_unlock(pthread_rwlock_t *rwlock);本章完…
分别为读锁和写锁对应着读者和写者。当写者需要写数据时请求写锁然后再写数据此时读者不能读当写锁不存在时多个读者可以并发的访问同一数据提高了效率
读者写者问题和生产者消费者模型的本质区别就是消费者会取走数据而读者不会取走数据。
//初始化读写锁
pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);//销毁读写锁
pthread_rwlock_destroy(pthread_rwlock_t *rwlock);//读加锁
pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);//写加锁
pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);//解锁
pthread_rwlock_unlock(pthread_rwlock_t *rwlock);本章完…