C++基础组件——线程池实现

C++基础组件——线程池实现,第1张

C++系列文章目录

1、C++设计模式——单例模式
2、C++基础组件——线程池实现

文章目录
  • C++系列文章目录
  • 前言
  • 一、线程池定义
    • 线程池解决什么问题
  • 二、C语言实现
  • 三、C++实现
    • 1.v1版本
      • 问题
    • 2.v2版本
  • 总结


前言

实现线程池,给出C语言版以及两个C++版的实现方法,解决了一些常见问题,总结了遇到的问题及解决方法。


一、线程池定义

池化技术,起到了建立缓冲区的作用:内存池、数据库连接池、消息池(消息队列)、对象池

线程池解决什么问题

1、解决任务处理

2、阻塞IO

3、解决线程创建与销毁的成本问题

4、管理线程

在日志系统中,直接写磁盘的话,性能会被限制在磁盘的读写能力上面,引入线程池,达到异步解耦的作用

二、C语言实现

三大组件:

1、执行队列,线程

2、任务队列,任务

3、管理组件

#include 
#include 
#include 
#include 
#include 
#include 
#include 

//双链表表示 
typedef struct Worker{
	pthread_t thread;
	struct Manager *pool;//指向管理者
	int terminate;//工作线程终止符
	struct Worker *prev;
	struct Worker *next;
}WORK;

typedef struct Job{
	void *(*process) (void *arg);//执行函数
    void *arg;/*回调函数的参数*/
    struct Job *prev;
    struct Job *next;
}JOB;

typedef struct Manager{
	WORK *workers;//指向第一个工作线程
	JOB *jobs;//指向第一个任务
	
	pthread_cond_t jobs_cond;//接收工作信号
    pthread_mutex_t jobs_mutex;//标记锁定任务
}ThreadPool;

int pool_add_job(ThreadPool *pool,JOB *job);//添加任务
void *nThreadCallback(void *arg);

//线程池初始化
int ThreadPool_init(ThreadPool *pool,int max_thread_num){
	if(max_thread_num < 1)	max_thread_num = 1;
	if(pool == NULL)	return -1;
	memset(pool,0,sizeof(ThreadPool));
	
    //初始化信号量
    pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
    memcpy(&pool->jobs_cond, &blank_cond, sizeof(pthread_cond_t));
    //初始化信号量
    pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
    memcpy(&pool->jobs_mutex, &blank_mutex, sizeof(pthread_mutex_t));

    for(int i=0;i<max_thread_num;i++){
    	WORK *worker = (WORK*)malloc(sizeof(WORK));
    	if(worker == NULL){
    		printf("malloc");
    		return -2;
		}
		memset(worker,0,sizeof(WORK));
		worker->pool = pool;
		worker->next=NULL;  
		worker->prev=NULL;
		
		int ret = pthread_create(&worker->thread,NULL,nThreadCallback,worker);//创建工作线程,监听nThreadCallback函数,传入worker参数
		if(ret){
			printf("pthread_create");
			free(worker);
			return -3;
		}
		//加入线程池
		if(pool->workers == NULL){//第一个结点:pool->workers指向第一个结点
			pool->workers=worker; 
			worker->next=NULL;
			worker->prev=NULL;
		}
		else{//插入其余结点
			worker->next=pool->workers;
			pool->workers->prev=worker;
			pool->workers=worker; 
		}
	}
	return 0;
}
//加入任务 
int pool_add_job(ThreadPool *pool,JOB *job){
	pthread_mutex_lock(&pool->jobs_mutex);//加锁
	//加入任务链表
	if(pool->jobs == NULL){
		pool->jobs=job;
		job->next=NULL;
		job->prev=NULL;
	}
	else{
		job->next=pool->jobs;
		pool->jobs->prev=job;
		pool->jobs=job;
	}
	pthread_cond_signal(&pool->jobs_cond);//传递信号:告诉工作线程开始执行
	pthread_mutex_unlock(&pool->jobs_mutex);//解锁
	return 0;
}

