C++11 - 构建一个符合实际应用要求的线程池

C++11 - 构建一个符合实际应用要求的线程池,第1张

C++11 - 构建一个符合实际应用要求的线程

文章目录
  • 1 什么是线程池
  • 2 C++实现一个线程池
    • 2.1 线程安全的单例类
    • 2.2 线程安全的队列类
    • 2.3 线程池
    • 2.4 工作线程
      • 2.4.1 ThreadWorker.h
      • 2.4.2 ThreadWorker.cpp
    • 2.5 线程池测试代码

1 什么是线程池

线程池从本质上可以看做是一个多生产者多消费者的多线程应用。

一个线程池包括以下四个基本组成部分:

  • 线程池管理器:用于创建并管理线程池,包括创建线程池,销毁线程池,添加新的工作线程,添加工作任务;
  • 工作线程:属于线程池中的线程,用于处理实际任务,在没有工作任务时等待,在任务队列不为空时主动获取任务并处理任务;
  • 任务接口:每个任务必须实现的接口,以供工作线程调度任务的执行;
  • 工作任务队列:用于存放需要处理的工作任务,采用先进先出机制;

线程池根据机器性能预先创建多个工作线程,位于主线程的线程池接收到工作任务并存入到工作任务队列中,工作线程从工作队列中取出工作任务进行处理,如果工作队列为空,则工作线程进入挂起状态。

2 C++实现一个线程池

在C++中实现一个线程池,通过对线程池的特性分析,线程池主要有以下功能:

  • 线程池可以创建给定数量的工作线程,工作线程执行在子线程中开启任务函数,并等待工作队列的新任务
  • 线程池可主动关闭线程池并结束所创建的工作线程;
  • 线程池对象可被多个工作线程互斥访问,我们可以将线程池看作为一个单例,而这个单例可满足被多个线程互斥访问,这需要实现一个线程安全的单例模式;
  • 需要实现一个线程安全的队列,在队列中有新任务压入时通知工作线程领取工作任务,当队列为空时阻塞线程,避免资源开销;并支持主动解锁。
2.1 线程安全的单例类

Singleton.h

#ifndef SINGLETON_H
#define SINGLETON_H

#include 
#include 


template
class Singleton
{
public:
	// 获取全局单例对象
	template
	static std::shared_ptr GetInstance(Args&&... args)
	{
		if (!m_pSingleton)
		{
			std::lock_guard gLock(m_Mutex);

			if (nullptr == m_pSingleton)
			{
				m_pSingleton = std::make_shared(std::forward(args)...);
			}
		}
		return m_pSingleton;
	}

	// 主动析构单例对象(提供接口,但是不建议主动调用)
	static void DeleteInstance()
	{
		if (m_pSingleton != nullptr)
		{
			m_pSingleton.reset();
			m_pSingleton = nullptr;
		}
	}

private:
	explicit Singleton();
	Singleton(const Singleton&) = delete;
	Singleton& operator=(const Singleton&) = delete;
	~Singleton() = default;

private:
	static std::shared_ptr m_pSingleton;
	static std::mutex m_Mutex;
};

template
std::shared_ptr Singleton::m_pSingleton = nullptr;

template
std::mutex Singleton::m_Mutex;

#endif

2.2 线程安全的队列类

参考项目链接:https://github.com/alfredopons/queue-thread-safe

并对该线程安全类进行了魔改,主要增加了不管当前队列是否有工作任务,都可强制退出的新功能,方便线程池退出。

SafeQueue.hpp

#ifndef SAFEQUEUE_HPP_
#define SAFEQUEUE_HPP_

#include 
#include 
#include 
#include 
#include 
#include 
 
 
 

template >
class SafeQueue
{
 
    typedef typename Container::value_type value_type;
    typedef typename Container::size_type size_type;
    typedef Container container_type;
 
