Error[8]: Undefined offset: 1318, File: /www/wwwroot/outofmemory.cn/tmp/plugin_ss_superseo_model_superseo.php, Line: 121
File: /www/wwwroot/outofmemory.cn/tmp/plugin_ss_superseo_model_superseo.php, Line: 473, decode(

epoll-reactor模型原理即代码解析

epoll-reactor模型原理即代码解析

epoll

网络IO *** 作一般会涉及两个系统对象,一个是用户空间调用 IO 的进程或者线程,另一个是内核空间的内核系统,比如发生 IO *** 作 read 时,它会经历两个阶段:

  1. 等待数据准备就绪

  2. 将数据从内核拷贝到进程或者线程中。


不同情况地处理这两个阶段上,可以分为不同的服务器模型,最常用的如Reactor 与 Proactor ,本文主要记录自己对epoll到reactor模型的演变及分析。


下面是利用epoll多路复用和非阻塞IO+ET边缘触发设计的并发服务器程序:(详细了epoll原理分析可查看该文章链接: link.)

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#define MAXLNE  4096

#define EPOLL_SIZE 1024
int main() {

    int listenfd, connfd, n;

    struct sockaddr_in servaddr;
    char buff[MAXLNE];
 
    if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
        printf("create socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
 
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(10000);
 
    if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
        printf("bind socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
 
    if (listen(listenfd, 10) == -1) {
        printf("listen socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }

    int epfd = epoll_create(1);

    struct epoll_event events[EPOLL_SIZE] = {0};
    struct epoll_event ev;
    ev.events = EPOLLIN | EPOLLET;
    ev.data.fd = listenfd;
    
    epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);

    while(1) {

        int nready = epoll_wait(epfd, events, EPOLL_SIZE, 5);//nready是返回的监控文件描述符中有响应的描述符个数
        if(nready == -1) {
            continue;
        }

        for(int i = 0; i < nready; i++) {
            int clientfd = events[i].data.fd;
            if(clientfd == listenfd) {
                struct sockaddr_in client;
                socklen_t len = sizeof(client);
                if((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) {
                    printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
                    return 0;
                }
                printf("accept Success fd=  %d\n", connfd);

                int flag = 0;
                if ((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) {//设置与客户端通信的socket为非阻塞
                    printf(" fcntl nonblocking failed\n");
                    break;
                }


                ev.events = EPOLLIN | EPOLLET;
                ev.data.fd = connfd;
                epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
            }
            else if(events[i].events & EPOLLIN) {
                n = recv(clientfd, buff, MAXLNE, 0);//因为使用的ET模式,这里其实最好使用while循环将fd的数据全部读到buff中(防止数据多一次读不完的情况)
                if (n > 0) {
                    buff[n] = '\0';
                    printf("recv msg from client: %s\n", buff);
                    ev.events = EPOLLOUT;//注册EPOLLOUT事件,让服务器发送数据
                    ev.data.fd = clientfd;
                    epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);              
                } 
                else if (n == 0) {//当客户端调用close时,会返回n=0
                    ev.events = EPOLLIN;
                    ev.data.fd = clientfd;
                    epoll_ctl(epfd, EPOLL_CTL_DEL, clientfd, &ev);//删除EPOLL事件
                    close(clientfd);
                }

            }else if(events[i].events & EPOLLOUT) {
                send(clientfd, buff, n, 0);//将发送来的数据依次返回发送回去
                ev.events = EPOLLIN | EPOLLET;//重新注册EPOLL事件
                ev.data.fd = clientfd;
                epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);
            }
        }
    }
    return 0;
}


epoll监控事件描述图:

Reactor服务器模型

epoll的Reactor模型监控事件描述图:

从图中可以看出,reactor多了一个结构体。


服务进程处理epoll事件也不是单纯的逻辑 *** 作,而是通过对应的事件数组中事件对应的fd指向的回调函数来处理接下来的逻辑 *** 作。


回想一下普通函数调用的机制:程序调用某函数,函数执行,程序等待,函数将结果和控制权返回给程序,程序继续处理。


Reactor 释义“反应堆”,是一种事件驱动机制。


和普通函数调用的不同之处在于:应用程序不是主动的调用某个 API 完成处理,而是恰恰相反,Reactor 逆置了事件处理流程(不再是主动地等事件就绪,而是它提前注册好的回调函数,当有对应事件发生时就调用回调函数),应用程序需要提供相应的接口并注册到 Reactor 上,如果相应的事件发生,Reactor 将主动调用应用程序注册的接口,这些接口又称为“回调函
数”。


Reactor程序如下:

#include 
#include 
#include 
#include 
#include 
#include 

#include 
#include 
#include 
#include 


#define BUFFER_LENGTH		4096
#define MAX_EPOLL_EVENTS	1024
#define SERVER_PORT			10000

typedef int NCALLBACK(int ,int, void*);

struct ntyevent {
	int fd;//文件描述符
	int events;//事件
	void *arg;//回调函数的参数
	int (*callback)(int fd, int events, void *arg);
	
	int status;
	char buffer[BUFFER_LENGTH];//缓存区
	int length;//
	long last_active;
};



struct ntyreactor {//结构体包含一个文件描述符和一个事件指针
	int epfd;
	struct ntyevent *events;
};


int recv_cb(int fd, int events, void *arg);
int send_cb(int fd, int events, void *arg);


void nty_event_set(struct ntyevent *ev, int fd, NCALLBACK callback, void *arg) {

	ev->fd = fd;
	ev->callback = callback;
	ev->events = 0;
	ev->arg = arg;
	ev->last_active = time(NULL);

	return ;
	
}


int nty_event_add(int epfd, int events, struct ntyevent *ev) {

	struct epoll_event ep_ev = {0, {0}};
	ep_ev.data.ptr = ev;//设置要处理的事件的指针,含有文件描述符等信息
	ep_ev.events = ev->events = events;//设置要处理的事件类型

	int op;
	if (ev->status == 1) {
		op = EPOLL_CTL_MOD;
	} else {
		op = EPOLL_CTL_ADD;
		ev->status = 1;
	}

	if (epoll_ctl(epfd, op, ev->fd, &ep_ev) < 0) {//注册epoll事件
		printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);
		return -1;
	}

	return 0;
}

int nty_event_del(int epfd, struct ntyevent *ev) {

	struct epoll_event ep_ev = {0, {0}};

	if (ev->status != 1) {
		return -1;
	}

	ep_ev.data.ptr = ev;
	ev->status = 0;
	epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev);//删除注册epoll事件

	return 0;
}

int recv_cb(int fd, int events, void *arg) {

	struct ntyreactor *reactor = (struct ntyreactor*)arg;
	struct ntyevent *ev = reactor->events+fd;

	int len = recv(fd, ev->buffer, 16, 0); //BUFFER_LENGTH   //读取fd的数据到相应的buffer 中,16是指定buffer长度(可以理解乘读取数据最大长度)
	nty_event_del(reactor->epfd, ev);

	if (len > 0) {
		
		ev->length = len;
		ev->buffer[len] = ';'printf

		("C[%d]:%s\n",, fd) ev->buffer;nty_event_set

		(,ev, fd, send_cb) reactor;//设置send_cb回调函数nty_event_add
		(,reactor->epfd, EPOLLOUT) ev;//注册EPOLLOUT事件}
		
		
	else if ( ==len 0 )close {

		()ev->fd;printf
		("[fd=%d] pos[%ld], closed\n",, fd- ev)reactor->events;}
		 
	else close {

		()ev->fd;printf
		("recv[fd=%d] error[%d]:%s\n",, fd, errnostrerror ()errno);}
		
	return

	; len}
int


send_cb (int, fdint , eventsvoid * )argstruct {

	ntyreactor * =reactor ( structntyreactor *);argstruct
	ntyevent * =ev + reactor->events;fdint

	= len send (,fd, ev->buffer, ev->length0 );//发送数据if
	( 0len > )printf {
		("send[fd=%d], [%d]%s\n",, fd, len) ev->buffer;nty_event_del

		(,reactor->epfd) ev;//删除epoll注册事件nty_event_set
		(,ev, fd, recv_cb) reactor;//设置回调函数nty_event_add
		(,reactor->epfd| EPOLLIN , EPOLLET) ev;//条件epoll注册事件}
		
	else close {

		()ev->fd;nty_event_del

		(,reactor->epfd) ev;printf
		("send[fd=%d] error %s\n",, fdstrerror ()errno);}

	return

	; len}
int

accept_cb (int, fdint , eventsvoid * )argstruct {

	ntyreactor * =reactor ( structntyreactor *);argif
	( ==reactor NULL )return - 1;struct

	sockaddr_in ; client_addr=
	socklen_t len sizeof ()client_addr;int

	; clientfdif

	( (=clientfd accept (,fd( structsockaddr *)&,client_addr& )len)== - 1)//accept绑定已连接队列中的socket {if
		( !=errno && EAGAIN != errno ) EINTR} {
			
		printf
		("accept: %s\n",strerror ()errno);return
		- 1;}
	int

	= i 0 ;do
	for {
		
		( =i 3 ;<i ; MAX_EPOLL_EVENTS++i )//0,1,2是系统固定的文件描述符 {if
			( [reactor->events]i.==status 0 )// {break
				;}
			}
		if
		( ==i ) MAX_EPOLL_EVENTSprintf {
			("%s: max connect limit[%d]\n",__func__ ,) MAX_EPOLL_EVENTS;break
			;}
		int

		= flag 0 ;if
		( (=flag fcntl (,clientfd, F_SETFL) O_NONBLOCK)< 0 )//设置与客户端通信的socket为非阻塞 {printf
			("%s: fcntl nonblocking failed, %d\n",__func__ ,) MAX_EPOLL_EVENTS;break
			;}
		nty_event_set

		(&[reactor->events]clientfd,, clientfd, recv_cb) reactor;//设置当前fd的回调函数等信息  nty_event_add
		(,reactor->epfd| EPOLLIN , EPOLLET& [reactor->events]clientfd);//添加当前fd的触发信号EPOLLIN | EPOLLET}

	while ( 0);printf

	("new connect [%s:%d][time:%ld], pos[%d]\n",inet_ntoa 
		(.client_addr)sin_addr,ntohs (.client_addr)sin_port,[ reactor->events]i.,last_active) i;//输出连接的地址:端口:时间:fdreturn

	0 ;}

int

init_sock (short) portint {

	= fd socket (,AF_INET, SOCK_STREAM0 );//创建socket文件描述符fcntl
	(,fd, F_SETFL) O_NONBLOCK;//设置非阻塞fdstruct

	sockaddr_in ; server_addrmemset
	(&,server_addr0 ,sizeof ()server_addr);.
	server_addr=sin_family ; AF_INET.
	server_addr.sin_addr=s_addr htonl ()INADDR_ANY;.
	server_addr=sin_port htons ()port;bind

	(,fd( structsockaddr *)&,server_addrsizeof ()server_addr);//绑定端口和IP地址if

	( listen(,fd20 )< 0 )//监听端口 {printf
		("listen failed : %s\n",strerror ()errno);}
	return

	; fd}