void *nThreadCallback(void *arg){//回调函数
	printf("starting thread 0x%ld\n",pthread_self());
	WORK *worker = (WORK*)arg;
	while(1){
		pthread_mutex_lock(&(worker->pool->jobs_mutex));//上锁
		while(worker->pool->jobs == NULL){//任务队列为空则等待 
			if(worker->terminate)	break;
			pthread_cond_wait(&worker->pool->jobs_cond,&worker->pool->jobs_mutex);//若无信号传来则一直阻塞 
		}
		if(worker->terminate){//销毁工作线程 
			pthread_mutex_unlock(&(worker->pool->jobs_mutex));
			printf ("thread 0x%ld will exit\n", pthread_self ());
             pthread_exit (NULL);
			break;
		}
		//得到执行信号,开始执行任务
		printf ("thread 0x%ld is starting to work\n", pthread_self ());
		JOB *job=worker->pool->jobs;
		
		//移除任务
		assert(job != NULL);//断言,任务结点不为空
		worker->pool->jobs=job->next;
		if(worker->pool->jobs != NULL)//worker->pool->jobs指向真正的任务结点时才将前驱结点置空
			worker->pool->jobs->prev=NULL;
		job->next=NULL;
		pthread_mutex_unlock(&(worker->pool->jobs_mutex));//解锁
		(*(job->process)) (job->arg);//执行函数
	}
    //释放资源
	free(worker);
	pthread_exit(NULL);
}
//销毁线程池
void ThreadPoolDestroy(ThreadPool *pool){
	for(WORK *worker = pool->workers; worker != NULL; worker = worker->next){
		worker->terminate = 1;//将每个工作线程的终止符设为1
	}
	pthread_mutex_lock(&(pool->jobs_mutex));//上锁
	pthread_cond_broadcast(&pool->jobs_cond);//传出信号量:结束到信号的工作线程开始销毁
	pthread_mutex_unlock(&(pool->jobs_mutex));//解锁
}

void *myprocess (void *arg)
{
    printf ("threadid is 0x%ld, working on task %d\n", pthread_self (),*(int *) arg);
    sleep (1);/*休息一秒,延长任务的执行时间*/
    return NULL;
}
int main (int argc, char **argv)
{
    ThreadPool *pool = (ThreadPool*)malloc(sizeof(ThreadPool));
    ThreadPool_init(pool,3);
    /*连续向池中投入10个任务*/
    int tasknum[10];
    for(int i=0;i<10;i++){
    	tasknum[i]=i;
    	JOB *job = (JOB*)malloc(sizeof(JOB));
    	memset(job,0,sizeof(JOB));
    	job->process = myprocess;
    	job->arg = &tasknum[i];//需要传入不同的地址
    	job->next=NULL;
    	job->prev=NULL;
    	pool_add_job(pool,job);
    	
	}
    /*等待所有任务完成*/
    sleep (5);  //这句可能出问题,偷懒写法。任务执行时间长时,当main函数退出时,所有子线程被迫退出
    /*销毁线程池*/
    ThreadPoolDestroy (pool);
    return 0;
}

每个工作线程实际上一直在执行回调函数,在wait语句处阻塞,等待插入任务。传递信号量后,所有线程争抢任务,得到任务的线程执行,其余线程继续等待。
对任务进行加锁是防止多个线程同时取得任务,对任务进行修改,任务只能是固定参数。


三、C++实现 1.v1版本

使用单例模式包装线程池,全局仅产生一个对象

#pragma once
#ifndef _MYTHREADPOOL_V1_H_
#define _MYTHREADPOOL_V1_H_
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

