博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Linux组件封装(七)——线程池的简单封装
阅读量:6307 次
发布时间:2019-06-22

本文共 10448 字,大约阅读时间需要 34 分钟。

线程池的封装,基础思想与生产者消费者的封装一样,只不过我们是将线程池封装为自动获取任务、执行任务,让用户调用相应的接口来添加任务。

在线程池的封装中,我们同样需要用到的是MutexLock、Condition、Thread这些基本的封装。

基础封装如下:

MutexLock:

1 #ifndef MUTEXLOCK_H 2 #define MUTEXLOCK_H 3  4 #include "NonCopyable.h" 5 #include 
6 #include
7 #include
8 #define TINY_CHECK(exp)\ 9 if(!exp)\10 { \11 fprintf(stderr, "File : %s, Line : %d Exp : ["#exp"] is true, abort.\n", __FILE__, __LINE__); abort();\12 }13 14 15 16 class MutexLock : NonCopyable17 {18 friend class Condition;19 public:20 MutexLock();21 ~MutexLock();22 void lock();23 void unlock();24 25 bool isLocked() const { return _isLock; }26 pthread_mutex_t *getMutexPtr() { return &_mutex; }27 28 private:29 void restoreMutexStatus()30 { _isLock = true; }31 32 pthread_mutex_t _mutex;33 bool _isLock;34 };35 36 37 class MutexLockGuard : NonCopyable //将锁封装到MutexLockGuard中,38 { //这样只需定义一个对象,便可39 public: //便可自动上锁,对象销毁时自动解锁40 MutexLockGuard(MutexLock &mutex)41 :_mutex(mutex)42 { _mutex.lock(); }43 44 ~MutexLockGuard()45 { _mutex.unlock(); }46 47 private:48 MutexLock &_mutex;49 };50 #define MutexLockGuard(m) "ERROR"51 52 #endif
View Code
1 #include "MutexLock.h" 2 #include 
3 4 MutexLock::MutexLock() 5 :_isLock(false) 6 { 7 TINY_CHECK(!pthread_mutex_init(&_mutex, NULL)); 8 } 9 10 MutexLock::~MutexLock()11 {12 assert(!isLocked());13 TINY_CHECK(!pthread_mutex_destroy(&_mutex));14 }15 16 void MutexLock::lock()17 {18 TINY_CHECK(!pthread_mutex_lock(&_mutex));19 _isLock = true;20 }21 22 void MutexLock::unlock()23 {24 _isLock = false;25 TINY_CHECK(!pthread_mutex_unlock(&_mutex));26 }
View Code

Condition:

1 #ifndef CONDITION_H 2 #define CONDITION_H 3  4 #include 
5 #include "NonCopyable.h" 6 7 class MutexLock; 8 9 10 class Condition : NonCopyable11 {12 public:13 Condition(MutexLock &mutex);14 ~Condition();15 16 void wait();17 void notify();18 void notifyAll();19 private:20 pthread_cond_t _cond;21 MutexLock &_mutex;22 };23 24 #endif
View Code
1 #include "Condition.h" 2 #include "MutexLock.h" 3 #include 
4 5 Condition::Condition(MutexLock &mutex) 6 :_mutex(mutex) 7 { 8 TINY_CHECK(!pthread_cond_init(&_cond, NULL)); 9 }10 11 Condition::~Condition()12 {13 TINY_CHECK(!pthread_cond_destroy(&_cond));14 }15 16 void Condition::wait()17 {18 assert(_mutex.isLocked());19 TINY_CHECK(!pthread_cond_wait(&_cond, _mutex.getMutexPtr()));20 _mutex.restoreMutexStatus();21 }22 23 void Condition::notify()24 {25 TINY_CHECK(!pthread_cond_signal(&_cond));26 }27 28 void Condition::notifyAll()29 {30 TINY_CHECK(!pthread_cond_broadcast(&_cond));31 }
View Code

Thread:

1 #ifndef THREAD_H 2 #define THREAD_H 3 #include 
4 #include
5 #include
6 class Thread : boost::noncopyable 7 { 8 public: 9 10 typedef std::function
ThreadCallback;11 12 Thread(ThreadCallback cb);13 ~Thread();14 15 void start();16 void join();17 18 19 20 21 private:22 23 static void *runInThread(void *);24 pthread_t _threadId;25 bool _isRun;26 ThreadCallback _callback;27 };28 29 30 #endif /*THREAD_H*/
View Code
1 #include "Thread.h" 2 #include 
3 4 Thread::Thread(ThreadCallback cb) 5 :_threadId(0), 6 _isRun(false), 7 _callback(cb) 8 { 9 10 }11 12 Thread::~Thread()13 {14 if(_isRun)15 pthread_detach(_threadId);16 }17 18 19 void Thread::start()20 {21 pthread_create(&_threadId, NULL, runInThread, this);22 _isRun = true;23 }24 25 void Thread::join()26 {27 assert(_isRun);28 pthread_join(_threadId, NULL);29 _isRun = false;30 }31 32 void *Thread::runInThread(void *arg)33 {34 Thread *p = static_cast
(arg);35 p->_callback();36 return NULL;37 }
View Code

