尽管C++11加入线程库,但C++对多线程的支持还是比较初级,稍微高级的用法还是需要自己实现。
线程池是提前创建并维护多个线程,等待管理者分配任务的机制,避免短时间线程创建和销毁的代价,一般是IO密集型的场景使用。
主要包括线程管理器、任务线程、消息队列
-
线程管理器:主要功能是创建和启动线程、调配任务、管理线程等。
主要有三个方法:
- start:创建一定数量线程池
- stop:终止所有线程并回收资源
- addTask:添加任务到消息队列
-
任务线程:等待分配任务的线程,一般使用条件变量实现等待和通知机制
-
任务队列:存放任务的缓冲机制,队列有调度功能,使用优先队列实现,但需要锁限制并发
按任务队列和线程池大小可分成四种情况:
- 没有任务,线程池中任务队列为空,啥也不做
- 添加小于等于线程池数量的任务,主线程添加任务后通知唤醒线程池中的线程开始取任务。
此时任务缓冲队列还是空
- 添加大于线程池数量的任务,继续添加发现线程池用完,于是存入缓冲队列,工作线程空闲后主动从任务队列取任务执行
- 添加大于线程池数量的任务,且任务队列已满,当线程中线程用完,且任务缓冲队列已满,进入等待状态,等待任务缓冲队列通知
使用C++11中的bind/function定义和调用任务处理函数
任务处理函数的声明、优先级、带优先级的任务:
typedef std::function<void()> Task_type;//任务类型
enum taskPriorityE {LOW,MIDDLE,HIGH};//优先级
typedef std::pair<taskPriorityE,Task_type> TaskPair;//任务优先级和任务类型组合的任务
禁用拷贝构造和赋值运算符:
ThreadPool(const ThreadPool&);
const ThreadPool& operator=(const ThreadPool&);
声明线程池大小、任务队列、互斥锁和条件变量、线程池是否开始
int m_threads_size;//线程池大小
std::vector<std::thread*> m_threads;//线程池
std::priority_queue<TaskPair,std::vector<TaskPair>,TaskPriorityCmp> m_tasks;//任务队列
std::mutex m_mutex;//STL队列不是线程安全的,因此需要结合互斥锁
std::condition_variable m_cond;//条件变量
bool m_started;//是否开始
另外值得注意的是为了安全性和便捷性,只暴露stop和addTask两个接口给用户,其他start和threadLoop、take等接口都被声明为私有函数,
实现构造函数通过传入的参数,初始化线程池大小、锁、条件变量、是否开始,之后开始运行
ThreadPool::ThreadPool(int threads_size)
:m_threads_size(threads_size),m_mutex(),m_cond(),m_started(false)
{
start();
}
start中创建线程,并将线程和任务处理函数进行绑定
void ThreadPool::start()
{
assert(m_threads.empty());
assert(!m_started);
m_started = true;
m_threads.reserve(m_threads_size);
for(int i=0;i<m_threads_size;++i)
{
m_threads.push_back(new std::thread(std::bind(&ThreadPool::threadLoop,this)));
}
}
stop中通知所有线程,并将所有线程分离,最后将线程池清空
void ThreadPool::start()
{
assert(m_threads.empty());
assert(!m_started);
m_started = true;
m_threads.reserve(m_threads_size);
for(int i=0;i<m_threads_size;++i)
{
m_threads.push_back(new std::thread(std::bind(&ThreadPool::threadLoop,this)));
}
}
threadLoop中循环从队列中拿任务并执行
void ThreadPool::threadLoop()
{
while(m_started)
{
Task_type task = take();
if(task)
task();
}
}
addTask是添加任务到任务队列,并通知线程。
为方便使用重载addTask函数
void ThreadPool::addTask(const Task_type& task)
{
std::unique_lock<std::mutex> lock(m_mutex);
TaskPair taskPair(MIDDLE,task);
m_tasks.emplace(taskPair);
m_cond.notify_one();
}
void ThreadPool::addTask(const TaskPair& taskPair)
{
std::unique_lock<std::mutex> lock(m_mutex);
m_tasks.emplace(taskPair);
m_cond.notify_one();
}
take从队列中拿任务,若队列为空且已开始,则等待,对应上面的情况1,若不空则从队列拿任务并返回
ThreadPool::Task_type ThreadPool::take()
{
std::unique_lock<std::mutex> lock(m_mutex);
while(m_tasks.empty()&&m_started)
{
m_cond.wait(lock);
}
Task_type task;
int size = m_tasks.size();
if(!m_tasks.empty()&&m_started)
{
task = m_tasks.top().second;
m_tasks.pop();
assert(size -1 == m_tasks.size());
}
return task;
}
最后提醒下,由于STL的队列不是线程安全,因此对队列的添加addTask、删除take都需要锁,当然后续会改进成使用boost库中线程安全的队列,这样就能大大提高并发性。
使用C++11中的thread、mutex、condition_variable、priority_queue、vector实现简单的线程池。
后续考虑使用无锁队列优化。
完整代码见这里
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)