class MyThreadPool_v1{
private:
	static MyThreadPool_v1* instance;
	using ThreadPoolTask = std::function<void()>;//	包装器包装void类型
	typedef struct Task {
		ThreadPoolTask task;
		std::string name;
		Task() {
			this->task = nullptr;
			this->name = "";
		}
		Task(ThreadPoolTask task, std::string name) {
			this->task = task;
			this->name = name;
		}
	}Task;
	std::queue<Task> Tasks;		//任务队列
	std::vector<std::unique_ptr<std::thread>> WorkThreads;		//工作线程组,独占指针指向每一个线程
	std::mutex taskmutex;		//互斥锁
	std::condition_variable cond;		//条件变量
	static std::atomic<bool>running;	//原子 *** 作保证多线程不会产生争夺
	MyThreadPool_v1();
	~MyThreadPool_v1();
	void CallBack(int id);
	//防止复制
	MyThreadPool_v1(MyThreadPool_v1 const&) = delete;
	MyThreadPool_v1& operator=(MyThreadPool_v1 const&) = delete;
public:
	static MyThreadPool_v1* GetInstance();
	void PushTask(ThreadPoolTask task, std::string name);
	void desstory();
};
#endif // !_MYTHREADPOOL_V1_H_
#include"MyThreadPool_v1.h"
#include "windows.h"
#include
/**/
MyThreadPool_v1* MyThreadPool_v1::instance = nullptr;
std::atomic<bool> MyThreadPool_v1::running = true;	//原子 *** 作保证多线程不会产生争夺
MyThreadPool_v1* MyThreadPool_v1::GetInstance() {
	std::cout << "instance" << std::endl;
	if (nullptr == instance) {	//双重检验
		static std::mutex mutex;	//新建一个静态锁
		std::lock_guard<std::mutex>lock(mutex);	//上锁
		if (nullptr == instance) {
			//生成MyThreadPool_v1对象封装为共享指针赋值给单例对象
			instance = new MyThreadPool_v1();
		}
	}
	return instance;
}

MyThreadPool_v1::MyThreadPool_v1() {
	std::cout << "构造函数" << std::endl;
	int cpunums = 4;
	//std::condition_variable(cond);	//初始化条件变量
	for (int i = 0; i < cpunums; i++) {
		//std::lock_guardlock(taskmutex);	//上锁
		auto func = std::bind(&MyThreadPool_v1::CallBack, this, i);//绑定
		WorkThreads.push_back(std::make_unique<std::thread>(func));//加入指向func的独占指针,类型是线程类
	}
	printf("WorkThreads.size(): %d\n", WorkThreads.size());
}

MyThreadPool_v1::~MyThreadPool_v1() {
	{
		std::unique_lock<std::mutex> lock(taskmutex);
		running = false;
	}
	//通知所有工作线程重新抢锁并重新判断wait条件,唤醒后因为running为false了,所以都会结束
	cond.notify_all();//通知全部线程
	// 等待所有工作线程结束
	for (const auto &workthread : WorkThreads) {
		workthread->join();//由于线程都开始竞争了,因此必定会执行完,join可等待线程执行完
	}
}

void MyThreadPool_v1::CallBack(int id) {
	printf("WorkThread:  %d start\n", id);
	while (1) {
		std::unique_lock<std::mutex> lock(this->taskmutex);
		while (Tasks.empty()) {	//当任务队列为空时一直循环
			if (running == false) {
				break;
			}
			cond.wait(lock);	//阻塞
		}
		if (running == false) {
			break;
		}
		Task task;
		{	//同步 *** 作
			task = Tasks.front();
			Tasks.pop();
		}
		//解锁
		printf("Work(%d):start\n", id);
		task.task();//执行任务
		char ch[10];
		strcpy_s(ch, task.name.c_str());
		printf("正在执行的任务id:%s\n",ch);
		printf("Work(%d):end\n", id);
	}
	printf("Work(%d):stoped\n", id);
}