NonCopyable:

1 #ifndef NONCOPYABLE_H 2 #define NONCOPYABLE_H 3  4 class NonCopyable             //禁用值语意 5 { 6     public: 7         NonCopyable() { } 8         ~NonCopyable() { } 9     private:10         NonCopyable(const NonCopyable &);11         void operator= (const NonCopyable &);12 };13 14 #endif  /*NON_COPYABLE_H*/
View Code

 

在线程池的封装中,我们需要的数据结构有一个互斥锁,两个条件变量,任务队列以及线程池的队列。

然后,我们需要提供给用户添加任务的接口addTask,在线程池中,我们需要相应的获取任务函数getTask,执行任务的函数runInThread。

头文件代码如下:

1 #ifndef THREAD_POOL_H 2 #define THREAD_POOL_H 3  4 #include 
5 #include "MutexLock.h" 6 #include "Condition.h" 7 #include
8 #include
9 #include
10 11 class Thread;12 13 class ThreadPool : boost::noncopyable14 {15 public:16 typedef std::function
Task;17 18 ThreadPool(size_t queueSize, size_t poolSize);19 ~ThreadPool();20 21 void start();22 void stop();23 24 void addTask(Task task);25 bool isRunning() const26 { return _isStart; }27 28 private:29 Task getTask();30 void runInThread();31 32 mutable MutexLock _mutex;33 Condition _full;34 Condition _empty;35 size_t _queueSize;36 std::queue
_queue;37 const size_t _poolSize;38 std::vector
> _threads;39 bool _isStart;40 };41 42 43 #endif /*THREAD_POOL_H*/
View Code

在构造函数中,我们用一把锁去初始化两个条件变量,用相应的长度来初始化任务队列的长度与线程池中线程的个数:

ThreadPool::ThreadPool(size_t queueSize, size_t poolSize)    :_full(_mutex),     _empty(_mutex),     _queueSize(queueSize),     _poolSize(poolSize),     _isStart(false){}

addTask函数中,我们首先要判断线程池是否开启,然后加锁,判断任务队列是否已满,进行等待。等待后将相应的任务加入到任务队列,通知getTask来获取任务:

void ThreadPool::addTask(Task task){    if(!_isStart)        return;    MutexLockGuard lock(_mutex);    while(_queue.size() >= _queueSize)        _empty.wait();    _queue.push(std::move(task));    _full.notify();}

getTask函数中,我们首先判断线程池是否开启,然后对应加锁,判断任务队列是否为空,进行等待。等待后,取出任务队列中的第一个任务,通知addTask可以继续添加任务:

ThreadPool::Task ThreadPool::getTask(){    if(!_isStart)        return Task();        MutexLockGuard lock(_mutex);    while(_queue.empty() && _isStart)        _full.wait();    assert(!_queue.empty());    Task task = _queue.front();    _queue.pop();    _empty.notify();    return task;}

runInThread函数相对简单,只是获取相应任务,执行该任务即可:

void ThreadPool::runInThread(){    while(_isStart)    {        Task task(getTask());        if(task)            task();    }}

start函数,相对来说就比较复杂,我们首先需要new出线程,将线程添加到线程队列,然后将线程队列中的线程开启。

void ThreadPool::start(){    if(_isStart)        return ;    _isStart = true;    for(size_t i = 0; i != _poolSize; ++ i)        _threads.push_back(std::unique_ptr
(new Thread(std::bind(&ThreadPool::runInThread, this)))); for(size_t i = 0; i != _poolSize; ++ i) _threads[i]->start();}

这里需要注意:由于线程不可复制和赋值,我们将Thread相应的unique_ptr添加到线程队列,才可以达到相应的效果。而每个线程的回调函数,我们运用C++11的特性——function和bind来实现,我们将Thread的回调函数设置为一个function模板,通过bind,将ThreaPool::runInThread函数中的一个隐式参数转化为无参数,即将this指针绑定给ThreaPool::runInThread,这样,ThreaPool::runInThread就不再需要this指针了。

stop函数,我们需要考虑的因素较多,由于线程池已经开启,但不是每个线程都在运行,有些还在沉睡中,所以我们需要通过notifyAll来通知所有的线程获取任务,这样线程池中的线程已经唤醒,然后我们对每个线程进行join,而任务队列中未执行的任务,我们需要将其清空,通过pop函数来弹出任务队列中的任务:

