每日一歌,分享好心情: 卡斯特梅的雨季
c++11 实现的线程池,项目已用,Lets go
用到了很多c11的语法,大家一起学习
#ifndef _THREAD_POOL_HPP_
#define _THREAD_POOL_HPP_
#include
#include
#include
#include
#include
#include
#include
#include
class ThreadPool {
public:
ThreadPool(int num_threads = 4);
ThreadPool(const ThreadPool&) = delete;
ThreadPool() = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
ThreadPool& operator=(ThreadPool&&) = delete;
/*
1. 向线程池中添加一个任务,任务即函数,在函数中实现业务逻辑
2. 注意两点:
①可变参数模板 c11新引入的特性,在c11之前类模板和函数模板只能含有固定数据量的模板参数
②返回类型后置语法 也是c11引入的
*/
template <typename F, typename... Args> /* */
auto AddTask(F&& f, Args&&... args) -> std::future<decltype(f(args...))>; /**/
~ThreadPool();
private:
std::vector<std::thread> _worker_threads; /*worker线程*/
bool _stop = false; /*线程池是否被关闭*/
std::queue<std::function<void()>> _tasks_queue; /*任务队列, 队列中每个任务是一个 void () 类型的函数*/
std::mutex _task_queue_mutex; /*任务队列互斥量*/
std::condition_variable _condition; /*条件变量, 有任务时唤醒工作线程,反之休眠*/
};
ThreadPool::ThreadPool(int num_threads)
: _stop(false) {
if (num_threads < 1) {
throw std::runtime_error("ThreadPool: num_threads less than one");
}
/*循环向worker vector中插入, 每个worker都是一个std::thread, 给std::thread的构造函数传入一个lamda表达式*/
for (int i=0; i<num_threads; ++i) {
_worker_threads.emplace_back(
std::thread(
[this]{
while(true) {
std::function<void()> task; /*待执行的任务*/
{
std::unique_lock<std::mutex> lock(this->_task_queue_mutex); /*任务队列加锁保护*/
/*
wait(lock,pred)
第一次执行wait时,在上一句已经获得锁,然后判断pred,若pred为true, 不阻塞继续执行, 若pred为false,释放锁,阻塞当前线程,等待被notify
注意,阻塞线程时已经释放锁了
当线程被唤醒时,重新判断pred的值,若为false,释放锁,继续阻塞当前线程
*/
this->_condition.wait(lock, [this]{return this->_stop || !this->_tasks_queue.empty(); });
if (this->_stop && this->_tasks_queue.empty()) return;
task = std::move(this->_tasks_queue.front());
this->_tasks_queue.pop();
}
/*执行一个任务*/
task();
}
}
)
);
}
}
ThreadPool::~ThreadPool() {
{
std::unique_lock<std::mutex> lock(_task_queue_mutex);
_stop = true;
}
/*唤醒所有工作线程,并等待工作线程执行完毕*/
_condition.notify_all();
for (std::thread &worker : _worker_threads) {
worker.join();
}
}
/*向任务队列中添加一条任务*/
template <typename F, typename... Args>
auto ThreadPool::AddTask(F&&f, Args&&...args) -> std::future<decltype(f(args...))> {
using return_type = decltype(f(args...));
/*使用std::packaged_task将函数与参数包装成异步可调用对象*/
auto task_ptr = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
/*将任务函数包装成一个 void() 类型的工作函数*/
std::function<void()> warpper_func = [task_ptr]() { (*task_ptr)(); };
{
std::unique_lock<std::mutex> lock(_task_queue_mutex);
if (_stop) {
throw std::runtime_error("ThreadPool is Stopped!");
}
_tasks_queue.emplace(warpper_func);
}
/*任务已经加入任务队列, 唤醒一个工作线程*/
_condition.notify_one();
/*返回该任务关联的future对象,方便调用者获取任务的返回值*/
return task_ptr->get_future();
}
#endif
二、测试代码
#include "threadpool.hpp"
#include
#include
#include
using namespace std;
int main(int argc, char* argv[]) {
/*
可以调整pool_size和task_num的数值,观察线程池是否起作用
ps 由于多个线程向std::cout 中插入数据,我们的例子打印内容并不美观,业务上用到时对于临界区肯定有保护措施滴
*/
int pool_size = 5;
int task_num = 6;
ThreadPool threadpool(pool_size);
vector<future<int>> resVec;
for (int i=0; i<task_num; ++i) {
resVec.emplace_back(
threadpool.AddTask(
[i] {
cout << "hello " << endl;
this_thread::sleep_for(chrono::seconds(1));
return i;
}
)
);
}
/*打印每个任务的返回值*/
for (auto&& result: resVec) {
cout << result.get() << " ";
}
cout << endl;
return 0;
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)