void MyThreadPool_v1::PushTask(ThreadPoolTask task, std::string name) {
	{
		//加锁
		std::unique_lock<std::mutex> lock(taskmutex);
		Tasks.push({ task,name });
		char ch[10];
		strcpy_s(ch, name.c_str());
	}
	cond.notify_one();//通知一个进程
}
void MyThreadPool_v1::desstory() {
	{
		std::unique_lock<std::mutex> lock(taskmutex);
		running = false;
	}
	//通知所有工作线程重新抢锁并重新判断wait条件,唤醒后因为running为false了,所以都会结束
	cond.notify_all();//通知全部线程
	// 等待所有工作线程结束
	for (const auto &workthread : WorkThreads) {
		workthread->join();//由于线程都开始竞争了,因此必定会执行完,join可等待线程执行完
	}
}
#include"MyThreadPool_v1.h"
#include
#include 
using namespace std;
void funct() {
	cout << "进入到函数:" << endl; 
	Sleep(2);
}

int main() {
	auto ThreadPool = MyThreadPool_v1::GetInstance();//	拿到线程池单例对象
	for (int i = 0; i < 10; i++) {
		ThreadPool->PushTask(funct, to_string(i));
	}
	Sleep(5);	//防止执行过快程序直接结束
	ThreadPool->desstory();	//线程池销毁时,判断所有线程是否执行完毕,进行阻塞,避免直接return 0中止未完成的线程
	return 0;	//任务执行时间长,但是程序已经运行到结尾,会直接中止所有线程
}
问题

当前台线程结束后,程序结束,main函数结束会强制所有后台子线程结束

1.任务执行时间长时,当main函数退出时,所有子线程被迫退出

2.单例线程池无法自动析构

3.任务函数参数不可变

2.v2版本

使用智能指针的单例模式实现线程池

使用C++11新特性实现可变参数模板,可以执行不同参数的任务

#pragma once
#ifndef _MYTHREADPOOL_V2_H_
#define _MYTHREADPOOL_V2_H_
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include 
class MyThreadPool_v2 {
public:
	static std::shared_ptr<MyThreadPool_v2> GetInstacne(int threadnums);
	~MyThreadPool_v2();
	//可变参数,利用后置decltype推导类型传递给auto
	template<class F, class... Args>
	auto PushTask(F&& f, Args&&... args) //任务管道函数
		-> std::future<typename std::result_of<F(Args...)>::type>;
private:
	MyThreadPool_v2(int threadnums);
	using ThreadPoolTask = std::function<void()>;//	包装器包装void类型
	std::queue<ThreadPoolTask> Tasks;		//任务队列
	std::vector<std::thread> WorkThreads;		//工作线程组,独占指针指向每一个线程

	std::mutex taskmutex;		//互斥锁
	std::condition_variable cond;		//条件变量
	std::atomic<bool>running = true;	//原子 *** 作保证多线程不会产生争夺
};

//模板不能和定义分离,在编译时期不知道参数定义
template <typename F, typename... Args>
auto MyThreadPool_v2::PushTask(F&& f, Args&& ...args)
	->std::future<typename std::result_of<F(Args...)>::type> {
	using return_type = typename std::result_of<F(Args...)>::type;

	//指向packaged_task的共享指针
	auto task = std::make_shared<std::packaged_task<return_type()>>(		//packaged_task包装一个可调用对象,可以传递给future对象,使其在另一线程获取结果
		std::bind(std::forward<F>(f), std::forward<Args>(args)...)		//完美转发,保持函数和参数类型不变
		);
	//获取任务future
	std::future<return_type> res = task->get_future();
	{
		std::unique_lock<std::mutex>lock(taskmutex);//独占加锁

		if (!running) {
			throw std::runtime_error("PushTask on stopped ThreadPool");
		}
		Tasks.emplace([task]() { (*task)(); });
		// 发送通知,唤醒一个wait状态的工作线程重新抢锁并重新判断wait条件
		cond.notify_one();
		return res;
	}
}
#endif // !_MYTHREADPOOL_V2_H_
#include"MyThreadPool_v2.h"
std::shared_ptr<MyThreadPool_v2> MyThreadPool_v2::GetInstacne(int threadnums) {
	//第一次声明静态对象是线程安全的,存在静态区域
	static std::shared_ptr<MyThreadPool_v2> instacne_v2(new MyThreadPool_v2(threadnums));
	return instacne_v2;
}

