c++11 线程池 实现,已部署~~

c++11 线程池 实现,已部署~~,第1张

每日一歌,分享好心情: 卡斯特梅的雨季

c++11 实现的线程池,项目已用,Lets go
用到了很多c11的语法,大家一起学习

一、实现
#ifndef _THREAD_POOL_HPP_
#define _THREAD_POOL_HPP_

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

class ThreadPool {
public:
    ThreadPool(int num_threads = 4);
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool() = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;
    ThreadPool& operator=(ThreadPool&&) = delete;

    /*
      1. 向线程池中添加一个任务,任务即函数,在函数中实现业务逻辑
      2. 注意两点:
         ①可变参数模板 c11新引入的特性,在c11之前类模板和函数模板只能含有固定数据量的模板参数
         ②返回类型后置语法 也是c11引入的
    */
    template <typename F, typename... Args>  /*   */
    auto AddTask(F&& f, Args&&... args) -> std::future<decltype(f(args...))>;  /**/

    ~ThreadPool();

private:
    std::vector<std::thread> _worker_threads;           /*worker线程*/
    bool _stop = false;                                 /*线程池是否被关闭*/
    std::queue<std::function<void()>> _tasks_queue;     /*任务队列, 队列中每个任务是一个 void () 类型的函数*/
    std::mutex _task_queue_mutex;                       /*任务队列互斥量*/
    std::condition_variable _condition;                 /*条件变量, 有任务时唤醒工作线程,反之休眠*/
};

ThreadPool::ThreadPool(int num_threads) 
    : _stop(false) {
    if (num_threads < 1) {
        throw std::runtime_error("ThreadPool: num_threads less than one");
    }

    /*循环向worker vector中插入, 每个worker都是一个std::thread, 给std::thread的构造函数传入一个lamda表达式*/
    for (int i=0; i<num_threads; ++i) {
        _worker_threads.emplace_back(
            std::thread(
                [this]{
                    while(true) {
                        std::function<void()> task; /*待执行的任务*/
                        {
                            std::unique_lock<std::mutex> lock(this->_task_queue_mutex);     /*任务队列加锁保护*/
                            /*
                              wait(lock,pred)  
                              第一次执行wait时,在上一句已经获得锁,然后判断pred,若pred为true, 不阻塞继续执行, 若pred为false,释放锁,阻塞当前线程,等待被notify
                              注意,阻塞线程时已经释放锁了
                              当线程被唤醒时,重新判断pred的值,若为false,释放锁,继续阻塞当前线程
                              */
                            this->_condition.wait(lock, [this]{return this->_stop || !this->_tasks_queue.empty(); });
                            if (this->_stop && this->_tasks_queue.empty()) return;
                            task = std::move(this->_tasks_queue.front());
                            this->_tasks_queue.pop();
                        }
                        /*执行一个任务*/
                        task();
                    }
                }
            )
        );
    }
}

ThreadPool::~ThreadPool() {
    {
        std::unique_lock<std::mutex> lock(_task_queue_mutex);
        _stop = true;
    }
    /*唤醒所有工作线程,并等待工作线程执行完毕*/
    _condition.notify_all();
    for (std::thread &worker : _worker_threads) {
        worker.join();
    }
}

/*向任务队列中添加一条任务*/
template <typename F, typename... Args>
auto ThreadPool::AddTask(F&&f, Args&&...args) -> std::future<decltype(f(args...))> {
    using return_type = decltype(f(args...));
    /*使用std::packaged_task将函数与参数包装成异步可调用对象*/
    auto task_ptr = std::make_shared<std::packaged_task<return_type()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    
    /*将任务函数包装成一个 void() 类型的工作函数*/
    std::function<void()> warpper_func = [task_ptr]() { (*task_ptr)(); };
    {
        std::unique_lock<std::mutex> lock(_task_queue_mutex);
        if (_stop) {
            throw std::runtime_error("ThreadPool is Stopped!");
        }
        _tasks_queue.emplace(warpper_func);
    }
    /*任务已经加入任务队列, 唤醒一个工作线程*/
    _condition.notify_one();

    /*返回该任务关联的future对象,方便调用者获取任务的返回值*/
    return task_ptr->get_future();
}


#endif
二、测试代码
#include "threadpool.hpp"
#include 
#include 
#include 
using namespace std;


int main(int argc, char* argv[]) {
    /*
        可以调整pool_size和task_num的数值,观察线程池是否起作用
        ps 由于多个线程向std::cout 中插入数据,我们的例子打印内容并不美观,业务上用到时对于临界区肯定有保护措施滴
    */
    int pool_size = 5;
    int task_num = 6;

    ThreadPool threadpool(pool_size);
    vector<future<int>> resVec;
    
    for (int i=0; i<task_num; ++i) {
        resVec.emplace_back(
            threadpool.AddTask(
                [i] {
                    cout << "hello " << endl;
                    this_thread::sleep_for(chrono::seconds(1));
                    return i;
                }
            )
        );
    }

    /*打印每个任务的返回值*/
    for (auto&& result: resVec) {
        cout << result.get() << " ";
    }
    cout << endl;

    return  0;
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/798605.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-06
下一篇 2022-05-06

发表评论

登录后才能评论

评论列表(0条)

保存