  public:
 
    
    SafeQueue() = default;
    SafeQueue (SafeQueue&& sq)
    {
      m_queue = std::move (sq.m_queue);
    }
    SafeQueue (const SafeQueue& sq)
    {
      std::lock_guard lock (sq.m_mutex);
      m_queue = sq.m_queue;
    }
 
    
    ~SafeQueue()
    {
      std::lock_guard lock (m_mutex);
    }
 
    
    void set_max_num_items (unsigned int max_num_items)
    {
      m_max_num_items = max_num_items;
    }
 
    
    bool push (const value_type& item)
    {
      std::lock_guard lock (m_mutex);
 
      if (m_max_num_items > 0 && m_queue.size() > m_max_num_items)
        return false;
 
      m_queue.push (item);
      m_condition.notify_one();
      return true;
    }
 
    
    bool push (const value_type&& item)
    {
      std::lock_guard lock (m_mutex);
 
      if (m_max_num_items > 0 && m_queue.size() > m_max_num_items)
        return false;
 
      m_queue.push (item);
      m_condition.notify_one();
      return true;
    }
 
    
    void pop (value_type& item)
    {
      std::unique_lock lock (m_mutex);
      m_condition.wait (lock, [this]() // Lambda funct
      {
        return !m_queue.empty() || m_Unlock;
      });

      if (!m_queue.empty())
      {
          item = m_queue.front();
          m_queue.pop();
      }

    }
 
    
    void move_pop (value_type& item)
    {
      std::unique_lock lock (m_mutex);
      m_condition.wait (lock, [this]() // Lambda funct
      {
        return !m_queue.empty() || m_Unlock;
      });

      if (!m_queue.empty())
      {
          item = std::move(m_queue.front());
          m_queue.pop();
      }

    }
 
    
    bool try_pop (value_type& item)
    {
      std::unique_lock lock (m_mutex);
 
      if (m_queue.empty())
        return false;
 
      item = m_queue.front();
      m_queue.pop();
      return true;
    }
 
    
    bool try_move_pop (value_type& item)
    {
      std::unique_lock lock (m_mutex);
 
      if (m_queue.empty())
        return false;
 
      item = std::move (m_queue.front());
      m_queue.pop();
      return true;
    }
 
    
    bool timeout_pop (value_type& item, std::uint64_t timeout)
    {
      std::unique_lock lock (m_mutex);
 
      if (m_queue.empty())
        {
          if (timeout == 0)
            return false;
 
          if (m_condition.wait_for (lock, std::chrono::microseconds (timeout)) == std::cv_status::timeout)
            return false;
        }
 
      item = m_queue.front();
      m_queue.pop();
      return true;
    }
 
    
    bool timeout_move_pop (value_type& item, std::uint64_t timeout)
    {
      std::unique_lock lock (m_mutex);
 
      if (m_queue.empty())
        {
          if (timeout == 0)
            return false;
 
          if (m_condition.wait_for (lock, std::chrono::microseconds (timeout)) == std::cv_status::timeout)
            return false;
        }
 
      item = std::move (m_queue.front());
      m_queue.pop();
      return true;
    }
 
    
    size_type size() const
    {
      std::lock_guard lock (m_mutex);
      return m_queue.size();
    }
 
    
    bool empty() const
    {
      std::lock_guard lock (m_mutex);
      return m_queue.empty();
    }
 
    
    void swap (SafeQueue& sq)
    {
      if (this != &sq)
        {
          std::lock_guard lock1 (m_mutex);
          std::lock_guard lock2 (sq.m_mutex);
          m_queue.swap (sq.m_queue);
 
          if (!m_queue.empty())
            m_condition.notify_all();
 
          if (!sq.m_queue.empty())
            sq.m_condition.notify_all();
        }
    }
 
    
    SafeQueue& operator= (const SafeQueue& sq)
    {
      if (this != &sq)
        {
          std::lock_guard lock1 (m_mutex);
          std::lock_guard lock2 (sq.m_mutex);
          std::queue temp {sq.m_queue};
          m_queue.swap (temp);
 
          if (!m_queue.empty())
            m_condition.notify_all();
        }
 
      return *this;
    }
 
    
    SafeQueue& operator= (SafeQueue && sq)
    {
      std::lock_guard lock (m_mutex);
      m_queue = std::move (sq.m_queue);
 
      if (!m_queue.empty())  m_condition.notify_all();
 
      return *this;
    }

    void lock()
    {
        m_Unlock = false;
    }

    void unlock()
    {
        m_Unlock = true;
        m_condition.notify_all();
    }
 
    void clear()
    {
        std::lock_guard lock(m_mutex);

        while (!m_queue.empty())
        {
            m_queue.pop();
        }
    }
 
  private:
 
    std::queue m_queue;
    mutable std::mutex m_mutex;
    std::condition_variable m_condition;
    unsigned int m_max_num_items = 0;
    std::atomic m_Unlock = false;
};
 

