线程池
线程是一种比较昂贵的资源.有些系统为了重用线程.引入了线程池的机制.
线程池的工作原理如下:
首先.系统会启动一定数量的线程.这些线程就构成了一个线程池.当有任务要做的时候.系统就从线程池里面选一个空闲的线程.然后把这个线程标记为“正在运行”.然后把任务传给这个线程执行.线程执行任务完成之后.就把自己标记为空闲.这个过程并不难以理解.难以理解的是.一般来说.线程执行完成之后.运行栈等系统资源就会释放.线程对象就被回收答答了.一个已经完成的线程.又如何能回到线程池的空闲线程队列中呢 秘诀就在于.线程池里面的线程永消友远不会执行完成.线程池里面的线程都是一个无穷循环
ThreadStarter.h
#ifndef __THREADSTARTER_H__
#define __THREADSTARTER_H__
#include windows.h
线程接口
class ThreadBase
{
public
ThreadBase() {}
virtual ~ThreadBase() {}
virtual bool run() = 0线程函数
virtual void OnShutdown() {}
HANDLE THREAD_HANDLE
}
#endif
Threads.h
#ifndef __CTHREADS_H__
#define __CTHREADS_H__
#include ThreadStarter.h
线程的状态
enum CThreadState
{
THREADSTATE_TERMINATE = 0,终止
THREADSTATE_PAUSED= 1,暂停
THREADSTATE_SLEEPING = 2,睡眠
THREADSTATE_BUSY = 3,忙碌
THREADSTATE_AWAITING = 4,等候
}
线程基类
class CThread public ThreadBase
{
public
CThread()
~CThread()
virtual bool run()
virtual void OnShutdown()
设置清桥慧线程的状态
__forceinline void SetThreadState(CThreadState thread_state)
{
ThreadState = thread_state
}
返回线程的状态
__forceinline CThreadState GetThreadState()
{
return ThreadState
}
返回线程ID
int GetThreadId()
{
return ThreadId
}
time_t GetStartTime()
{
return start_time
}
protected
CThreadState ThreadState线程的状态
time_t start_time
int ThreadId线程ID
}
#endif
Threads.cpp
#include stdafx.h
#include CThreads.h
CThreadCThread() ThreadBase()
{
初试化线程的状态为等候
ThreadState = THREADSTATE_AWAITING
start_time = 0
}
CThread~CThread()
{
}
bool CThreadrun()
{
return false
}
void CThreadOnShutdown()
{
SetThreadState(THREADSTATE_TERMINATE)
}
Mutex.h
#ifndef __MUTEX_H__
#define __MUTEX_H__
#include windows.h
多个线程 *** 作相同的数据时,一般是需要按顺序访问的,否则会引导数据错乱
为解决这个问题,就需要引入互斥变量,让每个线程都按顺序地访问变量。
class Mutex
{
public
Mutex()
~Mutex()
__forceinline void Acquire()
{
EnterCriticalSection(&cs)
}
__forceinline void Release()
{
LeaveCriticalSection(&cs)
}
例如:
线程 *** 作函数。
int AddCount(void)
{
EnterCriticalSection(&cs)
int nRet = m_nCount++
LeaveCriticalSection(&cs)
return nRet
}
在函数AddCount里调用EnterCriticalSection和LeaveCriticalSection来互斥访问变量m_nCount。
通过上面这种方法,就可以实现多线程按顺序地访问相同的变量
__forceinline bool AttemptAcquire()
{
一个线程也可以调用TryEnterCriticalSection函数来请求某个临界区的所有权,此时即
使请求失败也不会被阻塞
return 0(TryEnterCriticalSection(&cs) == TRUE true false)
}
protected
CRITICAL_SECTION cs临界区是一种防止多个线程同时执行一个特定代码节的机制
}
#endif
Mutex.cpp
#include stdafx.h
#include Mutex.h
MutexMutex()
{
创建临界区对象
InitializeCriticalSection(&cs)
}
Mutex~Mutex()
{
删除临界区对象
DeleteCriticalSection(&cs)
}
ThreadPool.h
#ifndef __THREADPOOL_H__
#define __THREADPOOL_H__
#include ThreadStarter.h
#include Mutex.h
#include windows.h
#include assert.h
#include set
typedef unsigned int uint32
typedef signed __int32 int32
线程管理
class ThreadController
{
public
HANDLE hThread
uint32 thread_id
void Setup(HANDLE h)
{
hThread = h
}
void Suspend()
{
assert(GetCurrentThreadId() == thread_id)
当线程做完任务或者现在想暂停线程运行,就需要使用SuspendThread来暂停线程的执行
SuspendThread(hThread)
}
恢复线程的执行就是使用ResumeThread函数了
void Resume()
{
assert(GetCurrentThreadId() != thread_id)
if(!ResumeThread(hThread))
{
DWORD le = GetLastError()
printf(error %un, le)
}
}
void Join()
{
WaitForSingleObject函数用来检测hHandle事件的信号状态,当函数的执行时间超过dwMilliseconds就返回
WaitForSingleObject(hThread, INFINITE)
}
uint32 GetId()
{
return thread_id
}
}
struct Thread
{
ThreadBase ExecutionTarget
ThreadController ControlInterface
Mutex SetupMutex线程的互斥
bool DeleteAfterExit
}
typedef stdsetThread ThreadSet
线程池类
class CThreadPool
{
uint32 _threadsRequestedSinceLastCheck
uint32 _threadsFreedSinceLastCheck
uint32 _threadsExitedSinceLastCheck
uint32 _threadsToExit
int32 _threadsEaten可用线程数量
Mutex _mutex
ThreadSet m_activeThreads正在执行任务线程对列
ThreadSet m_freeThreads可用线程对列
public
CThreadPool()
void IntegrityCheck()
创建指定数量的线程并加到线程池
void Startup()
销毁线程
void Shutdown()
bool ThreadExit(Thread t)
Thread StartThread(ThreadBase ExecutionTarget)
从线程池取得可用线程并执行任务
void ExecuteTask(ThreadBase ExecutionTarget)
void ShowStats()
void KillFreeThreads(uint32 count)
__forceinline void Gobble(){
_threadsEaten=(int32)m_freeThreads.size()
}
__forceinline uint32 GetActiveThreadCount(){
return (uint32)m_activeThreads.size()
}
__forceinline uint32 GetFreeThreadCount(){
return (uint32)m_freeThreads.size()
}
}
extern CThreadPool ThreadPool线程池
#endif
ThreadPool.cpp
#include stdafx.h
#include ThreadPool.h
#include process.h
CThreadPool ThreadPool
CThreadPoolCThreadPool()
{
_threadsExitedSinceLastCheck = 0
_threadsRequestedSinceLastCheck = 0
_threadsEaten = 0
_threadsFreedSinceLastCheck = 0
}
bool CThreadPoolThreadExit(Thread t)
{
_mutex.Acquire()
m_activeThreads.erase(t)
if(_threadsToExit 0)
{
--_threadsToExit
++_threadsExitedSinceLastCheck
if(t-DeleteAfterExit)
m_freeThreads.erase(t)
_mutex.Release()
delete t
return false
}
enter the suspended pool
++_threadsExitedSinceLastCheck
++_threadsEaten
stdsetThreaditerator itr = m_freeThreads.find(t)
if(itr != m_freeThreads.end())
{
}
m_freeThreads.insert(t)
_mutex.Release()
return true
}
void CThreadPoolExecuteTask(ThreadBase ExecutionTarget)
{
Thread t
_mutex.Acquire()
++_threadsRequestedSinceLastCheck
--_threadsEaten
从线程池夺取一个线程
if(m_freeThreads.size())有可用线程
{
得到一个可用线程
t = m_freeThreads.begin()
把它从可用线程对列里删掉
m_freeThreads.erase(m_freeThreads.begin())
给这个线程一个任务
t-ExecutionTarget = ExecutionTarget
恢复线程的执行
t-ControlInterface.Resume()
}
else
{
创建一个新的线程并执行任务
t = StartThread(ExecutionTarget)
}
把线程加到执行任务线程对列
m_activeThreads.insert(t)
_mutex.Release()
}
void CThreadPoolStartup()
{
int i
int tcount = 5
for(i=0i tcount++i)
StartThread(NULL)
}
void CThreadPoolShowStats()
{
_mutex.Acquire()
在这里输出线程池的状态
_mutex.Release()
}
void CThreadPoolKillFreeThreads(uint32 count)
{
_mutex.Acquire()
Thread t
ThreadSetiterator itr
uint32 i
for(i = 0, itr = m_freeThreads.begin()i count &&itr != m_freeThreads.end()++i, ++itr)
{
t = itr
t-ExecutionTarget = NULL
t-DeleteAfterExit = true
++_threadsToExit
t-ControlInterface.Resume()
}
_mutex.Release()
}
void CThreadPoolShutdown()
{
_mutex.Acquire()
size_t tcount = m_activeThreads.size() + m_freeThreads.size()
KillFreeThreads((uint32)m_freeThreads.size())
_threadsToExit += (uint32)m_activeThreads.size()
for(ThreadSetiterator itr = m_activeThreads.begin()itr != m_activeThreads.end()++itr)
{
if((itr)-ExecutionTarget)
(itr)-ExecutionTarget-OnShutdown()
}
_mutex.Release()
for()
{
_mutex.Acquire()
if(m_activeThreads.size() m_freeThreads.size())
{
_mutex.Release()
Sleep(1000)
continue
}
break
}
}
static unsigned long WINAPI thread_proc(void param)
{
Thread t = (Thread)param
t-SetupMutex.Acquire()
uint32 tid = t-ControlInterface.GetId()
bool ht = (t-ExecutionTarget != NULL)
t-SetupMutex.Release()
for()
{
if(t-ExecutionTarget != NULL)
{
if(t-ExecutionTarget-run())执行任务,返回true表示任务完成
delete t-ExecutionTarget
t-ExecutionTarget = NULL
}
if(!ThreadPool.ThreadExit(t))
{
Log.Debug(ThreadPool, Thread %u exiting., tid)
break
}
else
{
if(ht)
printf(ThreadPool线程%d正在等待新任务., tid)
t-ControlInterface.Suspend()暂停线程运行
}
}
ExitThread(0)
return 0
}
Thread CThreadPoolStartThread(ThreadBase ExecutionTarget)
{
HANDLE h
Thread t = new Thread
t-DeleteAfterExit = false
t-ExecutionTarget = ExecutionTarget
t-SetupMutex.Acquire()
/*CreateThread(
lpThreadAttributes是线程的属性,
dwStackSize是线程的栈大小,
lpStartAddress是线程函数的开始地址,
lpParameter是传送给线程函数的参数,
dwCreationFlags是创建线程标志,比如挂起线程,
lpThreadId是标识这个线程的ID)*/
h = CreateThread(NULL, 0, &thread_proc, (LPVOID)t, 0, (LPDWORD)&t-ControlInterface.thread_id)
t-ControlInterface.Setup(h)
t-SetupMutex.Release()
return t
}
在我们日常生活中会遇到许许多多的问题,如果一个服务端要接受很多客户端的数据,该怎么办?多线程并发内存不够怎么办?所以我们需要了解线程池的相关知识。
1.线程池的简介
线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。
2.线程池的组成
1、线程池管理器(ThreadPoolManager):用于创建并管理线程池
2、工作线程(WorkThread): 线程池中线程
3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行。
4、任务队列:用于存放没有处理的任务。提供一种缓冲机制。
3.线程池的主要优点
1.避免线程太多,使得内存耗尽
2.避免创建与销毁线程的代价
3.任务与执行分离
1.线程池结构体定义
代码如下(示例):
相关视频推荐
150行代码,带你手写线程池,自行准备linux环境
C++后台开发该学哪些内容,标悔盯准技术路线及面经与算法该如何刷
学习地址:C/C++Linux服务器开发/后台架构师【零好陆声教友前顷育】-学习视频教程-腾讯课堂
需要更多C/C++ Linux服务器架构师学习资料加qun 812855908 (资料包括C/C++,Linux,golang技术,内核,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg,大厂面试题 等)
2.接口定义
代码如下(示例):
3.回调函数
代码如下(示例):
4.全部代码(加注释)
代码如下(示例):
关于线程池是基本代码就在上面了,关于编程这一部分内容,我建议大家还是要自己去动手实现,如果只是单纯的看了一遍,知识这块可能会记住,但是 *** 作起来可能就比较吃力,万事开头难,只要坚持下去,总会有结果的。
可以通过以下原则解决:1、设置线程池的最大线程数
2、设置线程池的并发处理线世改程数量
3、设置线程池最大的队列线程数
4、做好线程池的线程清理工作
做好这几点,理论上没问题了,具体还得看编程者代码的质量。
PS:服务器不可能为每一个肢返侍请求都创建线程,得考虑到最大负载,当达到临界值的时候,服务器返回繁忙状态,拒绝服务即可,当然历吵这是简单的处理办法。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)