int


ntyreactor_init (structntyreactor * )reactorif {

	( ==reactor NULL )return - 1;memset
	(,reactor0 ,sizeof (structntyreactor ));//memset清0=

	reactor->epfd epoll_create (1);//创建epoll,返回文件描述符根节点(epoll是以红黑树存储描述符) if
	( <=reactor->epfd 0 )printf {
		("create epfd in %s err %s\n",__func__ ,strerror ()errno);return
		- 2;}
	=

	reactor->events ( structntyevent *)malloc(()MAX_EPOLL_EVENTS* sizeof (structntyevent ));//分配存储事件的空间if
	( ==reactor->events NULL )printf {
		("create epfd in %s err %s\n",__func__ ,strerror ()errno);close
		()reactor->epfd;return
		- 3;}
	}
int

ntyreactor_destory (structntyreactor * )reactorclose {

	()reactor->epfd;free
	()reactor->events;}

int



ntyreactor_addlistener (structntyreactor * ,reactorint , sockfd* NCALLBACK )acceptorif {

	( ==reactor NULL )return - 1;if
	( ==reactor->events NULL )return - 1;nty_event_set

	(&[reactor->events]sockfd,, sockfd, acceptor) reactor;//设置回调函数nty_event_add
	(,reactor->epfd, EPOLLIN& [reactor->events]sockfd);//添加epoll注册事件return

	0 ;}
