简单的线程池(C/C++)

简单的线程池cc++,第1张

简单的线程池c/c++

Linux 下的 C 语言版本:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <signal.h>

/*
 * LL_INSERT - push `item` onto the front of a doubly linked list.
 *
 * item: pointer to a node that has `prev` and `next` pointer members.
 * list: lvalue holding the current head pointer (may be NULL); it is
 *       updated to point at `item`.
 *
 * Both arguments are evaluated more than once, so pass plain variables or
 * member accesses, never expressions with side effects.
 */
#define LL_INSERT(item, list) do {                  \
    (item)->prev = NULL;                            \
    (item)->next = (list);                          \
    if ((list) != NULL) (list)->prev = (item);      \
    (list) = (item);                                \
} while (0)

/*
 * LL_REMOVE - unlink `item` from a doubly linked list.
 *
 * item: pointer to a node that has `prev` and `next` pointer members.
 * list: lvalue holding the head pointer; advanced to item->next when the
 *       removed node was the head.
 *
 * After removal both links of `item` are reset to NULL. Both arguments are
 * evaluated more than once, so they must be free of side effects.
 */
#define LL_REMOVE(item, list) do {                                      \
    if ((item)->prev != NULL) (item)->prev->next = (item)->next;        \
    if ((item)->next != NULL) (item)->next->prev = (item)->prev;        \
    if ((list) == (item)) (list) = (item)->next;                        \
    (item)->prev = (item)->next = NULL;                                 \
} while (0)
/*
 * A single unit of work.
 *
 * Carries the handler to run and the caller-supplied argument that is
 * handed to that handler. Jobs live in a queue that needs cheap insertion
 * and removal, so each job is a node of a doubly linked list.
 */
struct NJOB {
    void (*func)(void *args);   /* handler invoked for this job */
    void *user_data;            /* argument passed to func */
    struct NJOB *prev, *next;   /* queue links */
};
/*
 * Per-thread bookkeeping for one worker.
 *
 * Holds the thread handle, an exit-request flag, an idle flag and a handle
 * to the owning pool. Workers may be added or removed at run time, so they
 * are also kept in a doubly linked list.
 */
struct MANAGER;

struct NWORKER {
    pthread_t th;               /* thread handle */
    int terminate;              /* > 0 once the worker has been asked to exit */
    int idle;                   /* > 0 while the worker waits for a job */
    struct MANAGER *pool;       /* owning pool */

    struct NWORKER *prev, *next;    /* worker-list links */
};

/*
 * The pool itself: it mediates between the worker list and the job queue.
 *
 * The job queue is shared by every worker thread, so it is a critical
 * resource guarded by jobs_mtx; jobs_cond is what waiting workers block on.
 */
struct MANAGER {
    struct NJOB     *jobs;          /* head of the job list */
    struct NWORKER  *workers;       /* head of the worker list */
    int              numworkers;    /* number of workers created */
    pthread_mutex_t  jobs_mtx;
    pthread_cond_t   jobs_cond;
};

typedef struct MANAGER nThreadPool;

/*
 * Worker thread entry point.
 *
 * arg: the struct NWORKER describing this thread.
 *
 * Loops forever: waits on the pool's condition variable until a job is
 * queued or this worker is asked to terminate, then pops one job, runs it
 * outside the lock, and frees both the job and its user_data.
 *
 * On termination the worker unlinks itself from pool->workers while it
 * still holds jobs_mtx, so that code walking the list under that mutex
 * never sees a freed node. It then detaches itself, because nothing joins
 * worker threads, and releases its own NWORKER.
 *
 * Always returns NULL.
 */
static void *nThreadEntranceFunc(void *arg)
{
    struct NWORKER *wkr = (struct NWORKER *)arg;
    struct MANAGER *pool = wkr->pool;

    for (;;) {
        pthread_mutex_lock(&pool->jobs_mtx);

        while (pool->jobs == NULL && wkr->terminate <= 0) {
            wkr->idle = 1;
            /* Releases the mutex while blocked; it is held again on return. */
            pthread_cond_wait(&pool->jobs_cond, &pool->jobs_mtx);
        }

        if (wkr->terminate > 0) {
            /* Leave the worker list before the node is freed below. */
            LL_REMOVE(wkr, pool->workers);
            pool->numworkers -= 1;
            pthread_mutex_unlock(&pool->jobs_mtx);
            break;
        }

        wkr->idle = 0;

        /* Jobs are pushed at the head, so the tail is the oldest one. */
        struct NJOB *job = pool->jobs;
        while (job->next != NULL) {
            job = job->next;
        }
        LL_REMOVE(job, pool->jobs);
        pthread_mutex_unlock(&pool->jobs_mtx);

        job->func(job->user_data);
        free(job->user_data);
        free(job);
    }

    /* Nobody joins workers; detach so the thread's resources are reclaimed. */
    pthread_detach(pthread_self());
    free(wkr);
    return NULL;
}

/*
 * Initialise `pool` and start `numworkers` worker threads.
 *
 * pool:       caller-owned storage; its previous contents are overwritten.
 * numworkers: requested thread count; values below 1 are raised to 1.
 *
 * Returns 0 on success, -1 if pool is NULL, -2 if a worker could not be
 * allocated, -3 if pthread_create failed. Workers started before a failure
 * are left running.
 *
 * NOTE(review): the loop header and the allocation were lost when this
 * listing was extracted; they are reconstructed here, and the -2 code is
 * inferred from the gap between -1 and -3.
 */
int nThreadPoolInit(nThreadPool *pool, int numworkers)
{
    if (pool == NULL) {
        return -1;
    }
    if (numworkers < 1) {
        numworkers = 1;
    }

    memset(pool, 0, sizeof(*pool));
    pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
    memcpy(&pool->jobs_cond, &blank_cond, sizeof(pthread_cond_t));
    pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
    memcpy(&pool->jobs_mtx, &blank_mutex, sizeof(pthread_mutex_t));

    for (int i = 0; i < numworkers; i++) {
        struct NWORKER *wkr = calloc(1, sizeof(*wkr));
        if (wkr == NULL) {
            return -2;
        }
        /* Must be set before the thread starts: the entry point reads it. */
        wkr->pool = pool;

        int ret = pthread_create(&wkr->th, NULL, nThreadEntranceFunc, (void *)wkr);
        if (ret != 0) {
            free(wkr);
            return -3;
        }
        LL_INSERT(wkr, pool->workers);
        pool->numworkers += 1;
    }
    return 0;
}

/*
 * Queue `job` on `pool` and wake one waiting worker.
 *
 * The job is linked in at the head of pool->jobs while jobs_mtx is held.
 */
void nThreadPoolJobPush(nThreadPool *pool, struct NJOB *job)
{
    pthread_mutex_lock(&pool->jobs_mtx);

    /* Head insertion into the doubly linked job list. */
    struct NJOB *old_head = pool->jobs;
    job->prev = NULL;
    job->next = old_head;
    if (old_head != NULL) {
        old_head->prev = job;
    }
    pool->jobs = job;

    /* If any worker is blocked in pthread_cond_wait, release at least one. */
    pthread_cond_signal(&pool->jobs_cond);

    pthread_mutex_unlock(&pool->jobs_mtx);
}

/*
 * Start `num` additional workers on an already initialised pool.
 *
 * Returns the pool's worker count after the additions, -1 if pool is NULL
 * or has no worker list, -2 if a worker could not be allocated, -3 if
 * pthread_create failed. Workers started before a failure are kept.
 *
 * The worker list is updated under jobs_mtx because, unlike during
 * nThreadPoolInit, other threads may be using the pool at this point.
 *
 * NOTE(review): the loop header and the allocation were lost when this
 * listing was extracted; they are reconstructed here, and the -2 code is
 * inferred from the gap between -1 and -3.
 */
int nThreadPoolWorkerAdd(nThreadPool *pool, int num)
{
    if (pool == NULL || pool->workers == NULL) {
        return -1;
    }

    for (int i = 0; i < num; i++) {
        struct NWORKER *wkr = calloc(1, sizeof(*wkr));
        if (wkr == NULL) {
            return -2;
        }
        wkr->pool = pool;

        pthread_mutex_lock(&pool->jobs_mtx);
        int ret = pthread_create(&wkr->th, NULL, nThreadEntranceFunc, wkr);
        if (ret == 0) {
            LL_INSERT(wkr, pool->workers);
            pool->numworkers += 1;
        }
        pthread_mutex_unlock(&pool->jobs_mtx);

        if (ret != 0) {
            free(wkr);
            return -3;
        }
    }

    pthread_mutex_lock(&pool->jobs_mtx);
    int total = pool->numworkers;
    pthread_mutex_unlock(&pool->jobs_mtx);
    return total;
}

/*
 * Ask up to `num` idle workers to terminate.
 *
 * Only workers whose idle flag is set and that have not already been asked
 * to terminate are selected, so repeated calls do not count the same
 * worker twice. Returns the number of workers newly marked; 0 when pool is
 * NULL or num <= 0.
 *
 * The whole walk, including reading the list head, happens under jobs_mtx.
 */
int nThreadPoolWorkerDrop(nThreadPool *pool, int num)
{
    if (pool == NULL || num <= 0) {
        return 0;
    }

    int count = 0;

    pthread_mutex_lock(&pool->jobs_mtx);
    for (struct NWORKER *wkr = pool->workers;
         wkr != NULL && count < num;
         wkr = wkr->next) {
        if (wkr->idle > 0 && wkr->terminate == 0) {
            wkr->terminate = 1;
            count++;
        }
    }
    pthread_mutex_unlock(&pool->jobs_mtx);

    /* Every waiter must wake up to re-check its own terminate flag. */
    if (count > 0) {
        pthread_cond_broadcast(&pool->jobs_cond);
    }
    return count;
}

/*
 * Shut the pool down: ask every worker to terminate and discard jobs that
 * have not started yet.
 *
 * The terminate flags are written under jobs_mtx, the same mutex the
 * workers hold when they read them. Jobs still queued would otherwise be
 * leaked, since a terminating worker exits without taking another job;
 * they are freed here together with their user_data, mirroring what a
 * worker does after running a job.
 *
 * The trailing sleep is only a grace period for workers to exit; it does
 * not guarantee that every worker has finished.
 */
void nThreadPoolDestroy(nThreadPool *pool)
{
    if (pool == NULL) {
        return;
    }

    pthread_mutex_lock(&pool->jobs_mtx);

    for (struct NWORKER *wkr = pool->workers; wkr != NULL; wkr = wkr->next) {
        wkr->terminate = 1;
    }

    while (pool->jobs != NULL) {
        struct NJOB *job = pool->jobs;
        pool->jobs = job->next;
        free(job->user_data);
        free(job);
    }

    pthread_mutex_unlock(&pool->jobs_mtx);

    pthread_cond_broadcast(&pool->jobs_cond);
    sleep(1);
}

/* ---- Demo client code starts here ---- */

/* Argument record handed to each demo job. */
struct ArgCallback {
    char name[15];  /* label text */
    int  age;       /* numeric value printed next to the label */
};

/*
 * Demo job handler: waits one second, then prints the name and age stored
 * in the struct ArgCallback that `arg` points to.
 *
 * The handler does not free `arg`.
 */
void callbacks(void *arg)
{
    struct ArgCallback *para = (struct ArgCallback *)arg;

    sleep(1);   /* stand-in for real work */
    /* The line terminator must be the escape "\n", not a literal 'n'. */
    printf("name=%s, age=%d\n", para->name, para->age);
}

/*
 * Signal-handler style exit routine: prints the signal number and ends the
 * process with status 0.
 *
 * NOTE(review): printf and exit are not async-signal-safe under POSIX; if
 * this is installed as a real handler, prefer write() and _exit().
 */
void EXIT(int sig)
{
    /* The line terminator must be the escape "\n", not a literal 'n'. */
    printf("外部信号:%d,主进程退出\n", sig);
    exit(0);
}

/*
 * Demo driver: starts a 50-worker pool, queues 500 jobs, then reads
 * commands from stdin. 'd' shuts the pool down; 'q' quits once the pool
 * has been shut down. End of input also ends the program.
 */
int main(int argc, char *args[])
{
    (void)argc;
    (void)args;

    nThreadPool Pool;
    if (nThreadPoolInit(&Pool, 50) != 0) {
        fprintf(stderr, "nThreadPoolInit failed\n");
        return 1;
    }

    for (int i = 0; i < 500; i++) {
        struct NJOB *job = calloc(1, sizeof(*job));
        struct ArgCallback *arg = calloc(1, sizeof(*arg));
        if (job == NULL || arg == NULL) {
            free(job);
            free(arg);
            fprintf(stderr, "out of memory after %d jobs\n", i);
            break;
        }
        /* Bounded write: name is a fixed-size buffer. */
        snprintf(arg->name, sizeof(arg->name), "id_%d", i);
        arg->age = i;
        job->user_data = (void *)arg;
        job->func = callbacks;
        nThreadPoolJobPush(&Pool, job);
    }

    int isdestroyed = 0;
    for (;;) {
        /* int, not char: EOF must stay distinguishable from real input. */
        int c = getchar();
        if (c == EOF) {
            break;
        }
        if (c == 'd' && !isdestroyed) {
            nThreadPoolDestroy(&Pool);
            isdestroyed = 1;
        }
        if (isdestroyed && c == 'q') {
            break;
        }
    }
    return 0;
}

跨平台的 C++ 版本:

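#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <chrono>
#include <condition_variable>
#include <list>
#include <mutex>
#include <new>
#include <thread>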

// One unit of work: the handler to call and the argument handed to it.
struct NJOB {
    void (*func)(void *args);   // handler invoked for this job
    void  *user_data;           // argument passed to func
};

struct MANAGER;

// Bookkeeping for one worker thread.
struct NWORKER {
    std::thread     th;         // thread handle
    int             terminate;  // > 0 once the worker has been asked to exit
    int             idle;       // > 0 while the worker waits for a job
    struct MANAGER *pool;       // owning pool
};

// The pool: the pending jobs, the worker records, and the mutex and
// condition variable that guard the shared job queue.
//
// The element types of both lists were lost when this listing was
// extracted; they are restored from how the lists are used (pointers to
// NJOB and NWORKER are pushed into them).
struct MANAGER {
    std::list<struct NJOB*>    jobs;
    std::list<struct NWORKER*> workers;
    // Zero-initialised so that `new nThreadPool` does not leave the counter
    // indeterminate before the first `+= 1`.
    int numworkers = 0;
    std::mutex jobs_mtx;
    std::condition_variable jobs_cond;
};
typedef struct MANAGER nThreadPool;

static void* nThreadEntranceFunc(void* arg){
    struct NWORKER* wkr = (struct NWORKER*)arg;
    struct MANAGER* pool = wkr->pool;
    while(1){
        std::unique_lock lock(pool->jobs_mtx);
        while(pool->jobs.empty()){
            if(wkr->terminate>0)break;
            wkr->idle = 1;
            //[1.解锁 2.当前线程入队cond_wait],wait函数会保证这两步一起执行,不被打断
            //如果该线程从cond_wait中被移出,表明已经被唤醒,会进入抢锁状态,只有拿到锁后才会退出wait
            pool->jobs_cond.wait(lock);
        }
        if(wkr->terminate>0){
            lock.unlock();
            break;
        }
        wkr->idle = 0;
        struct NJOB* job = pool->jobs.back();
        pool->jobs.pop_back();
        lock.unlock();
        job->func(job->user_data);
        free(job->user_data);
        free(job);
    }
    free(wkr);
    return NULL;
}

// Start `numworkers` worker threads on `pool`.
//
// pool:       an already constructed pool object.
// numworkers: requested thread count; values below 1 are raised to 1.
//
// Returns 0 on success, -1 if pool is NULL, -2 if a worker could not be
// allocated, -3 if the thread could not be started. Workers started before
// a failure are left running.
//
// Worker records are allocated with calloc and the std::thread member is
// built with placement-new, because the worker releases its own record
// with free().
//
// NOTE(review): the loop header and the allocation were lost when this
// listing was extracted; they are reconstructed here. The -2 and -3 codes
// are an assumption, chosen to match the C version of this function.
int nThreadPoolInit(nThreadPool *pool, int numworkers){
    if(pool==NULL) return -1;
    if(numworkers<1) numworkers=1;

    // The counter is only ever incremented below, so give it a defined start.
    pool->numworkers = 0;

    for(int i=0;i<numworkers;i++){
        struct NWORKER* wkr = static_cast<struct NWORKER*>(calloc(1,sizeof(struct NWORKER)));
        if(wkr==NULL) return -2;
        wkr->pool = pool;

        // Held while the thread is created so the new worker cannot reach
        // the worker list before it has been added to it.
        std::lock_guard<std::mutex> guard(pool->jobs_mtx);
        std::thread* t = NULL;
        try{
            t = new(&(wkr->th)) std::thread(nThreadEntranceFunc,(void*)wkr);
        }catch(...){
            free(wkr);
            return -3;
        }
        // Workers are never joined; detach so their resources are reclaimed.
        t->detach();
        pool->workers.push_front(wkr);
        pool->numworkers += 1;
    }
    return 0;
}

// Queue `job` on `pool` and wake one waiting worker.
//
// The job is pushed at the front of pool->jobs while jobs_mtx is held. The
// lock's template argument is written out so this also compiles before
// C++17 class template argument deduction.
void nThreadPoolJobPush(nThreadPool *pool, struct NJOB* job){
    std::lock_guard<std::mutex> guard(pool->jobs_mtx);
    pool->jobs.push_front(job);
    // If any worker is blocked in wait(), release at least one of them.
    pool->jobs_cond.notify_one();
}

// Shut the pool down: ask every worker to terminate and discard jobs that
// have not started yet.
//
// The terminate flags are written under jobs_mtx, the same mutex the
// workers hold when they read them. Jobs still queued would otherwise be
// leaked, since a terminating worker exits without taking another job;
// they are freed here together with their user_data, mirroring what a
// worker does after running a job.
//
// The trailing sleep is only a grace period for workers to exit; it does
// not guarantee that every worker has finished.
void nThreadPoolDestroy(nThreadPool *pool){
    if(pool==NULL) return;
    {
        std::lock_guard<std::mutex> guard(pool->jobs_mtx);
        std::list<struct NWORKER*>::iterator it;
        for(it=pool->workers.begin();it!=pool->workers.end();++it)
            (*it)->terminate=1;

        while(!pool->jobs.empty()){
            struct NJOB* job = pool->jobs.front();
            pool->jobs.pop_front();
            free(job->user_data);
            free(job);
        }
    }
    pool->jobs_cond.notify_all();
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
}

// Start `num` additional workers on `pool`.
//
// Returns the pool's worker count after the additions, -1 if pool is NULL,
// -2 if a worker could not be allocated, -3 if the thread could not be
// started. Workers started before a failure are kept.
//
// NOTE(review): the loop header and the allocation were lost when this
// listing was extracted; they are reconstructed here. The -2 and -3 codes
// are an assumption, chosen to match the C version of this function.
int  nThreadPoolWorkerAdd(nThreadPool *pool,int num){
    if(pool==NULL)
        return  -1;

    for(int i=0;i<num;i++){
        struct NWORKER* wkr = static_cast<struct NWORKER*>(calloc(1,sizeof(struct NWORKER)));
        if(wkr==NULL) return -2;
        wkr->pool = pool;

        // The pool is live here, so the worker list is changed under the lock.
        std::lock_guard<std::mutex> guard(pool->jobs_mtx);
        std::thread* t = NULL;
        try{
            t = new(&(wkr->th)) std::thread(nThreadEntranceFunc,(void*)wkr);
        }catch(...){
            free(wkr);
            return -3;
        }
        // Workers are never joined; detach so their resources are reclaimed.
        t->detach();
        pool->workers.push_front(wkr);
        pool->numworkers+=1;
    }

    std::lock_guard<std::mutex> guard(pool->jobs_mtx);
    return pool->numworkers;
}

// Ask up to `num` idle workers to terminate.
//
// Only workers whose idle flag is set and that have not already been asked
// to terminate are selected, so repeated calls do not count the same
// worker twice. Returns the number of workers newly marked; 0 when pool is
// NULL or num <= 0.
int  nThreadPoolWorkerDrop(nThreadPool *pool,int num){
    if(pool==NULL || num<=0) return 0;

    int count=0;
    {
        std::lock_guard<std::mutex> guard(pool->jobs_mtx);
        std::list<struct NWORKER*>::iterator it;
        for(it=pool->workers.begin();it!=pool->workers.end() && count<num;++it){
            if((*it)->idle>0 && (*it)->terminate==0){
                (*it)->terminate=1;
                count++;
            }
        }
    }
    // Every waiter must wake up to re-check its own terminate flag.
    if(count>0) pool->jobs_cond.notify_all();
    return count;
}


// Argument record handed to each demo job.
struct ArgCallback {
    char name[15];  // label text
    int  age;       // numeric value printed next to the label
};

// Demo job handler: waits one second, then prints the name and age stored
// in the ArgCallback that `arg` points to. The handler does not free `arg`.
void callbacks(void* arg){
    struct ArgCallback* para = static_cast<struct ArgCallback*>(arg);
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    // The line terminator must be the escape "\n", not a literal 'n'.
    printf("name=%s, age=%d\n",para->name,para->age);
}

// Signal-handler style exit routine: prints the signal number and ends the
// process with status 0.
//
// NOTE(review): printf and exit are not async-signal-safe; confirm before
// installing this as a real signal handler.
void EXIT(int sig){
    // The line terminator must be the escape "\n", not a literal 'n'.
    printf("外部信号:%d,主进程退出\n",sig);
    exit(0);
}

// Demo driver: starts a 50-worker pool, queues 500 jobs, then reads
// commands from stdin. 'd' shuts the pool down; 'q' quits once the pool
// has been shut down. End of input also ends the program.
int main(int argc, char* args[]){
    (void)argc;
    (void)args;

    nThreadPool* Pool=new nThreadPool;
    if(nThreadPoolInit(Pool,50)!=0){
        fprintf(stderr,"nThreadPoolInit failed\n");
        // Workers that did start still reference Pool, so it is not deleted.
        return 1;
    }

    for(int i=0;i<500;i++){
        struct NJOB* job=static_cast<struct NJOB*>(calloc(1,sizeof(struct NJOB)));
        struct ArgCallback* arg=static_cast<struct ArgCallback*>(calloc(1,sizeof(struct ArgCallback)));
        if(job==NULL||arg==NULL){
            free(job);
            free(arg);
            fprintf(stderr,"out of memory after %d jobs\n",i);
            break;
        }
        // Bounded write: name is a fixed-size buffer.
        snprintf(arg->name,sizeof(arg->name),"cpp_id_%d",i);
        arg->age=i;
        job->user_data=(void*)arg;
        job->func=callbacks;
        nThreadPoolJobPush(Pool,job);
    }

    int isdestroyed=0;
    for(;;){
        // int, not char: EOF must stay distinguishable from real input.
        int c = getchar();
        if(c==EOF)
            break;
        if(c=='d'&&!isdestroyed){
            nThreadPoolDestroy(Pool);
            isdestroyed = 1;
        }
        if(isdestroyed!=0&&c=='q')
            break;
    }

    // Workers wait on the pool's mutex and condition variable, so stop them
    // before the pool object is destroyed.
    if(!isdestroyed)
        nThreadPoolDestroy(Pool);
    delete Pool;
    return 0;
}

可以考虑将其封装成 SDK,以便在其他项目中调用。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/4653946.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-06
下一篇 2022-11-06

发表评论

登录后才能评论

评论列表(0条)

保存