#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
using namespace std;
/// One unit of work for the pool: a callback plus the opaque argument that is
/// passed to it. A worker runs a task by calling `handle(ctx)`.
///
/// Both members default to nullptr so that a default-constructed Task never
/// carries an indeterminate function pointer.
struct Task {
    void* ctx = nullptr;                  // opaque argument forwarded to `handle`
    void (*handle)(void* ctx) = nullptr;  // callback to invoke; nullptr means "no work"
};
void thread_cycle_task(void* ctx);
class ThreadPool {
public:
queue _task_queue;
int _max_tasks;
mutex _mt;
condition_variable _cond;
int _thread_nums;
thread* _thread;
public:
ThreadPool(int thread_nums, int max_tasks) {
_max_tasks = max_tasks;
_thread_nums = thread_nums;
_thread = new thread[_thread_nums];
// 初始化线程
for (int i = 0; i < _thread_nums; ++i) {
_thread[i] = thread(thread_cycle_task, (void*)this);
}
}
~ThreadPool() {
// 销毁化线程
cout << "~ThreadPool()" << endl;
for (int i = 0; i < _thread_nums; ++i) {
_thread[i].join();
}
delete[] _thread;
}
void push_task_queue(Task& task) {
_mt.lock();
if (_task_queue.size() == _max_tasks) {
_mt.unlock();
return;
}
_cond.notify_one(); // 出发一个 以免导致惊群
_task_queue.push(task);
_mt.unlock();
}
};
void thread_cycle_task(void* ctx) {
ThreadPool* tp = (ThreadPool*)ctx;
Task task;
for (;;) {
{
std::unique_lock mutex(tp->_mt);
while (tp->_task_queue.size() == 0) {
tp->_cond.wait(mutex);
}
task = tp->_task_queue.front();
tp->_task_queue.pop();
}
task.handle(task.ctx);
}
}
// Held while writing to std::cout so that lines printed by concurrent calls
// to DealProsser::fun() do not interleave.
std::mutex out;

/// Sample task handler used by the demo.
class DealProsser {
public:
    /// Prints the integer that the caller stored in `ctx`, followed by a newline.
    ///
    /// @param ctx an integer value carried in a pointer; it is never dereferenced
    static void fun(void* ctx) {
        // Convert through intptr_t: casting a pointer straight to int is
        // ill-formed where pointers are wider than int.
        const int value = static_cast<int>(reinterpret_cast<std::intptr_t>(ctx));
        std::lock_guard<std::mutex> guard(out);
        std::cout << value << std::endl;
    }
};
int main() {
ThreadPool tp(5, 50);
for (int i = 0; i < 60; ++i) {
Task task;
task.handle = DealProsser::fun;
task.ctx = (void*)i;
tp.push_task_queue(task);
}
return 0;
}
// 欢迎分享,转载请注明来源:内存溢出
// 评论列表(0条)