下面是内存溢出 jb51.cc 通过网络收集整理的代码片段。
内存溢出小编现在分享给大家,也给大家做个参考。
/*** 实现功能
* 1.初始化指定个数的线程
* 2.使用链表来管理任务队列
* 3.支持拓展动态线程
* 4.如果闲置线程过多,动态销毁部分线程
*/
#include <stdio.h>#include <stdlib.h>#include <pthread.h>#include "threadpool.h"/** * 实现功能 * 1.初始化指定个数的线程 * 2.使用链表来管理任务队列 * 3.支持拓展动态线程 * 4.如果闲置线程过多,动态销毁部分线程 *//**------------------线程池API函数----------------* 创建线程池* thread_pool_t* thread_pool_create(int num);* 线程的入口函数* voID *thread_excute_route(voID *arg);* 添加任务* int thread_pool_add_worker(thread_pool_t *pool,voID*(*process)(voID *arg),voID *arg); * 如果闲置时间过久则销毁部分线程** voID *thread_pool_is_need_recovery(voID *arg);* 检测是否需要拓展线程* voID *thread_pool_is_need_extend(voID *arg);* 等待所有的线程结束 voID thread_pool_wait(thread_pool_t *pool);* 销毁线程池 voID thread_pool_destory(thread_pool_t *pool);* 显示线程的状态信息* voID *display_thread(voID *arg);*//*-----------------demo------------------------*/struct param{ int ID;};voID *demo(voID *arg){ struct param *p = (struct param*)arg; printf("thread[%ld] excute task %d\n",pthread_self(),p->ID);}int main(int argc,char **argv){ thread_pool_t *thread_pool=NulL; thread_pool = thread_pool_create(10); /*添加任务*/ int i=0; struct param *p = NulL; for(i=0;i<10000;i++){ p = (struct param*) malloc(sizeof(struct param)); p->ID =i; thread_pool_add_worker(thread_pool,demo,(voID*)p); //free(p); } thread_pool_wait(thread_pool); //thread_pool_destory(thread_pool); return 0;}
#include <stdio.h>#include <pthread.h>#include <stdlib.h>#include <signal.h>/*线程的任务队列由,函数和参数组成,任务由链表来进行管理*/typedef struct thread_worker_s{ voID *(*process)(voID *arg); //处理函数 voID *arg; //参数 struct thread_worker_s *next;}thread_worker_t;#define bool int#define true 1#define false 0/*线程池中各线程状态描述*/#define THREAD_STATE_RUN 0#define THREAD_STATE_TASK_WAITING 1#define THREAD_STATE_TASK_PROCESSING 2#define THREAD_STATE_TASK_FINISHED 3#define THREAD_STATE_EXIT 4 typedef struct thread_info_s{ pthread_t ID; int state; struct thread_info_s *next;}thread_info_t;static char* thread_state_map[] ={"创建","等待任务","处理中","处理完成","已退出"};/*线程压缩的时候只有 0,1,2,4 状态的线程可以销毁*//*线程池管理器*/#define THREAD_BUSY_PERCENT 0.5 /*线程:任务 = 1:2 值越小,说明任务多,增加线程*/#define THREAD_IDLE_PERCENT 2 /*线程:任务 = 2:1 值大于1,线程多于任务,销毁部分线程*/typedef struct thread_pool_s{ pthread_mutex_t queue_lock ; //队列互斥锁,即涉及到队列修改时需要加锁 pthread_cond_t queue_ready; //队列条件锁,队列满足某个条件,触发等待这个条件的线程继续执行,比如说队列满了,队列空了 thread_worker_t *head ; //任务队列头指针 bool is_destroy ; //线程池是否已经销毁 int num; //线程的个数 int rnum; ; //正在跑的线程 int knum; ; //已杀死的线程 int queue_size ; //工作队列的大小 thread_info_t *threads ; //线程组ID,通过pthread_join(thread_IDs[0],NulL) 来执行线程 pthread_t display ; //打印线程 pthread_t destroy ; //定期销毁线程的线程ID pthread_t extend ; float percent ; //线程个数于任务的比例 rnum/queue_size int init_num ; pthread_cond_t extend_ready ; //如果要增加线程}thread_pool_t;/*-------------------------函数声明----------------------*//** * 1.初始化互斥变量 * 2.初始化等待变量 * 3.创建指定个数的线程线程 */thread_pool_t* thread_pool_create(int num);voID *thread_excute_route(voID *arg);/*调试函数*/voID deBUG(char *message,int flag){ if(flag) printf("%s\n",message);}voID *display_thread(voID *arg);/** * 添加任务包括以下几个 *** 作 * 1.将任务添加到队列末尾 * 2.通知等待进程来处理这个任务 pthread_cond_singal();*/int thread_pool_add_worker(thread_pool_t *pool,voID *arg); //网线程池的队列中增加一个需要执行的函数,也就是任务/** * 销毁线程池,包括以下几个部分 * 1.通知所有等待的进程 pthread_cond_broadcase * 2.等待所有的线程执行完 * 3.销毁任务列表 * 4.释放锁,释放条件 * 4.销毁线程池对象 */voID *thread_pool_is_need_recovery(voID *arg);voID *thread_pool_is_need_extend(voID *arg);voID thread_pool_destory(thread_pool_t *pool);thread_pool_t *thread_pool_create(int num){ if(num<1){ return NulL; } thread_pool_t *p; p = (thread_pool_t*)malloc(sizeof(struct thread_pool_s)); if(p==NulL) return NulL; p->init_num = num; /*初始化互斥变量与条件变量*/ pthread_mutex_init(&(p->queue_lock),NulL); pthread_cond_init(&(p->queue_ready),NulL); /*设置线程个数*/ p->num = num; p->rnum = num; p->knum = 0; p->head = NulL; p->queue_size =0; p->is_destroy = false; int i=0; thread_info_t *tmp=NulL; for(i=0;i<num;i++){ /*创建线程*/ tmp= (struct thread_info_s*)malloc(sizeof(struct thread_info_s)); if(tmp==NulL){ free(p); return NulL; }else{ tmp->next = p->threads; p->threads = tmp; } pthread_create(&(tmp->ID),NulL,thread_excute_route,p); tmp->state = THREAD_STATE_RUN; } /*显示*/ pthread_create(&(p->display),display_thread,p); /*检测是否需要动态线程*/ //pthread_create(&(p->extend),thread_pool_is_need_extend,p); /*动态销毁*/ pthread_create(&(p->destroy),thread_pool_is_need_recovery,p); return p;}int thread_pool_add_worker(thread_pool_t *pool,voID*(*process)(voID*arg),voID*arg){ thread_pool_t *p= pool; thread_worker_t *worker=NulL,*member=NulL; worker = (thread_worker_t*)malloc(sizeof(struct thread_worker_s)); int incr=0; if(worker==NulL){ return -1; } worker->process = process; worker->arg = arg; worker->next = NulL; thread_pool_is_need_extend(pool); pthread_mutex_lock(&(p->queue_lock)); member = p->head; if(member!=NulL){ while(member->next!=NulL) member = member->next; member->next = worker; }else{ p->head = worker; } p->queue_size ++; pthread_mutex_unlock(&(p->queue_lock)); pthread_cond_signal(&(p->queue_ready)); return 1;}voID thread_pool_wait(thread_pool_t *pool){ thread_info_t *thread; int i=0; for(i=0;i<pool->num;i++){ thread = (thread_info_t*)(pool->threads+i); thread->state = THREAD_STATE_EXIT; pthread_join(thread->ID,NulL); }}voID thread_pool_destory(thread_pool_t *pool){ thread_pool_t *p = pool; thread_worker_t *member = NulL; if(p->is_destroy) return ; p->is_destroy = true; pthread_cond_broadcast(&(p->queue_ready)); thread_pool_wait(pool); free(p->threads); p->threads = NulL; /*销毁任务列表*/ while(p->head){ member = p->head; p->head = member->next; free(member); } /*销毁线程列表*/ thread_info_t *tmp=NulL; while(p->threads){ tmp = p->threads; p->threads = tmp->next; free(tmp); } pthread_mutex_destroy(&(p->queue_lock)); pthread_cond_destroy(&(p->queue_ready)); return ;}/*通过线程ID,找到对应的线程*/thread_info_t *get_thread_by_ID(thread_pool_t *pool,pthread_t ID){ thread_info_t *thread=NulL; thread_info_t *p=pool->threads; while(p!=NulL){ if(p->ID==ID) return p; p = p->next; } return NulL;}/*每个线程入口函数*/voID *thread_excute_route(voID *arg){ thread_worker_t *worker = NulL; thread_info_t *thread = NulL; thread_pool_t* p = (thread_pool_t*)arg; //printf("thread %lld create success\n",pthread_self()); while(1){ pthread_mutex_lock(&(p->queue_lock)); /*获取当前线程的ID*/ pthread_t pthread_ID = pthread_self(); /*设置当前状态*/ thread = get_thread_by_ID(p,pthread_ID); /*线程池被销毁,并且没有任务了*/ if(p->is_destroy==true && p->queue_size ==0){ pthread_mutex_unlock(&(p->queue_lock)); thread->state = THREAD_STATE_EXIT; p->knum ++; p->rnum --; pthread_exit(NulL); } if(thread){ thread->state = THREAD_STATE_TASK_WAITING; /*线程正在等待任务*/ } /*线程池没有被销毁,没有任务到来就一直等待*/ while(p->queue_size==0 && !p->is_destroy){ pthread_cond_wait(&(p->queue_ready),&(p->queue_lock)); } p->queue_size--; worker = p->head; p->head = worker->next; pthread_mutex_unlock(&(p->queue_lock)); if(thread) thread->state = THREAD_STATE_TASK_PROCESSING; /*线程正在执行任务*/ (*(worker->process))(worker->arg); if(thread) thread->state = THREAD_STATE_TASK_FINISHED; /*任务执行完成*/ free(worker); worker = NulL; }}/*拓展线程*/voID *thread_pool_is_need_extend(voID *arg){ thread_pool_t *p = (thread_pool_t *)arg; thread_pool_t *pool = p; /*判断是否需要增加线程,最终目的 线程:任务=1:2*/ if(p->queue_size>100){ int incr =0; if(((float)p->rnum/p->queue_size) < THREAD_BUSY_PERCENT ){ incr = (p->queue_size*THREAD_BUSY_PERCENT) - p->rnum; /*计算需要增加线程个数*/ int i=0; thread_info_t *tmp=NulL; thread_pool_t *p = pool; pthread_mutex_lock(&pool->queue_lock); if(p->queue_size<100){ pthread_mutex_unlock(&pool->queue_lock); return ; } for(i=0;i<incr;i++){ /*创建线程*/ tmp= (struct thread_info_s*)malloc(sizeof(struct thread_info_s)); if(tmp==NulL){ continue; }else{ tmp->next = p->threads; p->threads = tmp; } p->num ++; p->rnum ++; pthread_create(&(tmp->ID),p); tmp->state = THREAD_STATE_RUN; } pthread_mutex_unlock(&pool->queue_lock); } } //pthread_cond_signal(&pool->extend_ready);}pthread_cond_t sum_ready;/*恢复初始线程个数*/voID *thread_pool_is_need_recovery(voID *arg){ thread_pool_t *pool = (thread_pool_t *)arg; int i=0; thread_info_t *tmp = NulL,*prev=NulL,*p1=NulL; /*如果没有任务了,当前线程大于初始化的线程个数*/ while(1){ i=0; if(pool->queue_size==0 && pool->rnum > pool->init_num ){ sleep(5); /*5s秒内还是这个状态的话就,销毁部分线程*/ if(pool->queue_size==0 && pool->rnum > pool->init_num ){ pthread_mutex_lock(&pool->queue_lock); tmp = pool->threads; while((pool->rnum != pool->init_num) && tmp){ /*找到空闲的线程*/ if(tmp->state != THREAD_STATE_TASK_PROCESSING){ i++; if(prev) prev->next = tmp->next; else pool->threads = tmp->next; pool->rnum --; /*正在运行的线程减一*/ pool->knum ++; /*销毁的线程加一*/ kill(tmp->ID,SIGKILL); /*销毁线程*/ p1 = tmp; tmp = tmp->next; free(p1); continue; } prev = tmp; tmp = tmp->next; } pthread_mutex_unlock(&pool->queue_lock); printf("5s内没有新任务销毁部分线程,销毁了 %d 个线程\n",i); } } sleep(5); }}/*打印一些信息的*/voID *display_thread(voID *arg){ thread_pool_t *p =(thread_pool_t *)arg; thread_info_t *thread = NulL; int i=0; while(1){ printf("threads %d,running %d,killed %d\n",p->num,p->rnum,p->knum); /*线程总数,正在跑的,已销毁的*/ thread = p->threads; while(thread){ printf("ID=%ld,state=%s\n",thread->ID,thread_state_map[thread->state]); thread = thread->next; } sleep(5); }}
以上是内存溢出(jb51.cc)为你收集整理的全部代码内容,希望文章能够帮你解决所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
总结以上是内存溢出为你收集整理的C 语言实现线程池,支持动态拓展和销毁全部内容,希望文章能够帮你解决C 语言实现线程池,支持动态拓展和销毁所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)