int



ntyreactor_run (structntyreactor * )reactorif {
	( ==reactor NULL )return - 1;if
	( <reactor->epfd 0 )return - 1;if
	( ==reactor->events NULL )return - 1;struct
	
	epoll_event [ events+MAX_EPOLL_EVENTS1];int
	
	= checkpos 0 ,; iwhile

	( 1)//此处循环是看连接是否超时,采用轮询验证 {long

		= now time (NULL);for
		( =i 0 ;<i 100 ;++i ,++ checkpos )if {
			( ==checkpos ) MAX_EPOLL_EVENTS= {
				checkpos 0 ;}
			if

			( [reactor->events]checkpos.!=status 1 )continue {
				;}
			long

			= duration - now [ reactor->events]checkpos.;last_active//延时时间if

			( 60duration >= )//时间大于60,关闭文件描述符 {close
				([reactor->events]checkpos.)fd;//关闭文件描述符printf
				("[fd=%d] timeout\n",[ reactor->events]checkpos.)fd;nty_event_del
				(,reactor->epfd& [reactor->events]checkpos);//删除epoll注册事件}
			}
		int


		= nready epoll_wait (,reactor->epfd, events, MAX_EPOLL_EVENTS1000 );//events用于回传要处理的事件if
		( <nready 0 )printf {
			("epoll_wait error, exit\n");continue
			;}
		for

		( =i 0 ;<i ; nready++i )struct {

			ntyevent * =ev ( structntyevent *)[events]i..data;ptrif

			( ([events]i.&events ) EPOLLIN&& ( &ev->events ) EPOLLIN)//监控epoll事件EPOLLIN {callback
				ev->(,ev->fd[ events]i.,events) ev->arg;//callback}
			if
			( ([events]i.&events ) EPOLLOUT&& ( &ev->events ) EPOLLOUT)//监控epoll事件EPOLLOUT {callback
				ev->(,ev->fd[ events]i.,events) ev->arg;//callback}
			}
			
		}

	}
int