template 
void swap (SafeQueue& q1, SafeQueue& q2)
{
  q1.swap (q2);
}
#endif 

2.3 线程池

ThreadPool.h

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include 
#include 
#include 

#include "SafeQueue.hpp"
#include "ThreadWorker.h"
#include "Singleton.h"

class ThreadPool
{
public:
	ThreadPool():
		m_bThreadPoolStop(false)
	{

	}

	static std::shared_ptr GetSingleton()
	{
		return Singleton::GetInstance();
	}

	void CreateThreads(unsigned int worker_num)
	{
		for (int i = 0; i < worker_num; i++)
		{
			std::shared_ptr pThreadWorker = std::make_shared(i);

			std::shared_ptr pThread = std::make_shared(&ThreadWorker::Run, pThreadWorker);

			m_ThreadWorkers.push_back(pThreadWorker);

			m_Threads.push_back(pThread);
		}
	}

	virtual~ThreadPool()
	{
		m_TaskQueue.clear();
	}

	void AddTask(const std::string& task_str)
	{
		m_TaskQueue.push(task_str);
	}

	bool IsStop()
	{
		return m_bThreadPoolStop;
	}

	void Stop()
	{
		m_bThreadPoolStop = true;
		m_TaskQueue.unlock();

		for (int i = 0; i < m_Threads.size(); ++i)
		{
			if (m_Threads[i]->joinable())
			{
				m_Threads[i]->join();
			}
		}
	}

	SafeQueue& GetTaskQueue()
	{
		return m_TaskQueue;
	}

private:
	std::atomic m_bThreadPoolStop;
	SafeQueue m_TaskQueue;
	std::vector> m_ThreadWorkers;
	std::vector> m_Threads;
	
};

#endif // !THREAD_POOL_H

2.4 工作线程 2.4.1 ThreadWorker.h
#ifndef THREAD_WORKER_H
#define THREAD_WORKER_H

class ThreadPool;
class ThreadWorker
{
public:
	ThreadWorker();

	ThreadWorker(int thread_index);

	virtual~ThreadWorker();

	void Run();

private:
	int m_ThreadIndex;
};

#endif // !THREAD_WORKER_H

2.4.2 ThreadWorker.cpp
#include 
#include "ThreadWorker.h"
#include "ThreadPool.h"

ThreadWorker::ThreadWorker()
{
}

ThreadWorker::ThreadWorker(int thread_index)
	:m_ThreadIndex(-1)
{
	m_ThreadIndex = thread_index;

	std::cout << "线程" << m_ThreadIndex << "被创建" << std::endl;
}

ThreadWorker::~ThreadWorker()
{
}

void ThreadWorker::Run()
{
	while (!ThreadPool::GetSingleton()->IsStop())
	{
		std::string task_str = "";
		ThreadPool::GetSingleton()->GetTaskQueue().pop(task_str);
		if (!task_str.empty())
		{
			std::cout << "线程" << m_ThreadIndex << "处理了" << task_str << std::endl;
		}		
	}

	std::cout << "子线程"< 
2.5 线程池测试代码 

main.cpp

#include 
#include 
#include 

#include "ThreadPool.h"


int main()
{


	ThreadPool::GetSingleton()->CreateThreads(10);

	std::thread prod_thread1([&]() {
		for(int i=0;i<3000;i++)
		{
			ThreadPool::GetSingleton()->AddTask(std::to_string(i));
			std::this_thread::sleep_for(std::chrono::milliseconds(1));
		}

		std::cout << "生产线程1退出" << std::endl;
		});

	std::thread prod_thread2([&]() {
		for (int i = 0; i < 3000; i++)
		{
			ThreadPool::GetSingleton()->AddTask(std::to_string(i));
			std::this_thread::sleep_for(std::chrono::milliseconds(1));
		}

		std::cout << "生产线程2退出" << std::endl;
		});

	if(prod_thread1.joinable())
		prod_thread1.join();

	if (prod_thread2.joinable())
		prod_thread2.join();

	ThreadPool::GetSingleton()->Stop();


	std::cout << "执行完成" << std::endl;

	return 0;
}


执行结果:

如果有兴趣可以访问我的个站:https://www.stubbornhuang.com/,更多干货!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5155070.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-18
下一篇 2022-11-18

发表评论

登录后才能评论

评论列表(0条)

保存