1、C++设计模式——单例模式
2、C++基础组件——线程池实现
- C++系列文章目录
- 前言
- 一、线程池定义
- 线程池解决什么问题
- 二、C语言实现
- 三、C++实现
- 1.v1版本
- 问题
- 2.v2版本
- 总结
前言
实现线程池,给出C语言版以及两个C++版的实现方法,解决了一些常见问题,总结了遇到的问题及解决方法。
一、线程池定义
线程池解决什么问题池化技术,起到了建立缓冲区的作用:内存池、数据库连接池、消息池(消息队列)、对象池
1、解决任务处理
2、阻塞IO
3、解决线程创建与销毁的成本问题
4、管理线程
在日志系统中,直接写磁盘的话,性能会被限制在磁盘的读写能力上面,引入线程池,达到异步解耦的作用
二、C语言实现三大组件:
1、执行队列,线程
2、任务队列,任务
3、管理组件
#include
#include
#include
#include
#include
#include
#include
//双链表表示
typedef struct Worker{
pthread_t thread;
struct Manager *pool;//指向管理者
int terminate;//工作线程终止符
struct Worker *prev;
struct Worker *next;
}WORK;
typedef struct Job{
void *(*process) (void *arg);//执行函数
void *arg;/*回调函数的参数*/
struct Job *prev;
struct Job *next;
}JOB;
typedef struct Manager{
WORK *workers;//指向第一个工作线程
JOB *jobs;//指向第一个任务
pthread_cond_t jobs_cond;//接收工作信号
pthread_mutex_t jobs_mutex;//标记锁定任务
}ThreadPool;
int pool_add_job(ThreadPool *pool,JOB *job);//添加任务
void *nThreadCallback(void *arg);
//线程池初始化
int ThreadPool_init(ThreadPool *pool,int max_thread_num){
if(max_thread_num < 1) max_thread_num = 1;
if(pool == NULL) return -1;
memset(pool,0,sizeof(ThreadPool));
//初始化信号量
pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
memcpy(&pool->jobs_cond, &blank_cond, sizeof(pthread_cond_t));
//初始化信号量
pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
memcpy(&pool->jobs_mutex, &blank_mutex, sizeof(pthread_mutex_t));
for(int i=0;i<max_thread_num;i++){
WORK *worker = (WORK*)malloc(sizeof(WORK));
if(worker == NULL){
printf("malloc");
return -2;
}
memset(worker,0,sizeof(WORK));
worker->pool = pool;
worker->next=NULL;
worker->prev=NULL;
int ret = pthread_create(&worker->thread,NULL,nThreadCallback,worker);//创建工作线程,监听nThreadCallback函数,传入worker参数
if(ret){
printf("pthread_create");
free(worker);
return -3;
}
//加入线程池
if(pool->workers == NULL){//第一个结点:pool->workers指向第一个结点
pool->workers=worker;
worker->next=NULL;
worker->prev=NULL;
}
else{//插入其余结点
worker->next=pool->workers;
pool->workers->prev=worker;
pool->workers=worker;
}
}
return 0;
}
//加入任务
int pool_add_job(ThreadPool *pool,JOB *job){
pthread_mutex_lock(&pool->jobs_mutex);//加锁
//加入任务链表
if(pool->jobs == NULL){
pool->jobs=job;
job->next=NULL;
job->prev=NULL;
}
else{
job->next=pool->jobs;
pool->jobs->prev=job;
pool->jobs=job;
}
pthread_cond_signal(&pool->jobs_cond);//传递信号:告诉工作线程开始执行
pthread_mutex_unlock(&pool->jobs_mutex);//解锁
return 0;
}
void *nThreadCallback(void *arg){//回调函数
printf("starting thread 0x%ld\n",pthread_self());
WORK *worker = (WORK*)arg;
while(1){
pthread_mutex_lock(&(worker->pool->jobs_mutex));//上锁
while(worker->pool->jobs == NULL){//任务队列为空则等待
if(worker->terminate) break;
pthread_cond_wait(&worker->pool->jobs_cond,&worker->pool->jobs_mutex);//若无信号传来则一直阻塞
}
if(worker->terminate){//销毁工作线程
pthread_mutex_unlock(&(worker->pool->jobs_mutex));
printf ("thread 0x%ld will exit\n", pthread_self ());
pthread_exit (NULL);
break;
}
//得到执行信号,开始执行任务
printf ("thread 0x%ld is starting to work\n", pthread_self ());
JOB *job=worker->pool->jobs;
//移除任务
assert(job != NULL);//断言,任务结点不为空
worker->pool->jobs=job->next;
if(worker->pool->jobs != NULL)//worker->pool->jobs指向真正的任务结点时才将前驱结点置空
worker->pool->jobs->prev=NULL;
job->next=NULL;
pthread_mutex_unlock(&(worker->pool->jobs_mutex));//解锁
(*(job->process)) (job->arg);//执行函数
}
//释放资源
free(worker);
pthread_exit(NULL);
}
//销毁线程池
void ThreadPoolDestroy(ThreadPool *pool){
for(WORK *worker = pool->workers; worker != NULL; worker = worker->next){
worker->terminate = 1;//将每个工作线程的终止符设为1
}
pthread_mutex_lock(&(pool->jobs_mutex));//上锁
pthread_cond_broadcast(&pool->jobs_cond);//传出信号量:结束到信号的工作线程开始销毁
pthread_mutex_unlock(&(pool->jobs_mutex));//解锁
}
void *myprocess (void *arg)
{
printf ("threadid is 0x%ld, working on task %d\n", pthread_self (),*(int *) arg);
sleep (1);/*休息一秒,延长任务的执行时间*/
return NULL;
}
int main (int argc, char **argv)
{
ThreadPool *pool = (ThreadPool*)malloc(sizeof(ThreadPool));
ThreadPool_init(pool,3);
/*连续向池中投入10个任务*/
int tasknum[10];
for(int i=0;i<10;i++){
tasknum[i]=i;
JOB *job = (JOB*)malloc(sizeof(JOB));
memset(job,0,sizeof(JOB));
job->process = myprocess;
job->arg = &tasknum[i];//需要传入不同的地址
job->next=NULL;
job->prev=NULL;
pool_add_job(pool,job);
}
/*等待所有任务完成*/
sleep (5); //这句可能出问题,偷懒写法。任务执行时间长时,当main函数退出时,所有子线程被迫退出
/*销毁线程池*/
ThreadPoolDestroy (pool);
return 0;
}
每个工作线程实际上一直在执行回调函数,在wait语句处阻塞,等待插入任务。传递信号量后,所有线程争抢任务,得到任务的线程执行,其余线程继续等待。
对任务进行加锁是防止多个线程同时取得任务,对任务进行修改,任务只能是固定参数。
三、C++实现 1.v1版本
使用单例模式包装线程池,全局仅产生一个对象
#pragma once
#ifndef _MYTHREADPOOL_V1_H_
#define _MYTHREADPOOL_V1_H_
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
class MyThreadPool_v1{
private:
static MyThreadPool_v1* instance;
using ThreadPoolTask = std::function<void()>;// 包装器包装void类型
typedef struct Task {
ThreadPoolTask task;
std::string name;
Task() {
this->task = nullptr;
this->name = "";
}
Task(ThreadPoolTask task, std::string name) {
this->task = task;
this->name = name;
}
}Task;
std::queue<Task> Tasks; //任务队列
std::vector<std::unique_ptr<std::thread>> WorkThreads; //工作线程组,独占指针指向每一个线程
std::mutex taskmutex; //互斥锁
std::condition_variable cond; //条件变量
static std::atomic<bool>running; //原子 *** 作保证多线程不会产生争夺
MyThreadPool_v1();
~MyThreadPool_v1();
void CallBack(int id);
//防止复制
MyThreadPool_v1(MyThreadPool_v1 const&) = delete;
MyThreadPool_v1& operator=(MyThreadPool_v1 const&) = delete;
public:
static MyThreadPool_v1* GetInstance();
void PushTask(ThreadPoolTask task, std::string name);
void desstory();
};
#endif // !_MYTHREADPOOL_V1_H_
#include"MyThreadPool_v1.h"
#include "windows.h"
#include
/**/
MyThreadPool_v1* MyThreadPool_v1::instance = nullptr;
std::atomic<bool> MyThreadPool_v1::running = true; //原子 *** 作保证多线程不会产生争夺
MyThreadPool_v1* MyThreadPool_v1::GetInstance() {
std::cout << "instance" << std::endl;
if (nullptr == instance) { //双重检验
static std::mutex mutex; //新建一个静态锁
std::lock_guard<std::mutex>lock(mutex); //上锁
if (nullptr == instance) {
//生成MyThreadPool_v1对象封装为共享指针赋值给单例对象
instance = new MyThreadPool_v1();
}
}
return instance;
}
MyThreadPool_v1::MyThreadPool_v1() {
std::cout << "构造函数" << std::endl;
int cpunums = 4;
//std::condition_variable(cond); //初始化条件变量
for (int i = 0; i < cpunums; i++) {
//std::lock_guardlock(taskmutex); //上锁
auto func = std::bind(&MyThreadPool_v1::CallBack, this, i);//绑定
WorkThreads.push_back(std::make_unique<std::thread>(func));//加入指向func的独占指针,类型是线程类
}
printf("WorkThreads.size(): %d\n", WorkThreads.size());
}
MyThreadPool_v1::~MyThreadPool_v1() {
{
std::unique_lock<std::mutex> lock(taskmutex);
running = false;
}
//通知所有工作线程重新抢锁并重新判断wait条件,唤醒后因为running为false了,所以都会结束
cond.notify_all();//通知全部线程
// 等待所有工作线程结束
for (const auto &workthread : WorkThreads) {
workthread->join();//由于线程都开始竞争了,因此必定会执行完,join可等待线程执行完
}
}
void MyThreadPool_v1::CallBack(int id) {
printf("WorkThread: %d start\n", id);
while (1) {
std::unique_lock<std::mutex> lock(this->taskmutex);
while (Tasks.empty()) { //当任务队列为空时一直循环
if (running == false) {
break;
}
cond.wait(lock); //阻塞
}
if (running == false) {
break;
}
Task task;
{ //同步 *** 作
task = Tasks.front();
Tasks.pop();
}
//解锁
printf("Work(%d):start\n", id);
task.task();//执行任务
char ch[10];
strcpy_s(ch, task.name.c_str());
printf("正在执行的任务id:%s\n",ch);
printf("Work(%d):end\n", id);
}
printf("Work(%d):stoped\n", id);
}
void MyThreadPool_v1::PushTask(ThreadPoolTask task, std::string name) {
{
//加锁
std::unique_lock<std::mutex> lock(taskmutex);
Tasks.push({ task,name });
char ch[10];
strcpy_s(ch, name.c_str());
}
cond.notify_one();//通知一个进程
}
void MyThreadPool_v1::desstory() {
{
std::unique_lock<std::mutex> lock(taskmutex);
running = false;
}
//通知所有工作线程重新抢锁并重新判断wait条件,唤醒后因为running为false了,所以都会结束
cond.notify_all();//通知全部线程
// 等待所有工作线程结束
for (const auto &workthread : WorkThreads) {
workthread->join();//由于线程都开始竞争了,因此必定会执行完,join可等待线程执行完
}
}
#include"MyThreadPool_v1.h"
#include
#include
using namespace std;
void funct() {
cout << "进入到函数:" << endl;
Sleep(2);
}
int main() {
auto ThreadPool = MyThreadPool_v1::GetInstance();// 拿到线程池单例对象
for (int i = 0; i < 10; i++) {
ThreadPool->PushTask(funct, to_string(i));
}
Sleep(5); //防止执行过快程序直接结束
ThreadPool->desstory(); //线程池销毁时,判断所有线程是否执行完毕,进行阻塞,避免直接return 0中止未完成的线程
return 0; //任务执行时间长,但是程序已经运行到结尾,会直接中止所有线程
}
问题
当前台线程结束后,程序结束,main函数结束会强制所有后台子线程结束
1.任务执行时间长时,当main函数退出时,所有子线程被迫退出
2.单例线程池无法自动析构
3.任务函数参数不可变
2.v2版本使用智能指针的单例模式实现线程池
使用C++11新特性实现可变参数模板,可以执行不同参数的任务
#pragma once
#ifndef _MYTHREADPOOL_V2_H_
#define _MYTHREADPOOL_V2_H_
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
class MyThreadPool_v2 {
public:
static std::shared_ptr<MyThreadPool_v2> GetInstacne(int threadnums);
~MyThreadPool_v2();
//可变参数,利用后置decltype推导类型传递给auto
template<class F, class... Args>
auto PushTask(F&& f, Args&&... args) //任务管道函数
-> std::future<typename std::result_of<F(Args...)>::type>;
private:
MyThreadPool_v2(int threadnums);
using ThreadPoolTask = std::function<void()>;// 包装器包装void类型
std::queue<ThreadPoolTask> Tasks; //任务队列
std::vector<std::thread> WorkThreads; //工作线程组,独占指针指向每一个线程
std::mutex taskmutex; //互斥锁
std::condition_variable cond; //条件变量
std::atomic<bool>running = true; //原子 *** 作保证多线程不会产生争夺
};
//模板不能和定义分离,在编译时期不知道参数定义
template <typename F, typename... Args>
auto MyThreadPool_v2::PushTask(F&& f, Args&& ...args)
->std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
//指向packaged_task的共享指针
auto task = std::make_shared<std::packaged_task<return_type()>>( //packaged_task包装一个可调用对象,可以传递给future对象,使其在另一线程获取结果
std::bind(std::forward<F>(f), std::forward<Args>(args)...) //完美转发,保持函数和参数类型不变
);
//获取任务future
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex>lock(taskmutex);//独占加锁
if (!running) {
throw std::runtime_error("PushTask on stopped ThreadPool");
}
Tasks.emplace([task]() { (*task)(); });
// 发送通知,唤醒一个wait状态的工作线程重新抢锁并重新判断wait条件
cond.notify_one();
return res;
}
}
#endif // !_MYTHREADPOOL_V2_H_
#include"MyThreadPool_v2.h"
std::shared_ptr<MyThreadPool_v2> MyThreadPool_v2::GetInstacne(int threadnums) {
//第一次声明静态对象是线程安全的,存在静态区域
static std::shared_ptr<MyThreadPool_v2> instacne_v2(new MyThreadPool_v2(threadnums));
return instacne_v2;
}
MyThreadPool_v2::MyThreadPool_v2(int threadnums) {
//int threadnums = 4;
printf("构造函数\n");
for (int i = 0; i < threadnums; ++i) {
//每一个线程执行一个匿名函数
printf("线程%d开始执行\n", i);
//匿名函数包含加锁,阻塞,取任务,执行,即之前的回调函数
WorkThreads.emplace_back
(
[this]
{
for (;;)
{
ThreadPoolTask task;//任务对象
{
std::unique_lock<std::mutex> lock(this->taskmutex);
/**等待条件成立(当线程池被关闭或有可消费任务时跳过wait继续;否则cond.wait将会unlock释放锁,
* *其他线程可以继续拿锁,但此线程会阻塞此处并休眠,直到被notify_*唤醒,被唤醒时wait会再次lock并判断条件是否成立,
* *如成立则跳过wait,否则unlock并休眠继续等待下次唤醒)
* */
this->cond.wait(lock, [this] { return !this->running || !this->Tasks.empty(); });
// 如果线程池停止且任务队列为空,说明需要关闭线程池结束返回
if (!running&&Tasks.empty()) {
return;
}
// 取得任务队首任务(注意此处的std::move)
task = std::move(this->Tasks.front());
// 从队列移除
this->Tasks.pop();
}
//执行任务
task();
}
}
);
}
}
MyThreadPool_v2::~MyThreadPool_v2() {
printf("析构函数\n");
{
std::unique_lock<std::mutex> lock(taskmutex);
running = false;
}
//通知所有工作线程重新抢锁并重新判断wait条件,唤醒后因为running为false了,所以都会结束
cond.notify_all();//通知全部线程
// 等待所有工作线程结束
for (auto &workthread : WorkThreads) {
workthread.join();//由于线程都开始竞争了,因此必定会执行完,join可等待线程执行完
}
}
#include
#include
#include
#include
#include "MyThreadPool_v2.h"
int main() {
auto pool = MyThreadPool_v2::GetInstacne(4);
/**/
std::vector< std::future<int> > results; //线程执行结果
// 批量执行线程任务
for (int i = 0; i < 8; ++i) {
results.emplace_back( // 保存线程执行结果到results
pool->PushTask([i] { // 添加一个新的工作任务到线程池
std::cout << "hello " << i << std::endl;
//std::this_thread::sleep_for(std::chrono::seconds(1));
//std::cout << "world " << i << std::endl;
return i * i;
})
);
}
// 打印线程执行结果
for (auto && result : results)
std::cout << "result:" << result.get() << std::endl;
std::cout << std::endl;
return 0;
}
在单例模式中使用了智能指针,最后的析构交由智能指针实现。C++11标准中静态对象的线程安全创建,其静态对象必须在GetInstance函数中第一次声明,不必使用双重检验来保证线程安全。PushTask函数定义了可变参数模板,使用了C++11的新特性,后置推导参数类型传递给auto,这两篇文章讲的非常详细参考1,参考2。
总结
记录一下实现线程池遇到的问题,并和之前的单例模式结合实现,其应用场景非常广泛,其中若有bug欢迎大家指出,期待指导。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)