void ThreadPool::stop(){    if(!_isStart)        return ;    {        MutexLockGuard lock(_mutex);        _isStart = false;        _full.notifyAll();    }    for(size_t i = 0; i != _poolSize; ++ i)        _threads[i]->join();    while(!_queue.empty())        _queue.pop();    _threads.clear();}

 

 

具体实现代码如下:

1 #include "Thread.h" 2 #include "ThreadPool.h" 3 #include 
4 using namespace std; 5 6 ThreadPool::ThreadPool(size_t queueSize, size_t poolSize) 7 :_full(_mutex), 8 _empty(_mutex), 9 _queueSize(queueSize),10 _poolSize(poolSize),11 _isStart(false)12 {13 14 }15 ThreadPool::~ThreadPool()16 {17 if(_isStart)18 stop();19 }20 21 void ThreadPool::addTask(Task task)22 {23 if(!_isStart)24 return;25 MutexLockGuard lock(_mutex);26 while(_queue.size() >= _queueSize)27 _empty.wait();28 29 _queue.push(std::move(task));30 _full.notify();31 }32 33 ThreadPool::Task ThreadPool::getTask()34 {35 MutexLockGuard lock(_mutex);36 while(_queue.empty() && _isStart)37 _full.wait();38 39 if(!_isStart)40 return Task();41 42 assert(!_queue.empty());43 Task task = _queue.front();44 _queue.pop();45 _empty.notify();46 47 return task;48 }49 50 void ThreadPool::runInThread()51 {52 while(_isStart)53 {54 Task task(getTask());55 if(task)56 task();57 }58 }59 void ThreadPool::start()60 {61 if(_isStart)62 return ;63 _isStart = true;64 65 for(size_t i = 0; i != _poolSize; ++ i)66 _threads.push_back(std::unique_ptr
(new Thread(std::bind(&ThreadPool::runInThread, this))));67 68 for(size_t i = 0; i != _poolSize; ++ i)69 _threads[i]->start();70 }71 72 void ThreadPool::stop()73 {74 if(!_isStart)75 return ;76 {77 MutexLockGuard lock(_mutex);78 _isStart = false;79 _full.notifyAll();80 }81 82 for(size_t i = 0; i != _poolSize; ++ i)83 _threads[i]->join();84 85 while(!_queue.empty())86 _queue.pop();87 88 _threads.clear();89 }
View Code

 

 

在测试中,我们可以通过前面的定时器来测试,生成一个定时器线程,定时器到期时,对线程池执行stop。

测试代码如下:

#include "ThreadPool.h"#include "nano_sleep.hpp"#include 
#include
#include "TimerThread.h"using namespace std;void foo(){ cout << rand() % 100 << endl;}void stopPool(ThreadPool *a){ a->stop();}void nano_sleep(double val);int main(int argc, const char *argv[]){ ThreadPool b(120, 4); TimerThread a(4, 0, std::bind(&stopPool, &b)); b.start(); a.start(); while(b.isRunning()) { b.addTask(&foo); nano_sleep(0.5); } a.stop(); return 0;}void nano_sleep(double val){ struct timespec tv; tv.tv_sec = val; //取整 tv.tv_nsec = (val - tv.tv_sec) * 1000 * 1000 * 1000; int ret; do { ret = nanosleep(&tv, &tv); }while(ret == -1 && errno == EINTR);}

在测试代码中,nano_sleep是一个相对精确地睡眠函数,我们可以将睡眠的精度限制到double。

当定时器到时后,会显示timeout,程序会自动退出。

注意:由于运用了C++11的一些特性,如function、bind和右值引用等,编译时需加上-std=c++0x

转载于:https://www.cnblogs.com/gjn135120/p/4024149.html

你可能感兴趣的文章
4星|《先发影响力》:影响与反影响相关的有趣的心理学研究综述
查看>>
IE8调用window.open导出EXCEL文件题目
查看>>
python之 列表常用方法
查看>>
vue-cli脚手架的搭建
查看>>
在网页中加入百度搜索框实例代码
查看>>
在Flex中动态设置icon属性
查看>>
采集音频和摄像头视频并实时H264编码及AAC编码
查看>>
3星|《三联生活周刊》2017年39期:英国皇家助产士学会于2017年5月悄悄修改了政策,不再鼓励孕妇自然分娩了...
查看>>
linux查看命令是由哪个软件包提供的
查看>>
高级Linux工程师常用软件清单
查看>>
堆排序算法
查看>>
folders.cgi占用系统大量资源
查看>>
路由器ospf动态路由配置
查看>>
zabbix监控安装与配置
查看>>
python 异常
查看>>
last_insert_id()获取mysql最后一条记录ID
查看>>
可执行程序找不到lib库地址的处理方法
查看>>
bash数组
查看>>
Richard M. Stallman 给《自由开源软件本地化》写的前言
查看>>
oracle数据库密码过期报错
查看>>