基于C++11 的线程池

基于C++11 的线程池,第1张

本章主要将基于C++11实现的线程池,暂时没有考虑线程的动态申请和注销(可以根据自己需求实现)。线程数量值建议:cpu核数*2+1。线程过多会导致线程切换浪费大量资源,线程数量过少可能导致并发性能下降。

ThreadPool.h

#pragma once
#include 
#include 
#include 
#include 
#include 
#include 
namespace Jef{

	class ITask{
	public:
		virtual ~ITask(){};
		virtual void run() = 0;
	};

	class Semaphore{
	public:
		Semaphore(unsigned long init_sem = 0);

		bool wait(int wait_milliseconds = 0x7fffffff);
		void signal();
	private:
		std::mutex _mtx;
		std::condition_variable _cond;
		unsigned long _count;
	};


	class ThreadPool
	{
	public:
		ThreadPool(int thread_num);
		virtual ~ThreadPool();
		virtual void on_idle();

		void start();
		void stop();
		void addTask(std::shared_ptr<ITask> task);
	protected:
		std::shared_ptr<ITask> pop_task();
		void add_thread();
		
		void release_thread();
		void _run();
	private:
		Semaphore _sem;
		int _thread_num;
		std::mutex _mtx;
		std::atomic_bool _is_exit;
		std::queue<std::shared_ptr<ITask> > _task;
		std::vector<std::thread> _threads;
	};
};

ThreadPool.cpp

#include "stdafx.h"
#include "ThreadPool.h"
#include 

namespace Jef{
	///
	Semaphore::Semaphore(unsigned long init_sem){
		_count = init_sem;
	}

	bool Semaphore::wait(int wait_milliseconds)
	{
		std::unique_lock<std::mutex> lk(_mtx);
		if (_cond.wait_for(lk, std::chrono::milliseconds(wait_milliseconds), [&](){return _count > 0; }))
		{
			--_count;
			return true;
		}
		return false;
	}

	void Semaphore::signal()
	{
		std::unique_lock<std::mutex> lk(_mtx);
		++_count;
		_cond.notify_one();
	}

	///
	ThreadPool::ThreadPool(int thread_num)
	{
		_thread_num = thread_num;
		_is_exit = false;
	}


	ThreadPool::~ThreadPool()
	{
		stop();
	}

	void ThreadPool::start()
	{
		for (int i = 0; i < _thread_num; ++i)
		{
			add_thread();
		}
	}

	void ThreadPool::stop()
	{
		_is_exit = true;
		for (auto &it : _threads)
		{
			if (it.joinable())
			{
				it.join();
			}
		}
	}

	void ThreadPool::addTask(std::shared_ptr<ITask> task)
	{
		{
			std::lock_guard<std::mutex> lk(_mtx);
			_task.push(task);
		}
		_sem.signal();
	}

	std::shared_ptr<ITask> ThreadPool::pop_task()
	{
		std::shared_ptr<ITask> task = nullptr;
		std::lock_guard<std::mutex> lk(_mtx);
		if (!_task.empty())
		{
			task = _task.front();
			_task.pop();
		}
		return task;
	}

	void ThreadPool::add_thread()
	{
		std::thread t = std::thread(std::bind(&Jef::ThreadPool::_run, this));
		_threads.push_back(std::move(t));
	}

	void ThreadPool::release_thread()
	{
		
	}

	void ThreadPool::_run()
	{
		while (!_is_exit)
		{
			if (_sem.wait(100))
			{
				std::shared_ptr<ITask> task = pop_task();
				if (task != nullptr)
				{
					task->run();
				}

			}
			else
			{
				on_idle();
			}
		}

		std::ostringstream oss;
		oss << std::this_thread::get_id();
		std::string stid = oss.str();
		unsigned long long tid = std::stoull(stid);
		LOG_INFO_F(_T("thread:[%I64d] exit"), tid);
	}

	void ThreadPool::on_idle()
	{
		std::ostringstream oss;
		oss << std::this_thread::get_id();
		std::string stid = oss.str();
		unsigned long long tid = std::stoull(stid);
		LOG_INFO_F(_T("thread:[%I64d] on_idle"), tid);
	}
}

基本使用:

//任务继承ITask,在run里面实现业务逻辑
//如果想在线程空闲的时候处理某些业务,可以重写ThreadPool的OnIdle函数
class TaskA :public Jef::ITask{
public:
	TaskA(){

	}
	TaskA(int a, int b)
	{
		this->a = a;
		this->b = b;
	}
	virtual void run(){
		cout << a << "----" << b << "---" << "TaskA" << endl;
	}

private:
	int a;
	int b;
};


//启动线程池
Jef::ThreadPool pool(4);
pool.start()

//添加任务
pool.addTask(shared_ptr<TaskA>(new TaskA(a, i)));

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/801000.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-07
下一篇 2022-05-07

发表评论

登录后才能评论

评论列表(0条)

保存