C++ 线程池

C++ 线程池,第1张

C++ 线程

转载:醍醐灌顶全方位击破C++线程池及异步处理 - 知乎 (zhihu.com)

重点:

转载的代码有点乱,一共有二种方法。

第一种:

重点:

1.采用嵌套类:为了不被外部访问。

SafeQueue.h

#pragma once

#include 
#include 
using namespace std;

template
class SafeQueue
{
public:
    SafeQueue() = default;
    ~SafeQueue() = default;

public:
    bool Empty()
    {
        unique_lock lock(mu);
        return m_queue.empty();
    }
    int Size()
    {
        unique_lock lock(mu);
        return m_queue.size();
    }
    //向队列添加元素
    void Enqueue(const T& t)
    {
        unique_lock lock(mu);
        m_queue.push(t);
    }
    //向队列取出元素
    bool Dequeue(T& t)
    {
        unique_lock lock(mu);
        if (m_queue.empty()) return false;
        
        t = move(m_queue.front());
        m_queue.pop();
        return true;
    }
private:
    queue m_queue;
    mutex mu;
};

ThreadPool.h

#pragma once

#include 
#include 
#include "SafeQueue.h"

class ThreadPool
{
private:
	class ThreadWorker      //内置线程工作类
	{
	public:
		ThreadWorker(ThreadPool* pool, const int id):m_pool(pool),m_id(id)
		{
		}
	public:
		void operator()()  //重载 *** 作符
		{
			function func;   //定义基础函数类
			bool m_dequeued;
			while (!m_pool->m_shutdown)
			{
				unique_lock lock(m_pool->mu);
				if (m_pool->m_queue.Empty())
				{
					m_pool->m_cv.wait(lock);
				}
				m_dequeued = m_pool->m_queue.Dequeue(func);
                if (m_dequeued)
                {
                    func();
                }
			}	
		}
	private:
		int m_id;           //工作线程
		ThreadPool* m_pool; //所属线程池
	};

public:
	ThreadPool(const int num) :m_threads(vector(num)), m_shutdown(false)
	{
	}
	ThreadPool(const ThreadPool&) = delete;
	ThreadPool(ThreadPool&&) = delete;
	ThreadPool& operator=(const ThreadPool&) = delete;
	ThreadPool& operator=(ThreadPool&&) = delete;

public:
	void Init()
	{
		for (int i=0;i
    auto submit(F&& f, Args&&... args) -> std::future {
        std::function func = std::bind(std::forward(f), std::forward(args)...);
        //封装获取任务对象,方便另外一个线程查看结果
        auto task_ptr = std::make_shared>(func);
        // Wrap packaged task into void function
        //利用正则表达式,返回一个函数对象
        std::function wrapper_func = [task_ptr]() {
            (*task_ptr)();               //这一步调用内部类的 *** 作符了
        };
        // 队列通用安全封包函数,并压入安全队列
        m_queue.Enqueue(wrapper_func);
        // 唤醒一个等待中的线程
		m_cv.notify_one();
        // 返回先前注册的任务指针
        return task_ptr->get_future();
    }
private:
	atomic m_shutdown;                //线程池是否关闭
	SafeQueue> m_queue;//执行函数安全队列,即任务队列
	vector m_threads;           //工作线程队列
	mutex mu;
	condition_variable m_cv;
};

Test.cpp

#include "ThreadPool.h"
#include  
using namespace std;

random_device rd;
mt19937 mt(rd());
std::uniform_int_distribution dist(-1000, 1000);//生成-1000到1000之间的离散均匀分部数
auto rnd = std::bind(dist, mt);
//设置线程睡眠时间
void simulate_hard_computation() {
    std::this_thread::sleep_for(std::chrono::milliseconds(2000 + rnd()));
}
// 添加两个数字的简单函数并打印结果
void multiply(const int a, const int b) {
    simulate_hard_computation();
    const int res = a * b;
    std::cout << a << " * " << b << " = " << res << std::endl;
}

//添加并输出结果
void multiply_output(int& out, const int a, const int b) {
    simulate_hard_computation();
    out = a * b;
    std::cout << a << " * " << b << " = " << out << std::endl;
}

// 结果返回
int multiply_return(const int a, const int b) {
    simulate_hard_computation();
    const int res = a * b;
    std::cout << a << " * " << b << " = " << res << std::endl;
    return res;
}

void example() {
    // 创建3个线程的线程池
    ThreadPool pool(3);
    // 初始化线程池
    pool.Init();
    // 提交乘法操作,总共30个
    for (int i = 1; i < 2; ++i) {
        for (int j = 1; j < 10; ++j) {
            pool.submit(multiply, i, j);
        }
    }
    // 使用ref传递的输出参数提交函数
    int output_ref;
    auto future1 = pool.submit(multiply_output, std::ref(output_ref), 5, 6);
    // 等待乘法输出完成
    future1.get();
    std::cout << "Last operation result is equals to " << output_ref << std::endl;
    // 使用return参数提交函数
    auto future2 = pool.submit(multiply_return, 5, 3);
    // 等待乘法输出完成
    int res = future2.get();
    std::cout << "Last operation result is equals to " << res << std::endl;

    //关闭线程池
    pool.ShutDown();
}



int main()
{
    example();
    return 0;
}
第二种:

Threadpool.h

#pragma once

#include 
#include 
#include 
#include 
#include 
using namespace std;
using Task = function;

class ThreadPool
{
public:
    ThreadPool(size_t size = 4);
	~ThreadPool();
public:
    template
    auto Commit(T&& t, Args&&...args)->future
    {
        if (m_stop.load())
        {
            throw runtime_error("task has closed commit");
        }
        using ResType = decltype(t(args...));
        auto task = make_shared>(
            bind(forward(t), forward(args)...));

        unique_lock lock(mu);
        m_tasks.emplace([task]() {
            (*task)();
            });
        m_cv.notify_all(); //唤醒等待线程
        future fu = task->get_future();
        return fu;
    }
public:
	void ShutDown(); //停止任务提交
	void Restart(); //重启任务提交
	
private:
	Task GetoneTask();//获取一个待执行的task
	void Schedual();  //任务调度
private:
	vector m_pool;
	mutex mu;
	queue m_tasks;
	condition_variable m_cv;
	atomic m_stop;
};

ThreadPool.cpp

#include "ThreadPool.h"
#include 

ThreadPool::ThreadPool(size_t size) :m_stop{false}
{
    size = size < 1 ? 1 : size;
    for (size_t i=0;i lock(mu);
    m_cv.wait(lock, [this] {return !m_tasks.empty(); });
    Task task(move(m_tasks.front()));
    m_tasks.pop();
    return task;
}

void ThreadPool::Schedual()
{
    while (true)
    {
        if (Task task =GetoneTask())
        {
            task();
        }
        else
        {
            return;   //结束
        }
    }
}

Test.cpp

// Test.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。
//

#include 
#include 
#include "ThreadPool.h"
using namespace std;

void fun()
{
    for (int i = 0; i < 100000; ++i)
    {
        cout << "hello"< ff = task.Commit(fun);
        future fg = task.Commit(Gan());
        future fs = task.Commit([]()->string {
            return "hello,fs";
            });
       
        task.ShutDown();
        ff.get();
        
        cout << "fg.get : " << fg.get ()<< endl;
        this_thread::sleep_for(chrono::seconds(5));
        task.Restart(); //重启任务
        cout << "end " << endl;
        return 0;
	}
	catch (const std::exception& e)
	{
        cout << "soming is wrong "<< e.what() << endl;
	}
    return 0;
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5693426.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)