MyThreadPool_v2::MyThreadPool_v2(int threadnums) {
	//int threadnums = 4;
	printf("构造函数\n");
	for (int i = 0; i < threadnums; ++i) {
		//每一个线程执行一个匿名函数
		printf("线程%d开始执行\n", i);
		//匿名函数包含加锁,阻塞,取任务,执行,即之前的回调函数
		WorkThreads.emplace_back
		(
			[this]
		{
			for (;;)
			{
				ThreadPoolTask task;//任务对象
				{
					std::unique_lock<std::mutex> lock(this->taskmutex);
					/**等待条件成立(当线程池被关闭或有可消费任务时跳过wait继续;否则cond.wait将会unlock释放锁,
					* *其他线程可以继续拿锁,但此线程会阻塞此处并休眠,直到被notify_*唤醒,被唤醒时wait会再次lock并判断条件是否成立,
					* *如成立则跳过wait,否则unlock并休眠继续等待下次唤醒)
					* */
					this->cond.wait(lock, [this] { return !this->running || !this->Tasks.empty(); });
					// 如果线程池停止且任务队列为空,说明需要关闭线程池结束返回
					if (!running&&Tasks.empty()) {
						return;
					}
					// 取得任务队首任务(注意此处的std::move)
					task = std::move(this->Tasks.front());
					// 从队列移除
					this->Tasks.pop();
				}
				//执行任务
				task();
			}
		}
		);
	}
}

MyThreadPool_v2::~MyThreadPool_v2() {
	printf("析构函数\n");
	{
		std::unique_lock<std::mutex> lock(taskmutex);
		running = false;
	}
	//通知所有工作线程重新抢锁并重新判断wait条件,唤醒后因为running为false了,所以都会结束
	cond.notify_all();//通知全部线程
	// 等待所有工作线程结束
	for (auto &workthread : WorkThreads) {
		workthread.join();//由于线程都开始竞争了,因此必定会执行完,join可等待线程执行完
	}
}
#include 
#include 
#include 
#include 
#include "MyThreadPool_v2.h"
int main() {
	auto pool = MyThreadPool_v2::GetInstacne(4);
	/**/
	std::vector< std::future<int> > results;    //线程执行结果
	// 批量执行线程任务
	for (int i = 0; i < 8; ++i) {
		results.emplace_back(   // 保存线程执行结果到results
			pool->PushTask([i] {  // 添加一个新的工作任务到线程池
			std::cout << "hello " << i << std::endl;
			//std::this_thread::sleep_for(std::chrono::seconds(1));
			//std::cout << "world " << i << std::endl;
			return i * i;
		})
		);
	}
	// 打印线程执行结果
	for (auto && result : results)
		std::cout << "result:" << result.get() << std::endl;
	std::cout << std::endl;
	return 0;
}

在单例模式中使用了智能指针,最后的析构交由智能指针实现。C++11标准中静态对象的线程安全创建,其静态对象必须在GetInstance函数中第一次声明,不必使用双重检验来保证线程安全。PushTask函数定义了可变参数模板,使用了C++11的新特性,后置推导参数类型传递给auto,这两篇文章讲的非常详细参考1,参考2。


总结

记录一下实现线程池遇到的问题,并和之前的单例模式结合实现,其应用场景非常广泛,其中若有bug欢迎大家指出,期待指导。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/921264.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-16
下一篇 2022-05-16

发表评论

登录后才能评论

评论列表(0条)

保存