目录
线程池引言
池分类
池化优势
用在何处
线程池组件
任务队列
线程队列
线程池
Reactor分解
何为Reactor
Reactor组件
网络IO处理分解
Reactor抛入线程池的方式
抛线程池方式1: 将parser业务处理抛入线程池
抛线程池方式2: 将IO *** 作 + parser都抛入线程池处理
充分利用多核CPU,主从Reactor
面试项目书写小技巧 (文末彩蛋)
如下是小杰自己封装的一款单线程 reactor + 方式1 抛入线程池.
线程池引言 池分类
- 线程池
- 数据库连接池
- 内存池
- 异步请求池
缓冲, 重复利用, 大大减少重建, 节约资源, 提高效率, 提高利用率
核心优势在哪里?
- 提前创建, 申请, 反复利用, 而不是重新创建, 申请.
- 反复利用所以利用率高, 也节约了资源
- 提前创建, 而不是临时创建, 省去了创建时间, 提高了效率
- 频繁需要申请释放处。 反正经常用, 我何不提前创建好, 等待你用, 用完我也不扔掉, 继续等你其他时候用.
- 多线程处:均可以考虑抛入线程池, 减少线程频繁创建销毁
- 注意:频繁是核心.
- 生活例子: 蓄水池, 酒池肉林, 好处何在? 方便吧, 提前放好, 随用随取
线程队列:
提前开启线程,
多线程同步消费任务队列中的任务
多线程同步消费:加锁 + 条件消费, 同步等待, 一个线程需要消费必须同时具备两个条件, 1.获取锁 2. 满足条件 (存在任务)
同步:核心在于条件等待, 等待着一个条件满足,然后同步触发一个事件, 阻塞函数, 同步等待.
同步优势:对于共享资源的有序消费, 多线程之间互相等待, 互相同步, 稳定消费
任务队列也可叫做阻塞队列: blocking queue 学名
阻塞队列作用:异步解耦合, 任务放入任务队列中, 立刻返回, 在工作线程繁忙的时候,不至于需要等待线程空闲.
线程队列组织成双向队列的多线程. 工作线程, 消费者线程, 提前开启, 等待处理任务
生活实例:办事窗口
线程池将上述两大组件组合在一起 + mutex锁 + cond条件变量 实现同步消费
Reactor分解 何为Reactor
反应堆, 事件反应堆, 反射堆: 将对io的 *** 作封装成对事件的 *** 作
Reactor组件- 多路复用器 收集反应事件 epoll
- 事件处理器 回调处理机制
- 利用回调封装事件循环
- io检测封装: epoll活跃io事件收集
- io *** 作封装, 读写io
- 对数据的解析 *** 作封装, parser 业务逻辑
single reactor thread + worker threadpool
抛线程池方式1: 将parser业务处理抛入线程池单线程reactor + 工作线程池
做法: 将业务逻辑处理单独抛入一个工作线程池进行处理. 实现网络IO跟业务的解耦合
使用场景: 相比IO, 业务逻辑处理耗时相当严重. 比如说写日志呀, XML 文件的解析、数据库记录的查找、文件资料的读取和传输、计算型工作的处理等,它们会拖慢整个反应堆模式的执行效率。此时我们就可以将其单独抛到另外的Thread pool 中去执行业务需求.
好处:反应堆线程仅仅处理网络IO 而 decode、compute、enode 型工作放置到另外的线程池中, 两者解耦,在业务处理耗时情况下大大提高效率
抛线程池方式2: 将IO *** 作 + parser都抛入线程池处理使用场景:IO *** 作跟parser的处理都相当耗时的情境下, 将其放在事件循环中会拖慢整个事件循环的进程。
好处:事件循环可以最快的响应活跃事件.
缺陷:针对IO *** 作: 我们可能存在对于fd的一个共享问题. 一个线程在 *** 作fd,另外一个线程给fd关闭了,这个就是一个大的问题. (核心在于可能出现fd的多线程共用的问题)
处理方式: 要么不应该使多个线程共享同一个fd,要么对fd进行简单的加锁 *** 作。
充分利用多核CPU,主从Reactor单Reactor的时候, reactor 反应堆同时分发Acceptor 上的连接建立事件和已建立连接的 I/O 事件。这样对于客户端接入量不高的情况下是完全OK的.
但是一旦客户端接入量特别大的情况下, reactor既要分发连接建立,又分发已建立连接的 I/O,有点忙不过来,在实战中的表现可能就是客户端连接成功率偏低。
引出 --- 主从Reactor模式, 多Reactor模型, 将 acceptor 上的连接建立事件和已建立连接的 I/O 事件分离
核心思想: main Reactor只负责分发连接建立事件, sub Reactor 来负责已经建立连接的事件的分发.
sub Reactor的数量设置: 依据CPU数量而定
优势:服务器稳定性大大提升, 客户端连接成功率大大提高
面试项目书写小技巧 (文末彩蛋)对比书写, 分版本书写, 不同的版本优势是什么, 加入了什么技术, 带来了什么样的好处, 获益是什么
eg:本文中的reactor.
我在简历项目上书写, 实现了一个多线程reactor网络服务器框架
最开始使用什么样的技术, 实现了单线程reactor的框架, 然后对比单线程reacor跟多线程reactor, 表明加入线程池 + 主从reactor之后的优势所在. 带来了什么样的提升, 服务器稳定性提升呀, 性能抗压性提升等. (此处仅为小杰己见, 可能会有益处)
如下是我在网上找到的一个大佬写的单Reactor + 线程池的实现, 我觉得写的很棒, 其中的代码很多都是特别值得我们去品味的
reactor: 一个使用c语言实现的“单Reactor多线程”的简易reactor模型。https://gitee.com/chenwifi/reactor
如下是小杰自己封装的一款单线程 reactor + 方式1 抛入线程池.threadpool.h
#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_
#include
#include
#include
#include
typedef void (*Func)(void*);
/*
线程池组件
1. 任务队列 (阻塞队列)
2. 工作队列 (消化任务队列中的任务)
3. 管理组件, 管理平衡工作 + 任务队列 (线程池)
*/
#define LL_ADD(item, list) do {\
item->pre = NULL;\
item->next = list;\
list = item;\
} while (0)
#define LL_REMOVE(item, list) do {\
if (item->pre != NULL) item->pre->next = item->next;\
if (item->next != NULL) item->next->pre = item->pre;\
if (item == list) list = item->next;\
item->pre = item->next = NULL;\
} while (0)
typedef struct Task {
Func task_run; //执行task任务, 处理user_data
void* user_data;
struct Task* pre;
struct Task* next;
} Task;
//工作线程, 消化执行 task
typedef struct Wocker {
pthread_t tid;
int terminate;//终止, 停止工作
struct Mannger* pool;
struct Wocker* pre;
struct Wocker* next;
} Wocker;
//管理组件, 管理平衡上述的Task + Wocker
typedef struct Mannger {
struct Wocker* wockers;//工作队列
struct Task* tasks;//任务队列
pthread_mutex_t lock;
pthread_cond_t cond;
} ThreadPool;
//线程执行函数, 核心所在
void* thread_routine(void* arg) {
Wocker* wocker = (Wocker*)arg;
while (1) {
pthread_mutex_lock(&wocker->pool->lock);
if (wocker->pool->tasks == NULL) {
if (wocker->terminate) break;//中断
pthread_cond_wait(&wocker->pool->cond, &wocker->pool->lock);
}
//至此说明获取到锁了
if (wocker->terminate) {
pthread_mutex_unlock(&wocker->pool->lock);
break;
}
Task* task = wocker->pool->tasks;
if (task != NULL) {
LL_REMOVE(task, wocker->pool->tasks);
}
pthread_mutex_unlock(&wocker->pool->lock);
task->task_run(task);
}
free(wocker);//delete掉
}
//创建线程池, 开启消费者线程
ThreadPool* thread_pool_create(int thread_num) {
ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
pool->wockers = NULL;
pool->tasks = NULL;
pthread_mutex_init(&pool->lock, NULL);
pthread_cond_init(&pool->cond, NULL);
int i = 0;
for (i = 0; i < thread_num; ++i) {
Wocker* wocker = (Wocker*)calloc(sizeof(Wocker), 1);
wocker->pool = pool;
pthread_create(&wocker->tid, NULL, thread_routine, (void*)wocker);
LL_ADD(wocker, pool->wockers);
}
return pool;
}
void thread_pool_destroy(ThreadPool* pool) {
if (pool == NULL) return ;
Wocker* wocker = NULL;
for (wocker = pool->wockers; wocker != NULL; wocker = wocker->next) {
wocker->terminate = 1;//中断运行, 所有的工作线程中断工作
}
pthread_mutex_lock(&pool->lock);
pthread_cond_broadcast(&pool->cond);//核心所在
//广播让所有的工作线程退出工作
pthread_mutex_unlock(&pool->lock);
pthread_cond_destroy(&pool->cond);
pthread_mutex_destroy(&pool->lock);
//释放所有资源
//free_all(pool);
}
void thread_pool_push_task(ThreadPool* pool, Task* task) {
pthread_mutex_lock(&pool->lock);
LL_ADD(task, pool->tasks);
pthread_cond_signal(&pool->cond);//通知有任务可以消费了
pthread_mutex_unlock(&pool->lock);
}
#endif
reactor.h
#ifndef _REACTOR_H_
#define _REACTOR_H_
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "threadpool.h"
//简易reactor封装, 实现事件循环, 事件驱动
//io检测封装: epoll事件收集
//事件驱动, 事件循环封装, 设置回调
//IO *** 作封装, 设置收发缓冲区 + 处理返回
//reactor进一步升级, 更符合业务需求.
//网络 跟 业务需求 隔离解除耦合性
#define MAX_N 512
typedef struct sockaddr SA;
typedef int (*CallBack)(int fd, int events, void* arg);
#define BUFFSIZE 1024
void err_exit(const char* reason) {
fprintf(stderr, "%s : %d : %s\n", reason, errno, strerror(errno));
exit(EXIT_FAILURE);
}
//封装epoll
typedef struct reactor {
int epfd;
struct epoll_event* events;//容器, 收集活跃事件
int stop;
ThreadPool* pool;
} reactor;
typedef struct sockitem {
int sockfd;
//封装回调, 事件驱动
CallBack callback;
struct reactor* eventloop;
//每条连接读写缓冲区封装
char recvbuffer[BUFFSIZE];
int rlen;
char sendbuffer[BUFFSIZE];
int slen;
} sockitem;
//事件处理器声明
int accept_cb(int fd, int events, void* arg);
int recv_cb(int fd, int events, void* arg);
int send_cb(int fd, int events, void* arg);
struct reactor* init_reactor(int n) {
struct reactor* eventloop = (struct reactor*)malloc(sizeof(struct reactor));
eventloop->epfd = epoll_create(1);
eventloop->events = (struct epoll_event*)malloc(sizeof(struct epoll_event) * n);
eventloop->stop = 0;
eventloop->pool = thread_pool_create(10);
return eventloop;
}
void release_reactor(struct reactor* eventloop) {
if (NULL == eventloop) return;
close(eventloop->epfd); //关闭epfd
free(eventloop);
return ;
}
struct sockitem* init_sockitem(struct reactor* eventloop, int sockfd, CallBack callback) {
struct sockitem* si = (struct sockitem*)malloc(sizeof(struct sockitem));
si->sockfd = sockfd;
si->callback = callback;
si->eventloop = eventloop;
return si;
}
void setnoblock(int fd) {
int flag = fcntl(fd, F_GETFL, 0);
if (-1 == fcntl(fd, F_SETFL, flag | O_NONBLOCK)) {
err_exit("fcntl");
}
}
int add_event(struct reactor* eventloop, int events, sockitem* si) {
struct epoll_event ev;
ev.events = events;
ev.data.ptr = si;
if (-1 == epoll_ctl(eventloop->epfd, EPOLL_CTL_ADD, si->sockfd, &ev)) {
err_exit("epoll_ctl add");
}
return 0;
}
int del_event(struct reactor* eventloop, sockitem* si) {
if (NULL == si) return 1;
if (-1 == epoll_ctl(eventloop->epfd, EPOLL_CTL_DEL, si->sockfd, NULL)) {
err_exit("epoll_ctl del");
}
free(si);
return 0;
}
int mod_event(struct reactor* eventloop, int events, sockitem* si) {
struct epoll_event ev;
ev.events = events;
ev.data.ptr = si;
if (-1 == epoll_ctl(eventloop->epfd, EPOLL_CTL_MOD, si->sockfd, &ev)) {
err_exit("epoll_ctl mod");
}
return 0;
}
int accept_cb(int fd, int events, void* arg) {
struct sockitem* si = (struct sockitem*)arg;
socklen_t cli_addr_len = sizeof(struct sockaddr_in);
struct sockaddr_in cli_addr;
char ip_buff[INET_ADDRSTRLEN] = {0};
int cli_fd = accept(fd, (SA*)&cli_addr, &cli_addr_len);
setnoblock(cli_fd);//非阻塞, 避免因为一条连接的事件阻塞其他连接得不到处理
if (-1 == cli_fd) {
err_exit("accept");
}
printf("recv from ip %s at port %d\n", inet_ntop(AF_INET, &cli_addr.sin_addr, ip_buff, sizeof(ip_buff)),
ntohs(cli_addr.sin_port));
//封装si事件
struct sockitem* newsi = init_sockitem(si->eventloop, cli_fd, recv_cb);
//将si 放入eventloop事件循环
add_event(si->eventloop, EPOLLIN | EPOLLET, newsi);
return cli_fd;
}
void task_run(void* arg) {
Task* task = (Task*)arg;
//拿取任务执行
struct sockitem* si = (struct sockitem*)task->user_data;
char* data = si->recvbuffer;
//处理数据, 进行运算
int i = 0;
char op;
int lhs = 0, rhs = 0;
int flag = 0;//标记lhs, rhs
int ans = 0;
while (data[i]) {
if (data[i] >= '0' && data[i] <= '9') {
if (!flag) {
for (; data[i] >= '0' && data[i] <= '9'; ++i) {
lhs = lhs * 10 + (data[i] - '0');
}
flag = 1;
i -= 1;
} else {
for (; data[i] >= '0' && data[i] <= '9'; ++i) {
rhs = rhs * 10 + (data[i] - '0');
}
break;
}
} else if (data[i] == ' ') {
i += 1;
continue;
} else {
op = data[i];
}
i += 1;
}
switch(op) {
case '+' : {
ans = lhs + rhs;
} break;
case '-' : {
ans = lhs - rhs;
} break;
case '*' : {
ans = lhs * rhs;
} break;
case '/' : {
ans = lhs / rhs;
} break;
}
printf("%d %c %d = %d\n", lhs, op, rhs, ans);
sprintf(si->sendbuffer, "%d %c %d = %d\n", lhs, op, rhs, ans);
//数据写入到sendbuffer种去
si->slen = strlen(si->sendbuffer);
memset(si->recvbuffer, 0, BUFFSIZE);//清空读缓冲区.
si->rlen = 0;
si->callback = send_cb;
mod_event(si->eventloop, EPOLLOUT | EPOLLET, si);
free(task);
}
int recv_cb(int fd, int events, void* arg) {
struct sockitem* si = (struct sockitem*)arg;
struct epoll_event ev;
int ret = 0;
while (1) {
ret = recv(fd, si->recvbuffer, BUFFSIZE, 0);
if (ret < 0) {
if (errno == EINTR) {//信号打断
continue;
}
if (errno == EWOULDBLOCK) {//写缓冲区满了
break;
}
del_event(si->eventloop, si);
close(fd);
err_exit("recv");//出错了
} else if (ret == 0) {
//对端断开连接
printf("fd %d disconnect\n", fd);
del_event(si->eventloop, si);
close(fd);
return 0;
} else {
break;
}
}
//接收到数据之后进行处理, 将其抛入到线程池处理
#if 0
//打印接收到的数据
printf("recv: %s, %d Bytes\n", si->recvbuffer, ret);
//设置sendbuffer
si->rlen = ret;
memcpy(si->sendbuffer, si->recvbuffer, si->rlen);
si->slen = si->rlen;
//清空recvbuffer
memset(si->recvbuffer, 0,BUFFSIZE);
si->callback = send_cb;
mod_event(si->eventloop, EPOLLOUT | EPOLLET, si);
#elif 1
//如何抛入到线程池进行处理?
/*
假设业务逻辑是: 将字符串转换为算式进行计算
*/
printf("recv: %s, %d Bytes\n", si->recvbuffer, ret);
si->rlen = ret;
Task* task = (Task*)malloc(sizeof(Task));
//先创建任务结构体
task->next = task->pre = NULL;
task->task_run = task_run;
task->user_data = (void*)si;
thread_pool_push_task(si->eventloop->pool, task);
#endif
return 0;
}
int send_cb(int fd, int events, void* arg) {
struct sockitem* si = (struct sockitem*)arg;
while (1) {
int n = send(fd, si->sendbuffer, BUFFSIZE, 0);
if (-1 == n) {
if (errno == EINTR) {//信号打断
continue;
}
if (errno == EWOULDBLOCK) {//写缓冲区满了
break;
}
//出错了
err_exit("send");
}
printf("send %d bytes\n", n);
//每一次send之后从新将senbuffer置为空
memset(si->sendbuffer, 0, BUFFSIZE);
break;//正常写完
}
si->callback = recv_cb;
mod_event(si->eventloop, EPOLLIN | EPOLLET, si);
}
//事件循环一次
void eventloop_once(struct reactor* eventloop) {
int nready = epoll_wait(eventloop->epfd, eventloop->events, MAX_N, -1);
int i = 0;
for (i = 0; i < nready; ++i) {
int mask = 0;
struct epoll_event* ev= &eventloop->events[i];
if (ev->events & EPOLLIN) mask |= EPOLLIN;
if (ev->events & EPOLLOUT) mask |= EPOLLOUT;
//将EPOLLERR + EPOLLHUP 的处理交付到io函数中进行处理, 放到回调中处理
if (ev->events & EPOLLERR) mask |= EPOLLIN|EPOLLOUT;
if (ev->events & EPOLLHUP) mask |= EPOLLIN|EPOLLOUT;
if (mask & EPOLLIN) {
struct sockitem* si = (struct sockitem*)ev->data.ptr;
si->callback(si->sockfd, mask, si);
}
if (mask & EPOLLOUT) {
struct sockitem* si = (struct sockitem*)ev->data.ptr;
si->callback(si->sockfd, mask, si);
}
}
}
//开启事件循环
void start_eventloop(struct reactor* eventloop) {
while (!eventloop->stop) {
eventloop_once(eventloop);
}
}
//停止事件循环
void stop_eventloop(struct reactor* eventloop) {
eventloop->stop = 1;
}
#endif
reator.c
#include "reactor.h"
int init_sock(short port) {
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (-1 == sockfd) {
err_exit("socket");
}
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = INADDR_ANY;
if (-1 == bind(sockfd, (SA*)&addr, sizeof(addr))) {
err_exit("bind");
}
if (-1 == listen(sockfd, 5)) {
err_exit("listen");
}
return sockfd;
}
int main(int argc, char* argv[]) {
if (2 != argc) {
fprintf(stderr, "usage: %s ", argv[0]);
exit(EXIT_FAILURE);
}
short port = atoi(argv[1]);
int sockfd = init_sock(port);
//至此完成了网络连接了
struct reactor* mainloop = init_reactor(MAX_N);
//封装listen的
struct sockitem* si = init_sockitem(mainloop, sockfd, accept_cb);
add_event(mainloop, EPOLLIN, si);//对于监视套接字一般是水平触发
start_eventloop(mainloop);
//回收资源
release_reactor(mainloop);
return 0;
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)