C 语言实现线程池,支持动态拓展和销毁

C 语言实现线程池,支持动态拓展和销毁,第1张

概述C 语言实现线程池,支持动态拓展销毁

下面是内存溢出 jb51.cc 通过网络收集整理的代码片段。

内存溢出小编现在分享给大家,也给大家做个参考。

/**
 * 实现功能
 * 1.初始化指定个数的线程
 * 2.使用链表来管理任务队列
 * 3.支持拓展动态线程
 * 4.如果闲置线程过多,动态销毁部分线程
 */
#include <stdio.h>#include <stdlib.h>#include <pthread.h>#include "threadpool.h"/** * 实现功能 * 1.初始化指定个数的线程 * 2.使用链表来管理任务队列 * 3.支持拓展动态线程 * 4.如果闲置线程过多,动态销毁部分线程 *//**------------------线程池API函数----------------* 创建线程池* thread_pool_t* thread_pool_create(int num);* 线程的入口函数* voID *thread_excute_route(voID *arg);* 添加任务* int   thread_pool_add_worker(thread_pool_t *pool,voID*(*process)(voID *arg),voID *arg);  * 如果闲置时间过久则销毁部分线程** voID *thread_pool_is_need_recovery(voID *arg);* 检测是否需要拓展线程* voID *thread_pool_is_need_extend(voID *arg);* 等待所有的线程结束  voID  thread_pool_wait(thread_pool_t *pool);* 销毁线程池  voID  thread_pool_destory(thread_pool_t *pool);*  显示线程的状态信息*   voID *display_thread(voID *arg);*//*-----------------demo------------------------*/struct param{	int ID;};voID *demo(voID *arg){	 struct param *p = (struct param*)arg;	 printf("thread[%ld] excute task %d\n",pthread_self(),p->ID);}int main(int argc,char **argv){	thread_pool_t  *thread_pool=NulL;	thread_pool = thread_pool_create(10); 	/*添加任务*/	int i=0;	struct param *p = NulL;	for(i=0;i<10000;i++){		p = (struct param*) malloc(sizeof(struct param));		p->ID =i;		thread_pool_add_worker(thread_pool,demo,(voID*)p);		//free(p);	}	thread_pool_wait(thread_pool);	//thread_pool_destory(thread_pool);	return 0;}
#include <stdio.h>#include <pthread.h>#include <stdlib.h>#include <signal.h>/*线程的任务队列由,函数和参数组成,任务由链表来进行管理*/typedef struct  thread_worker_s{	voID *(*process)(voID *arg);  //处理函数	voID *arg;                    //参数	struct thread_worker_s *next;}thread_worker_t;#define bool int#define true  1#define false 0/*线程池中各线程状态描述*/#define THREAD_STATE_RUN               0#define THREAD_STATE_TASK_WAITING      1#define THREAD_STATE_TASK_PROCESSING   2#define THREAD_STATE_TASK_FINISHED     3#define THREAD_STATE_EXIT              4     typedef struct thread_info_s{	pthread_t ID;	int       state;  	struct  thread_info_s *next;}thread_info_t;static char* thread_state_map[] ={"创建","等待任务","处理中","处理完成","已退出"};/*线程压缩的时候只有 0,1,2,4 状态的线程可以销毁*//*线程池管理器*/#define THREAD_BUSY_PERCENT  0.5   /*线程:任务 = 1:2 值越小,说明任务多,增加线程*/#define THREAD_IDLE_PERCENT  2     /*线程:任务 = 2:1 值大于1,线程多于任务,销毁部分线程*/typedef struct thread_pool_s{	pthread_mutex_t queue_lock ; //队列互斥锁,即涉及到队列修改时需要加锁	pthread_cond_t  queue_ready; //队列条件锁,队列满足某个条件,触发等待这个条件的线程继续执行,比如说队列满了,队列空了	thread_worker_t *head      ;       //任务队列头指针	bool   		is_destroy     ;       //线程池是否已经销毁	int num;					       //线程的个数	int rnum; 				   ;       //正在跑的线程	int knum; 				   ;       //已杀死的线程	int queue_size             ;	   //工作队列的大小  	thread_info_t *threads     ;       //线程组ID,通过pthread_join(thread_IDs[0],NulL) 来执行线程	pthread_t     display      ;	   //打印线程	pthread_t     destroy      ;       //定期销毁线程的线程ID	pthread_t     extend       ;	float  percent 			   ;       //线程个数于任务的比例 rnum/queue_size	int    init_num            ;	pthread_cond_t    extend_ready		   ;       //如果要增加线程}thread_pool_t;/*-------------------------函数声明----------------------*//** *  1.初始化互斥变量 *  2.初始化等待变量 *  3.创建指定个数的线程线程 */thread_pool_t* thread_pool_create(int num);voID *thread_excute_route(voID *arg);/*调试函数*/voID deBUG(char *message,int flag){	if(flag)		printf("%s\n",message);}voID *display_thread(voID *arg);/** *  添加任务包括以下几个 *** 作 *  1.将任务添加到队列末尾 *  2.通知等待进程来处理这个任务 pthread_cond_singal();*/int thread_pool_add_worker(thread_pool_t *pool,voID *arg);  //网线程池的队列中增加一个需要执行的函数,也就是任务/** *  销毁线程池,包括以下几个部分 *  1.通知所有等待的进程 pthread_cond_broadcase *  2.等待所有的线程执行完 *  3.销毁任务列表 *  4.释放锁,释放条件 *  4.销毁线程池对象 */voID *thread_pool_is_need_recovery(voID *arg);voID *thread_pool_is_need_extend(voID *arg);voID thread_pool_destory(thread_pool_t *pool);thread_pool_t *thread_pool_create(int num){	if(num<1){		return NulL;	}	thread_pool_t *p;	p = (thread_pool_t*)malloc(sizeof(struct thread_pool_s));	if(p==NulL)		return NulL;	p->init_num = num;	/*初始化互斥变量与条件变量*/	pthread_mutex_init(&(p->queue_lock),NulL);	pthread_cond_init(&(p->queue_ready),NulL);	/*设置线程个数*/	p->num   = num; 	p->rnum  = num;	p->knum  = 0;	p->head = NulL;	p->queue_size =0; 	p->is_destroy = false;		int i=0;	thread_info_t *tmp=NulL;	for(i=0;i<num;i++){		/*创建线程*/		tmp=  (struct thread_info_s*)malloc(sizeof(struct thread_info_s));		if(tmp==NulL){		 	free(p);		 	return NulL;		}else{			tmp->next = p->threads;			p->threads = tmp;		}		pthread_create(&(tmp->ID),NulL,thread_excute_route,p);		tmp->state = THREAD_STATE_RUN;	}	/*显示*/	pthread_create(&(p->display),display_thread,p);	/*检测是否需要动态线程*/	//pthread_create(&(p->extend),thread_pool_is_need_extend,p);	/*动态销毁*/	pthread_create(&(p->destroy),thread_pool_is_need_recovery,p);	return p;}int thread_pool_add_worker(thread_pool_t *pool,voID*(*process)(voID*arg),voID*arg){	thread_pool_t *p= pool;	thread_worker_t *worker=NulL,*member=NulL;	worker = (thread_worker_t*)malloc(sizeof(struct thread_worker_s));	int incr=0;	if(worker==NulL){		return -1;	}	worker->process = process;	worker->arg     = arg;	worker->next    = NulL;	thread_pool_is_need_extend(pool);	pthread_mutex_lock(&(p->queue_lock));	member = p->head;	if(member!=NulL){		while(member->next!=NulL)			member = member->next;		member->next = worker;	}else{		p->head = worker;	}	p->queue_size ++;	pthread_mutex_unlock(&(p->queue_lock));	pthread_cond_signal(&(p->queue_ready));	return 1;}voID thread_pool_wait(thread_pool_t *pool){	thread_info_t *thread;	int i=0;	for(i=0;i<pool->num;i++){		thread = (thread_info_t*)(pool->threads+i);		thread->state = THREAD_STATE_EXIT;		pthread_join(thread->ID,NulL);	}}voID thread_pool_destory(thread_pool_t *pool){	thread_pool_t   *p     = pool;	thread_worker_t *member = NulL;	if(p->is_destroy)		return ;	p->is_destroy = true;	pthread_cond_broadcast(&(p->queue_ready));	thread_pool_wait(pool);	free(p->threads);	p->threads = NulL;	/*销毁任务列表*/	while(p->head){		member = p->head;		p->head = member->next;		free(member);	}	/*销毁线程列表*/	thread_info_t *tmp=NulL;	while(p->threads){		tmp = p->threads;		p->threads = tmp->next;		free(tmp);	}	pthread_mutex_destroy(&(p->queue_lock));	pthread_cond_destroy(&(p->queue_ready));	return ;}/*通过线程ID,找到对应的线程*/thread_info_t *get_thread_by_ID(thread_pool_t *pool,pthread_t ID){	thread_info_t *thread=NulL;	thread_info_t *p=pool->threads;	while(p!=NulL){		if(p->ID==ID)			return p;		p = p->next;	}	return NulL;}/*每个线程入口函数*/voID *thread_excute_route(voID *arg){	thread_worker_t *worker = NulL;	thread_info_t   *thread = NulL; 	thread_pool_t*   p = (thread_pool_t*)arg;	//printf("thread %lld create success\n",pthread_self());	while(1){		pthread_mutex_lock(&(p->queue_lock));		/*获取当前线程的ID*/		pthread_t  pthread_ID = pthread_self();		/*设置当前状态*/		thread = get_thread_by_ID(p,pthread_ID);		/*线程池被销毁,并且没有任务了*/		if(p->is_destroy==true && p->queue_size ==0){			pthread_mutex_unlock(&(p->queue_lock));			thread->state = THREAD_STATE_EXIT;			p->knum ++;			p->rnum --;			pthread_exit(NulL);		}		if(thread){			thread->state = THREAD_STATE_TASK_WAITING;  /*线程正在等待任务*/		}		/*线程池没有被销毁,没有任务到来就一直等待*/		while(p->queue_size==0 && !p->is_destroy){			pthread_cond_wait(&(p->queue_ready),&(p->queue_lock));		}		p->queue_size--;		worker = p->head;		p->head = worker->next;		pthread_mutex_unlock(&(p->queue_lock));				if(thread)			thread->state = THREAD_STATE_TASK_PROCESSING;  /*线程正在执行任务*/		(*(worker->process))(worker->arg);		if(thread)			thread->state = THREAD_STATE_TASK_FINISHED;    /*任务执行完成*/		free(worker);		worker = NulL;	}}/*拓展线程*/voID *thread_pool_is_need_extend(voID *arg){	thread_pool_t *p = (thread_pool_t *)arg;	thread_pool_t *pool = p;	/*判断是否需要增加线程,最终目的 线程:任务=1:2*/	if(p->queue_size>100){		int incr =0;		if(((float)p->rnum/p->queue_size) < THREAD_BUSY_PERCENT ){			incr = (p->queue_size*THREAD_BUSY_PERCENT) - p->rnum;  /*计算需要增加线程个数*/			int i=0;			thread_info_t *tmp=NulL;			thread_pool_t *p = pool;			pthread_mutex_lock(&pool->queue_lock);			if(p->queue_size<100){				pthread_mutex_unlock(&pool->queue_lock);				return ;			}			for(i=0;i<incr;i++){				/*创建线程*/				tmp=  (struct thread_info_s*)malloc(sizeof(struct thread_info_s));				if(tmp==NulL){				 	continue;				}else{					tmp->next = p->threads;					p->threads = tmp;				}				p->num  ++;				p->rnum ++;				pthread_create(&(tmp->ID),p);				tmp->state = THREAD_STATE_RUN;			}			pthread_mutex_unlock(&pool->queue_lock);		}	}	//pthread_cond_signal(&pool->extend_ready);}pthread_cond_t  sum_ready;/*恢复初始线程个数*/voID *thread_pool_is_need_recovery(voID *arg){	thread_pool_t *pool = (thread_pool_t *)arg;	int i=0;	thread_info_t *tmp = NulL,*prev=NulL,*p1=NulL;	/*如果没有任务了,当前线程大于初始化的线程个数*/	while(1){		i=0;		if(pool->queue_size==0 && pool->rnum > pool->init_num ){			sleep(5);			/*5s秒内还是这个状态的话就,销毁部分线程*/			if(pool->queue_size==0 && pool->rnum > pool->init_num ){				pthread_mutex_lock(&pool->queue_lock);				tmp = pool->threads;				while((pool->rnum != pool->init_num) && tmp){					/*找到空闲的线程*/					if(tmp->state != THREAD_STATE_TASK_PROCESSING){						i++;						if(prev)							prev->next    = tmp->next;						else							pool->threads =  tmp->next;						pool->rnum --;  /*正在运行的线程减一*/						pool->knum ++;  /*销毁的线程加一*/						kill(tmp->ID,SIGKILL); /*销毁线程*/						p1  = tmp;						tmp = tmp->next;						free(p1);						continue;					}					prev = tmp;					tmp  = tmp->next;				}				pthread_mutex_unlock(&pool->queue_lock);				printf("5s内没有新任务销毁部分线程,销毁了 %d 个线程\n",i);			}		}		sleep(5);	}}/*打印一些信息的*/voID *display_thread(voID *arg){	thread_pool_t *p =(thread_pool_t *)arg;	thread_info_t *thread = NulL;	int i=0;	while(1){		printf("threads %d,running %d,killed %d\n",p->num,p->rnum,p->knum);	/*线程总数,正在跑的,已销毁的*/		thread = p->threads;		while(thread){			printf("ID=%ld,state=%s\n",thread->ID,thread_state_map[thread->state]);			thread = thread->next;		}		sleep(5);	}}

以上是内存溢出(jb51.cc)为你收集整理的全部代码内容,希望文章能够帮你解决所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

总结

以上是内存溢出为你收集整理的C 语言实现线程池,支持动态拓展和销毁全部内容,希望文章能够帮你解决C 语言实现线程池,支持动态拓展和销毁所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/1231866.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-06
下一篇 2022-06-06

发表评论

登录后才能评论

评论列表(0条)

保存