main (int, argcchar * [argv])unsigned {

	short = port ; SERVER_PORTif
	( ==argc 2 )= {
		port atoi ([argv1]);}
	int

	= sockfd init_sock ()port;//传入端口号,初始化socket,,返回得到监听的socketfdstruct

	ntyreactor * =reactor ( structntyreactor *)malloc(sizeof(structntyreactor ));//创建ntyreactor结构体对象ntyreactor_init
	()reactor;//初始化结构体,, 包含一个文件描述符和一个ntyreact类型的事件指针,reactor中epfd是监听红黑树的根节点fdntyreactor_addlistener
	
	(,reactor, sockfd) accept_cb;ntyreactor_run
	()reactor;ntyreactor_destory

	()reactor;close
	()sockfd;return
	

	0 ;}
int

从程序可以看出

= len send (,fd, ev->buffer, ev->length0 );//发送数据if
	( 0len > )printf {
		("send[fd=%d], [%d]%s\n",, fd, len) ev->buffer;nty_event_del

		(,reactor->epfd) ev;//删除epoll注册事件nty_event_set
		(,ev, fd, recv_cb) reactor;//设置回调函数nty_event_add
		(,reactor->epfd| EPOLLIN , EPOLLET) ev;//条件epoll注册事件
  • 响应快,不必为单个同步事件所阻塞,虽然 Reactor 本身依然是同步的;
  • 发送完数据就删除epoll注册事件,再重新注册。


    如此频繁的增加删除是否浪费CPU资源?
    答:对于同一个socket而言,完成收发至少占用两个树上的位置(不删除的情况)。


    而交替(本文情况)只需要一个。


    因此从这个角度看增加删除 *** 作epoll事件不会更加浪费CPU资源。


    Reactor 模式是编写高性能网络服务器的必备技术之一,它具有如下的优点:

    Reactor 模型开发效率上比起直接使用 IO 复用要高,它通常是单线程的,设计目标是希望单线程使用一颗 CPU 的全部资源,但也有附带优点,即每个事件处理中很多时候可以不考虑共享资源的互斥访问。


    可是缺点也是明显的,现在的硬件发展,已经不再遵循摩尔定律,CPU 的频率受制于材料的限制不再有大的提升,而改为是从核数的增加上提升能力,当程序需要使用多核资源时,Reactor 模型就会悲剧。


    网络服务器中还有个Proactor模型也比较流行, proactor 模型最大的特点就是 Proactor 最大的特点是使用异步 I/O(Reactor采用的同步IO)。


    所有的 I/O *** 作都交由系统提供的异步 I/O 接口去执行。


    工作线程仅仅负责业务逻辑。


    在 Proactor 中,用户函数启动一个异步的文件 *** 作。


    同时将这个 *** 作注册到多路复用器上。


    多路复用器并不关心文件是否可读或可写而是关心这个异步读 *** 作是否完成。



    步 *** 作是 *** 作系统完成,用户程序不需要关心。


    多路复用器等待直到有完成通知到来。


    当 *** 作系统完成了读文件 *** 作——将读到的数据复制到了用户先前提供的缓冲区之后,通知多路复用器相关 *** 作已完成。


    多路复用器再调用相应的处理程序,处理数据。



    Proactor 增加了编程的复杂度,但给工作线程带来了更高的效率。


    Proactor 可以在系统态将读写优化,利用 I/O 并行能力,提供一个高性能单线程模型。


    在 windows 上,由于没有 epoll 这样的机制,因此提供了 IOCP 来支持高并发, 由于 *** 作系统做了较好的优化,windows 较常采用 Proactor 的模型利用完成端口来实现服务器。


    在 linux 上,在2.6 内核出现了 aio 接口,但 aio 实际效果并不理想,它的出现,主要是解决 poll 性能不佳的问题,但实际上经过测试,epoll 的性能高于 poll+aio,并且 aio 不能处理 accept,因此 linux 主要还是以 Reactor 模型为主。


    本文的代码源码以及使用可在GitHub下载链接: link.

    )
    File: /www/wwwroot/outofmemory.cn/tmp/route_read.php, Line: 126, InsideLink()
    File: /www/wwwroot/outofmemory.cn/tmp/index.inc.php, Line: 165, include(/www/wwwroot/outofmemory.cn/tmp/route_read.php)
    File: /www/wwwroot/outofmemory.cn/index.php, Line: 30, include(/www/wwwroot/outofmemory.cn/tmp/index.inc.php)
    epoll-reactor模型原理代码解析_C_内存溢出

    epoll-reactor模型原理代码解析

    epoll-reactor模型原理代码解析,第1张

    epoll-reactor模型原理即代码解析

    epoll-reactor模型原理即代码解析
    • epoll-reactor模型原理即代码解析
      • epoll
      • Reactor服务器模型

    epoll

    网络IO *** 作一般会涉及两个系统对象,一个是用户空间调用 IO 的进程或者线程,另一个是内核空间的内核系统,比如发生 IO *** 作 read 时,它会经历两个阶段:

    1. 等待数据准备就绪

    2. 将数据从内核拷贝到进程或者线程中。


    不同情况地处理这两个阶段上,可以分为不同的服务器模型,最常用的如Reactor 与 Proactor ,本文主要记录自己对epoll到reactor模型的演变及分析。


    下面是利用epoll多路复用和非阻塞IO+ET边缘触发设计的并发服务器程序:(详细了epoll原理分析可查看该文章链接: link.)

    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #define MAXLNE  4096
    
    #define EPOLL_SIZE 1024
    int main() {
    
        int listenfd, connfd, n;
    
        struct sockaddr_in servaddr;
        char buff[MAXLNE];
     
        if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
            printf("create socket error: %s(errno: %d)\n", strerror(errno), errno);
            return 0;
        }
     
        memset(&servaddr, 0, sizeof(servaddr));
        servaddr.sin_family = AF_INET;
        servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
        servaddr.sin_port = htons(10000);
     
        if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
            printf("bind socket error: %s(errno: %d)\n", strerror(errno), errno);
            return 0;
        }
     
        if (listen(listenfd, 10) == -1) {
            printf("listen socket error: %s(errno: %d)\n", strerror(errno), errno);
            return 0;
        }
    
        int epfd = epoll_create(1);
    
        struct epoll_event events[EPOLL_SIZE] = {0};
        struct epoll_event ev;
        ev.events = EPOLLIN | EPOLLET;
        ev.data.fd = listenfd;
        
        epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);
    
        while(1) {
    
            int nready = epoll_wait(epfd, events, EPOLL_SIZE, 5);//nready是返回的监控文件描述符中有响应的描述符个数
            if(nready == -1) {
                continue;
            }
    
            for(int i = 0; i < nready; i++) {
                int clientfd = events[i].data.fd;
                if(clientfd == listenfd) {
                    struct sockaddr_in client;
                    socklen_t len = sizeof(client);
                    if((connfd = accept(listenfd, (struct sockaddr *)&client, &len)) == -1) {
                        printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
                        return 0;
                    }
                    printf("accept Success fd=  %d\n", connfd);
    
                    int flag = 0;
                    if ((flag = fcntl(clientfd, F_SETFL, O_NONBLOCK)) < 0) {//设置与客户端通信的socket为非阻塞
                        printf(" fcntl nonblocking failed\n");
                        break;
                    }
    
    
                    ev.events = EPOLLIN | EPOLLET;
                    ev.data.fd = connfd;
                    epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
                }
                else if(events[i].events & EPOLLIN) {
                    n = recv(clientfd, buff, MAXLNE, 0);//因为使用的ET模式,这里其实最好使用while循环将fd的数据全部读到buff中(防止数据多一次读不完的情况)
                    if (n > 0) {
                        buff[n] = '\0';
                        printf("recv msg from client: %s\n", buff);
                        ev.events = EPOLLOUT;//注册EPOLLOUT事件,让服务器发送数据
                        ev.data.fd = clientfd;
                        epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);              
                    } 
                    else if (n == 0) {//当客户端调用close时,会返回n=0
                        ev.events = EPOLLIN;
                        ev.data.fd = clientfd;
                        epoll_ctl(epfd, EPOLL_CTL_DEL, clientfd, &ev);//删除EPOLL事件
                        close(clientfd);
                    }
    
                }else if(events[i].events & EPOLLOUT) {
                    send(clientfd, buff, n, 0);//将发送来的数据依次返回发送回去
                    ev.events = EPOLLIN | EPOLLET;//重新注册EPOLL事件
                    ev.data.fd = clientfd;
                    epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);
                }
            }
        }
        return 0;
    }
    
    
    

    epoll监控事件描述图:

    Reactor服务器模型

    epoll的Reactor模型监控事件描述图:

    从图中可以看出,reactor多了一个结构体。


    服务进程处理epoll事件也不是单纯的逻辑 *** 作,而是通过对应的事件数组中事件对应的fd指向的回调函数来处理接下来的逻辑 *** 作。


    回想一下普通函数调用的机制:程序调用某函数,函数执行,程序等待,函数将结果和控制权返回给程序,程序继续处理。


    Reactor 释义“反应堆”,是一种事件驱动机制。


    和普通函数调用的不同之处在于:应用程序不是主动的调用某个 API 完成处理,而是恰恰相反,Reactor 逆置了事件处理流程(不再是主动地等事件就绪,而是它提前注册好的回调函数,当有对应事件发生时就调用回调函数),应用程序需要提供相应的接口并注册到 Reactor 上,如果相应的事件发生,Reactor 将主动调用应用程序注册的接口,这些接口又称为“回调函
    数”。


    Reactor程序如下:

    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    
    #include 
    #include 
    #include 
    #include 
    
    
    #define BUFFER_LENGTH		4096
    #define MAX_EPOLL_EVENTS	1024
    #define SERVER_PORT			10000
    
    typedef int NCALLBACK(int ,int, void*);
    
    struct ntyevent {
    	int fd;//文件描述符
    	int events;//事件
    	void *arg;//回调函数的参数
    	int (*callback)(int fd, int events, void *arg);
    	
    	int status;
    	char buffer[BUFFER_LENGTH];//缓存区
    	int length;//
    	long last_active;
    };
    
    
    
    struct ntyreactor {//结构体包含一个文件描述符和一个事件指针
    	int epfd;
    	struct ntyevent *events;
    };
    
    
    int recv_cb(int fd, int events, void *arg);
    int send_cb(int fd, int events, void *arg);
    
    
    void nty_event_set(struct ntyevent *ev, int fd, NCALLBACK callback, void *arg) {
    
    	ev->fd = fd;
    	ev->callback = callback;
    	ev->events = 0;
    	ev->arg = arg;
    	ev->last_active = time(NULL);
    
    	return ;
    	
    }
    
    
    int nty_event_add(int epfd, int events, struct ntyevent *ev) {
    
    	struct epoll_event ep_ev = {0, {0}};
    	ep_ev.data.ptr = ev;//设置要处理的事件的指针,含有文件描述符等信息
    	ep_ev.events = ev->events = events;//设置要处理的事件类型
    
    	int op;
    	if (ev->status == 1) {
    		op = EPOLL_CTL_MOD;
    	} else {
    		op = EPOLL_CTL_ADD;
    		ev->status = 1;
    	}
    
    	if (epoll_ctl(epfd, op, ev->fd, &ep_ev) < 0) {//注册epoll事件
    		printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);
    		return -1;
    	}
    
    	return 0;
    }
    
    int nty_event_del(int epfd, struct ntyevent *ev) {
    
    	struct epoll_event ep_ev = {0, {0}};
    
    	if (ev->status != 1) {
    		return -1;
    	}
    
    	ep_ev.data.ptr = ev;
    	ev->status = 0;
    	epoll_ctl(epfd, EPOLL_CTL_DEL, ev->fd, &ep_ev);//删除注册epoll事件
    
    	return 0;
    }
    
    int recv_cb(int fd, int events, void *arg) {
    
    	struct ntyreactor *reactor = (struct ntyreactor*)arg;
    	struct ntyevent *ev = reactor->events+fd;
    
    	int len = recv(fd, ev->buffer, 16, 0); //BUFFER_LENGTH   //读取fd的数据到相应的buffer 中,16是指定buffer长度(可以理解乘读取数据最大长度)
    	nty_event_del(reactor->epfd, ev);
    
    	if (len > 0) {
    		
    		ev->length = len;
    		ev->buffer[len] = ';'printf
    
    		("C[%d]:%s\n",, fd) ev->buffer;nty_event_set
    
    		(,ev, fd, send_cb) reactor;//设置send_cb回调函数nty_event_add
    		(,reactor->epfd, EPOLLOUT) ev;//注册EPOLLOUT事件}
    		
    		
    	else if ( ==len 0 )close {
    
    		()ev->fd;printf
    		("[fd=%d] pos[%ld], closed\n",, fd- ev)reactor->events;}
    		 
    	else close {
    
    		()ev->fd;printf
    		("recv[fd=%d] error[%d]:%s\n",, fd, errnostrerror ()errno);}
    		
    	return
    
    	; len}
    int
    
    
    send_cb (int, fdint , eventsvoid * )argstruct {
    
    	ntyreactor * =reactor ( structntyreactor *);argstruct
    	ntyevent * =ev + reactor->events;fdint
    
    	= len send (,fd, ev->buffer, ev->length0 );//发送数据if
    	( 0len > )printf {
    		("send[fd=%d], [%d]%s\n",, fd, len) ev->buffer;nty_event_del
    
    		(,reactor->epfd) ev;//删除epoll注册事件nty_event_set
    		(,ev, fd, recv_cb) reactor;//设置回调函数nty_event_add
    		(,reactor->epfd| EPOLLIN , EPOLLET) ev;//条件epoll注册事件}
    		
    	else close {
    
    		()ev->fd;nty_event_del
    
    		(,reactor->epfd) ev;printf
    		("send[fd=%d] error %s\n",, fdstrerror ()errno);}
    
    	return
    
    	; len}
    int
    
    accept_cb (int, fdint , eventsvoid * )argstruct {
    
    	ntyreactor * =reactor ( structntyreactor *);argif
    	( ==reactor NULL )return - 1;struct
    
    	sockaddr_in ; client_addr=
    	socklen_t len sizeof ()client_addr;int
    
    	; clientfdif
    
    	( (=clientfd accept (,fd( structsockaddr *)&,client_addr& )len)== - 1)//accept绑定已连接队列中的socket {if
    		( !=errno && EAGAIN != errno ) EINTR} {
    			
    		printf
    		("accept: %s\n",strerror ()errno);return
    		- 1;}
    	int
    
    	= i 0 ;do
    	for {
    		
    		( =i 3 ;<i ; MAX_EPOLL_EVENTS++i )//0,1,2是系统固定的文件描述符 {if
    			( [reactor->events]i.==status 0 )// {break
    				;}
    			}
    		if
    		( ==i ) MAX_EPOLL_EVENTSprintf {
    			("%s: max connect limit[%d]\n",__func__ ,) MAX_EPOLL_EVENTS;break
    			;}
    		int
    
    		= flag 0 ;if
    		( (=flag fcntl (,clientfd, F_SETFL) O_NONBLOCK)< 0 )//设置与客户端通信的socket为非阻塞 {printf
    			("%s: fcntl nonblocking failed, %d\n",__func__ ,) MAX_EPOLL_EVENTS;break
    			;}
    		nty_event_set
    
    		(&[reactor->events]clientfd,, clientfd, recv_cb) reactor;//设置当前fd的回调函数等信息  nty_event_add
    		(,reactor->epfd| EPOLLIN , EPOLLET& [reactor->events]clientfd);//添加当前fd的触发信号EPOLLIN | EPOLLET}
    
    	while ( 0);printf
    
    	("new connect [%s:%d][time:%ld], pos[%d]\n",inet_ntoa 
    		(.client_addr)sin_addr,ntohs (.client_addr)sin_port,[ reactor->events]i.,last_active) i;//输出连接的地址:端口:时间:fdreturn
    
    	0 ;}
    
    int
    
    init_sock (short) portint {
    
    	= fd socket (,AF_INET, SOCK_STREAM0 );//创建socket文件描述符fcntl
    	(,fd, F_SETFL) O_NONBLOCK;//设置非阻塞fdstruct
    
    	sockaddr_in ; server_addrmemset
    	(&,server_addr0 ,sizeof ()server_addr);.
    	server_addr=sin_family ; AF_INET.
    	server_addr.sin_addr=s_addr htonl ()INADDR_ANY;.
    	server_addr=sin_port htons ()port;bind
    
    	(,fd( structsockaddr *)&,server_addrsizeof ()server_addr);//绑定端口和IP地址if
    
    	( listen(,fd20 )< 0 )//监听端口 {printf
    		("listen failed : %s\n",strerror ()errno);}
    	return
    
    	; fd}
    int
    
    
    ntyreactor_init (structntyreactor * )reactorif {
    
    	( ==reactor NULL )return - 1;memset
    	(,reactor0 ,sizeof (structntyreactor ));//memset清0=
    
    	reactor->epfd epoll_create (1);//创建epoll,返回文件描述符根节点(epoll是以红黑树存储描述符) if
    	( <=reactor->epfd 0 )printf {
    		("create epfd in %s err %s\n",__func__ ,strerror ()errno);return
    		- 2;}
    	=
    
    	reactor->events ( structntyevent *)malloc(()MAX_EPOLL_EVENTS* sizeof (structntyevent ));//分配存储事件的空间if
    	( ==reactor->events NULL )printf {
    		("create epfd in %s err %s\n",__func__ ,strerror ()errno);close
    		()reactor->epfd;return
    		- 3;}
    	}
    int
    
    ntyreactor_destory (structntyreactor * )reactorclose {
    
    	()reactor->epfd;free
    	()reactor->events;}
    
    int
    
    
    
    ntyreactor_addlistener (structntyreactor * ,reactorint , sockfd* NCALLBACK )acceptorif {
    
    	( ==reactor NULL )return - 1;if
    	( ==reactor->events NULL )return - 1;nty_event_set
    
    	(&[reactor->events]sockfd,, sockfd, acceptor) reactor;//设置回调函数nty_event_add
    	(,reactor->epfd, EPOLLIN& [reactor->events]sockfd);//添加epoll注册事件return
    
    	0 ;}
    int
    
    
    
    ntyreactor_run (structntyreactor * )reactorif {
    	( ==reactor NULL )return - 1;if
    	( <reactor->epfd 0 )return - 1;if
    	( ==reactor->events NULL )return - 1;struct
    	
    	epoll_event [ events+MAX_EPOLL_EVENTS1];int
    	
    	= checkpos 0 ,; iwhile
    
    	( 1)//此处循环是看连接是否超时,采用轮询验证 {long
    
    		= now time (NULL);for
    		( =i 0 ;<i 100 ;++i ,++ checkpos )if {
    			( ==checkpos ) MAX_EPOLL_EVENTS= {
    				checkpos 0 ;}
    			if
    
    			( [reactor->events]checkpos.!=status 1 )continue {
    				;}
    			long
    
    			= duration - now [ reactor->events]checkpos.;last_active//延时时间if
    
    			( 60duration >= )//时间大于60,关闭文件描述符 {close
    				([reactor->events]checkpos.)fd;//关闭文件描述符printf
    				("[fd=%d] timeout\n",[ reactor->events]checkpos.)fd;nty_event_del
    				(,reactor->epfd& [reactor->events]checkpos);//删除epoll注册事件}
    			}
    		int
    
    
    		= nready epoll_wait (,reactor->epfd, events, MAX_EPOLL_EVENTS1000 );//events用于回传要处理的事件if
    		( <nready 0 )printf {
    			("epoll_wait error, exit\n");continue
    			;}
    		for
    
    		( =i 0 ;<i ; nready++i )struct {
    
    			ntyevent * =ev ( structntyevent *)[events]i..data;ptrif
    
    			( ([events]i.&events ) EPOLLIN&& ( &ev->events ) EPOLLIN)//监控epoll事件EPOLLIN {callback
    				ev->(,ev->fd[ events]i.,events) ev->arg;//callback}
    			if
    			( ([events]i.&events ) EPOLLOUT&& ( &ev->events ) EPOLLOUT)//监控epoll事件EPOLLOUT {callback
    				ev->(,ev->fd[ events]i.,events) ev->arg;//callback}
    			}
    			
    		}
    
    	}
    int
    
    main (int, argcchar * [argv])unsigned {
    
    	short = port ; SERVER_PORTif
    	( ==argc 2 )= {
    		port atoi ([argv1]);}
    	int
    
    	= sockfd init_sock ()port;//传入端口号,初始化socket,,返回得到监听的socketfdstruct
    
    	ntyreactor * =reactor ( structntyreactor *)malloc(sizeof(structntyreactor ));//创建ntyreactor结构体对象ntyreactor_init
    	()reactor;//初始化结构体,, 包含一个文件描述符和一个ntyreact类型的事件指针,reactor中epfd是监听红黑树的根节点fdntyreactor_addlistener
    	
    	(,reactor, sockfd) accept_cb;ntyreactor_run
    	()reactor;ntyreactor_destory
    
    	()reactor;close
    	()sockfd;return
    	
    
    	0 ;}
    int
    
    

    从程序可以看出

    = len send (,fd, ev->buffer, ev->length0 );//发送数据if
    	( 0len > )printf {
    		("send[fd=%d], [%d]%s\n",, fd, len) ev->buffer;nty_event_del
    
    		(,reactor->epfd) ev;//删除epoll注册事件nty_event_set
    		(,ev, fd, recv_cb) reactor;//设置回调函数nty_event_add
    		(,reactor->epfd| EPOLLIN , EPOLLET) ev;//条件epoll注册事件
  • 响应快,不必为单个同步事件所阻塞,虽然 Reactor 本身依然是同步的;
  • 发送完数据就删除epoll注册事件,再重新注册。


    如此频繁的增加删除是否浪费CPU资源?
    答:对于同一个socket而言,完成收发至少占用两个树上的位置(不删除的情况)。


    而交替(本文情况)只需要一个。


    因此从这个角度看增加删除 *** 作epoll事件不会更加浪费CPU资源。


    Reactor 模式是编写高性能网络服务器的必备技术之一,它具有如下的优点:

    • 编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销;
    • 可扩展性,可以方便的通过增加 Reactor 实例个数来充分利用 CPU 资源;
    • 可复用性,reactor 框架本身与具体事件处理逻辑无关,具有很高的复用性;

    Reactor 模型开发效率上比起直接使用 IO 复用要高,它通常是单线程的,设计目标是希望单线程使用一颗 CPU 的全部资源,但也有附带优点,即每个事件处理中很多时候可以不考虑共享资源的互斥访问。


    可是缺点也是明显的,现在的硬件发展,已经不再遵循摩尔定律,CPU 的频率受制于材料的限制不再有大的提升,而改为是从核数的增加上提升能力,当程序需要使用多核资源时,Reactor 模型就会悲剧。


    网络服务器中还有个Proactor模型也比较流行, proactor 模型最大的特点就是 Proactor 最大的特点是使用异步 I/O(Reactor采用的同步IO)。


    所有的 I/O *** 作都交由系统提供的异步 I/O 接口去执行。


    工作线程仅仅负责业务逻辑。


    在 Proactor 中,用户函数启动一个异步的文件 *** 作。


    同时将这个 *** 作注册到多路复用器上。


    多路复用器并不关心文件是否可读或可写而是关心这个异步读 *** 作是否完成。



    步 *** 作是 *** 作系统完成,用户程序不需要关心。


    多路复用器等待直到有完成通知到来。


    当 *** 作系统完成了读文件 *** 作——将读到的数据复制到了用户先前提供的缓冲区之后,通知多路复用器相关 *** 作已完成。


    多路复用器再调用相应的处理程序,处理数据。



    Proactor 增加了编程的复杂度,但给工作线程带来了更高的效率。


    Proactor 可以在系统态将读写优化,利用 I/O 并行能力,提供一个高性能单线程模型。


    在 windows 上,由于没有 epoll 这样的机制,因此提供了 IOCP 来支持高并发, 由于 *** 作系统做了较好的优化,windows 较常采用 Proactor 的模型利用完成端口来实现服务器。


    在 linux 上,在2.6 内核出现了 aio 接口,但 aio 实际效果并不理想,它的出现,主要是解决 poll 性能不佳的问题,但实际上经过测试,epoll 的性能高于 poll+aio,并且 aio 不能处理 accept,因此 linux 主要还是以 Reactor 模型为主。


    本文的代码源码以及使用可在GitHub下载链接: link.

    欢迎分享,转载请注明来源:内存溢出

    原文地址: http://outofmemory.cn/langs/577995.html

    (0)
    打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
    上一篇 2022-04-11
    下一篇 2022-04-11

    发表评论

    登录后才能评论

    评论列表(0条)

    保存