C++实现简单线程池

C++实现简单线程池,第1张

线程池

尽管C++11加入线程库,但C++对多线程的支持还是比较初级,稍微高级的用法还是需要自己实现。


线程池是提前创建并维护多个线程,等待管理者分配任务的机制,避免短时间线程创建和销毁的代价,一般是IO密集型的场景使用。


主要包括线程管理器、任务线程、消息队列

  • 线程管理器:主要功能是创建和启动线程、调配任务、管理线程等。


    主要有三个方法:

    • start:创建一定数量线程池
    • stop:终止所有线程并回收资源
    • addTask:添加任务到消息队列
  • 任务线程:等待分配任务的线程,一般使用条件变量实现等待和通知机制

  • 任务队列:存放任务的缓冲机制,队列有调度功能,使用优先队列实现,但需要锁限制并发

线程池工作的几种情况

按任务队列和线程池大小可分成四种情况:

  1. 没有任务,线程池中任务队列为空,啥也不做
  2. 添加小于等于线程池数量的任务,主线程添加任务后通知唤醒线程池中的线程开始取任务。


    此时任务缓冲队列还是空

  3. 添加大于线程池数量的任务,继续添加发现线程池用完,于是存入缓冲队列,工作线程空闲后主动从任务队列取任务执行
  4. 添加大于线程池数量的任务,且任务队列已满,当线程中线程用完,且任务缓冲队列已满,进入等待状态,等待任务缓冲队列通知
线程池的实现 声明

使用C++11中的bind/function定义和调用任务处理函数

任务处理函数的声明、优先级、带优先级的任务:

typedef std::function<void()> Task_type;//任务类型
enum taskPriorityE {LOW,MIDDLE,HIGH};//优先级
typedef std::pair<taskPriorityE,Task_type> TaskPair;//任务优先级和任务类型组合的任务

禁用拷贝构造和赋值运算符:

ThreadPool(const ThreadPool&);
const ThreadPool& operator=(const ThreadPool&);

声明线程池大小、任务队列、互斥锁和条件变量、线程池是否开始

int m_threads_size;//线程池大小
std::vector<std::thread*> m_threads;//线程池
std::priority_queue<TaskPair,std::vector<TaskPair>,TaskPriorityCmp>  m_tasks;//任务队列
std::mutex m_mutex;//STL队列不是线程安全的,因此需要结合互斥锁
std::condition_variable m_cond;//条件变量
bool m_started;//是否开始

另外值得注意的是为了安全性和便捷性,只暴露stop和addTask两个接口给用户,其他start和threadLoop、take等接口都被声明为私有函数,

实现

构造函数通过传入的参数,初始化线程池大小、锁、条件变量、是否开始,之后开始运行

ThreadPool::ThreadPool(int threads_size)
    :m_threads_size(threads_size),m_mutex(),m_cond(),m_started(false)
{
    start();
}

start中创建线程,并将线程和任务处理函数进行绑定

void ThreadPool::start()
{
    assert(m_threads.empty());
    assert(!m_started);
    m_started = true;
    m_threads.reserve(m_threads_size);
    for(int i=0;i<m_threads_size;++i)
    {
        m_threads.push_back(new std::thread(std::bind(&ThreadPool::threadLoop,this)));
    }
}

stop中通知所有线程,并将所有线程分离,最后将线程池清空

void ThreadPool::start()
{
    assert(m_threads.empty());
    assert(!m_started);
    m_started = true;
    m_threads.reserve(m_threads_size);
    for(int i=0;i<m_threads_size;++i)
    {
        m_threads.push_back(new std::thread(std::bind(&ThreadPool::threadLoop,this)));
    }
}

threadLoop中循环从队列中拿任务并执行

void ThreadPool::threadLoop()
{
    while(m_started)
    {
        Task_type task = take();
        if(task)
            task();
    }
}

addTask是添加任务到任务队列,并通知线程。


为方便使用重载addTask函数

void ThreadPool::addTask(const Task_type& task)
{
    std::unique_lock<std::mutex> lock(m_mutex);
    TaskPair taskPair(MIDDLE,task);
    m_tasks.emplace(taskPair);
    m_cond.notify_one();
}

void ThreadPool::addTask(const TaskPair& taskPair)
{

    std::unique_lock<std::mutex> lock(m_mutex);
    m_tasks.emplace(taskPair);
    m_cond.notify_one();
}

take从队列中拿任务,若队列为空且已开始,则等待,对应上面的情况1,若不空则从队列拿任务并返回

ThreadPool::Task_type ThreadPool::take()
{
    std::unique_lock<std::mutex> lock(m_mutex);
    while(m_tasks.empty()&&m_started)
    {
        m_cond.wait(lock);
    }

    Task_type task;

    int size = m_tasks.size();
    if(!m_tasks.empty()&&m_started)
    {
        task = m_tasks.top().second;
        m_tasks.pop();
        assert(size -1 == m_tasks.size());
    }
    return task;
}

最后提醒下,由于STL的队列不是线程安全,因此对队列的添加addTask、删除take都需要锁,当然后续会改进成使用boost库中线程安全的队列,这样就能大大提高并发性。


总结

使用C++11中的thread、mutex、condition_variable、priority_queue、vector实现简单的线程池。


后续考虑使用无锁队列优化。


完整代码见这里

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/607310.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-14
下一篇 2022-04-14

发表评论

登录后才能评论

评论列表(0条)

保存