网络程序需要处理的第三类事件是定时事件,比如定期检测一个客户连接的活动状态。服务器程序通常管理着众多定时事件,因此有效地组织这些定时事件,使之能在预期的时间点被触发且不影响服务器的主要逻辑,对于服务器的性能有着至关重要的影响。为此,我们要将每个定时事件分别封装成定时器,并使用某种容器类数据结构,比如链表、排序链表和时间轮,时间堆将所有定时器串联起来。
不过,在讨论如何组织定时器之前,我们先要介绍定时的方法。定时是指在一段时间之后触发某段代码的机制,我们可以在这段代码中依次处理所有到期的定时器。换言之,定时机制是定时器得以被处理的原动力。Linux 提供了三种定时方法,它们是:
socket选项SO_RCVTIMEO和SO_SNDTIMEO。SIGALRM信号。I/O复用系统调用的超时参数。 socket选项SO_RCVTIMEO和SO_SNDTIMEO
socket选项SO_RCVTIMEO 和SO_SNDTIMEO,它们分别用来设置socket接收数据超时时间和发送数据超时时间。因此,这两个选项仅对与数据接收和发送相关的socket专用系统调用有效,这些系统调用包括send、sendmsg、 recv、 recvmsg、accept 和connect.我们将选项SO_RCVTIMEO和SO_SNDTIMEO对这些系统调用的影响总结于表中。
由表可见,在程序中,我们可以根据系统调用(send、 sendmsg、 recv、recvmsg、accept和connect)的返回值以及errno来判断超时时间是否已到,进而决定是否开始处理定时任务。
#includeSIGALRM信号#include #include #include #include #include #include #include #include #include #include int timeout_connect( const char* ip, int port, int time ) { int ret = 0; struct sockaddr_in address; bzero( &address, sizeof( address ) ); address.sin_family = AF_INET; inet_pton( AF_INET, ip, &address.sin_addr ); address.sin_port = htons( port ); int sockfd = socket( PF_INET, SOCK_STREAM, 0 ); assert( sockfd >= 0 ); struct timeval timeout; timeout.tv_sec = time; timeout.tv_usec = 0; socklen_t len = sizeof( timeout ); ret = setsockopt( sockfd, SOL_SOCKET, SO_SNDTIMEO, &timeout, len ); assert( ret != -1 ); ret = connect( sockfd, ( struct sockaddr* )&address, sizeof( address ) ); if ( ret == -1 ) { if( errno == EINPROGRESS ) //超时对应的错误,成立就可以处理定时任务了 { printf( "connecting timeoutn" ); return -1; } printf( "error occur when connecting to servern" ); return -1; } return sockfd; } int main( int argc, char* argv[] ) { if( argc <= 2 ) { printf( "usage: %s ip_address port_numbern", basename( argv[0] ) ); return 1; } const char* ip = argv[1]; int port = atoi( argv[2] ); int sockfd = timeout_connect( ip, port, 10 ); if ( sockfd < 0 ) { return 1; } return 0; }
由alarm和setitimer函数设置的实时闹钟一旦超时,将触发SIGALRM信号。因此,我们可以利用该信号的信号处理函数来处理定时任务。但是,如果要处理多个定时任务,我们就需要不断地触发SIGALRM信号,并在其信号处理函数中执行到期的任务。一般而言,SIGALRM信号按照固定的频率生成,即由alarm或setitimer函数设置的定时周期T保持不变。如果某个定时任务的超时时间不是T的整数倍,那么它实际被执行的时间和预期的时间将略有偏差。因此定时周期T反映了定时的精度。
#ifndef LST_TIMER #define LST_TIMER #include#include #define BUFFER_SIZE 64 class util_timer; struct client_data { sockaddr_in address; int sockfd; char buf[BUFFER_SIZE]; util_timer *timer; }; class util_timer { public: util_timer() : prev(NULL), next(NULL) {} public: time_t expire; void (*cb_func)(client_data *); client_data *user_data; util_timer *prev; util_timer *next; }; class sort_timer_lst { public: sort_timer_lst() : head(NULL), tail(NULL) {} ~sort_timer_lst() { util_timer *tmp = head; while (tmp) { head = tmp->next; delete tmp; tmp = head; } } void add_timer(util_timer *timer) { if (!timer) { return; } if (!head) { head = tail = timer; return; } if (timer->expire < head->expire) { timer->next = head; head->prev = timer; head = timer; return; } add_timer(timer, head); } void adjust_timer(util_timer *timer) { if (!timer) { return; } util_timer *tmp = timer->next; if (!tmp || (timer->expire < tmp->expire)) { return; } if (timer == head) { head = head->next; head->prev = NULL; timer->next = NULL; add_timer(timer, head); } else { timer->prev->next = timer->next; timer->next->prev = timer->prev; add_timer(timer, timer->next); } } void del_timer(util_timer *timer) { if (!timer) { return; } if ((timer == head) && (timer == tail)) { delete timer; head = NULL; tail = NULL; return; } if (timer == head) { head = head->next; head->prev = NULL; delete timer; return; } if (timer == tail) { tail = tail->prev; tail->next = NULL; delete timer; return; } timer->prev->next = timer->next; timer->next->prev = timer->prev; delete timer; } void tick() { if (!head) { return; } printf("timer tickn"); time_t cur = time(NULL); util_timer *tmp = head; while (tmp) { if (cur < tmp->expire) { break; } tmp->cb_func(tmp->user_data); head = tmp->next; if (head) { head->prev = NULL; } delete tmp; tmp = head; } } private: void add_timer(util_timer *timer, util_timer *lst_head) { util_timer *prev = lst_head; util_timer *tmp = prev->next; while (tmp) { if (timer->expire < tmp->expire) { prev->next = timer; timer->next = tmp; tmp->prev = timer; timer->prev = prev; break; } prev = tmp; tmp = tmp->next; } if (!tmp) { prev->next = timer; timer->prev = prev; timer->next = NULL; tail = timer; } } private: util_timer *head; util_timer *tail; }; #endif
现在我们考虑上述升序定时器链表的实际应用一处理非活动连接。服务器程序通常要定期处理非活动连接:给客户端发一个重连请求,或者关闭该连接,或者其他。Linux在内核中提供了对连接是否处于活动状态的定期检查机制,我们可以通过socket选项KEEPALIVE来激活它。不过使用这种方式将使得应用程序对连接的管理变得复杂。因此,我们可以考虑在应用层实现类似于KEEPALIVE的机制,以管理所有长时间处于非活动状态的连接。比如利用alarm函数周期性地触发SIGALRM信号,该信号的信号处理函数利用管道通知主循环执行定时器链表上的定时任务一关闭非活动的连接。
#includeI/O复用系统调用的超时参数#include #include #include #include #include #include #include #include #include #include #include #include #include #include "lst_timer.h" #define FD_LIMIT 65535 #define MAX_EVENT_NUMBER 1024 #define TIMESLOT 5 static int pipefd[2]; static sort_timer_lst timer_lst; static int epollfd = 0; int setnonblocking(int fd) { int old_option = fcntl(fd, F_GETFL); int new_option = old_option | O_NONBLOCK; fcntl(fd, F_SETFL, new_option); return old_option; } void addfd(int epollfd, int fd) { epoll_event event; event.data.fd = fd; event.events = EPOLLIN | EPOLLET; epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); setnonblocking(fd); } void sig_handler(int sig) { int save_errno = errno; int msg = sig; send(pipefd[1], (char *)&msg, 1, 0); errno = save_errno; } void addsig(int sig) { struct sigaction sa; memset(&sa, '', sizeof(sa)); sa.sa_handler = sig_handler; sa.sa_flags |= SA_RESTART; sigfillset(&sa.sa_mask); assert(sigaction(sig, &sa, NULL) != -1); } void timer_handler() { timer_lst.tick(); alarm(TIMESLOT); } void cb_func(client_data *user_data) { epoll_ctl(epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0); assert(user_data); close(user_data->sockfd); printf("close fd %dn", user_data->sockfd); } int main(int argc, char *argv[]) { if (argc <= 2) { printf("usage: %s ip_address port_numbern", basename(argv[0])); return 1; } const char *ip = argv[1]; int port = atoi(argv[2]); int ret = 0; struct sockaddr_in address; bzero(&address, sizeof(address)); address.sin_family = AF_INET; inet_pton(AF_INET, ip, &address.sin_addr); address.sin_port = htons(port); int listenfd = socket(PF_INET, SOCK_STREAM, 0); assert(listenfd >= 0); ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address)); assert(ret != -1); ret = listen(listenfd, 5); assert(ret != -1); epoll_event events[MAX_EVENT_NUMBER]; int epollfd = epoll_create(5); assert(epollfd != -1); addfd(epollfd, listenfd); ret = socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd); assert(ret != -1); setnonblocking(pipefd[1]); addfd(epollfd, pipefd[0]); // add all the interesting signals here addsig(SIGALRM); addsig(SIGTERM); bool stop_server = false; client_data *users = new client_data[FD_LIMIT]; bool timeout = false; alarm(TIMESLOT); while (!stop_server) { int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); if ((number < 0) && (errno != EINTR)) { printf("epoll failuren"); break; } for (int i = 0; i < number; i++) { int sockfd = events[i].data.fd; if (sockfd == listenfd) { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof(client_address); int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength); addfd(epollfd, connfd); users[connfd].address = client_address; users[connfd].sockfd = connfd; util_timer *timer = new util_timer; timer->user_data = &users[connfd]; timer->cb_func = cb_func; time_t cur = time(NULL); timer->expire = cur + 3 * TIMESLOT; users[connfd].timer = timer; timer_lst.add_timer(timer); } else if ((sockfd == pipefd[0]) && (events[i].events & EPOLLIN)) { int sig; char signals[1024]; ret = recv(pipefd[0], signals, sizeof(signals), 0); if (ret == -1) { // handle the error continue; } else if (ret == 0) { continue; } else { for (int i = 0; i < ret; ++i) { switch (signals[i]) { case SIGALRM: { timeout = true; break; } case SIGTERM: { stop_server = true; } } } } } else if (events[i].events & EPOLLIN) { memset(users[sockfd].buf, '', BUFFER_SIZE); ret = recv(sockfd, users[sockfd].buf, BUFFER_SIZE - 1, 0); printf("get %d bytes of client data %s from %dn", ret, users[sockfd].buf, sockfd); util_timer *timer = users[sockfd].timer; if (ret < 0) { if (errno != EAGAIN) { cb_func(&users[sockfd]); if (timer) { timer_lst.del_timer(timer); } } } else if (ret == 0) { cb_func(&users[sockfd]); if (timer) { timer_lst.del_timer(timer); } } else { //send( sockfd, users[sockfd].buf, BUFFER_SIZE-1, 0 ); if (timer) { time_t cur = time(NULL); timer->expire = cur + 3 * TIMESLOT; printf("adjust timer oncen"); timer_lst.adjust_timer(timer); } } } else { // others } } if (timeout) { timer_handler(); timeout = false; } } close(listenfd); close(pipefd[1]); close(pipefd[0]); delete[] users; return 0; }
Linux下的3组I/O复用系统调用都带有超时参数,因此它们不仅能统一处理信号和I/O事件,也能统一处理定时事件。 但是由于I/O复用系统调用可能在超时时间到期之前就返回(有I/O事件发生),所以如果我们要利用它们来定时,就需要不断更新定时参数以反映剩余的时间。
#define TIMEOUT 5000 int timeout = TIMEOUT; time_t start = time(NULL); time_t end = time(NULL); while (1) { printf("the timeout is now %d mill-secondsn", timeout); start = time(NULL); int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, timeout); if ((number < 0) && (errno != EINTR)) { printf("epoll failuren"); break; } //如果epo11_ wait成功返回0,则说明超时时间到,此时便可处理定时任务,并重置定时时间 if (number == 0) { // timeout timeout = TIMEOUT; continue; } end = time(NULL); timeout -= (end - start) * 1000; if (timeout <= 0) { // timeout timeout = TIMEOUT; } // handle connections }时间轮
图所示的时间轮内,(实线) 指针指向轮子上的-一个槽(slot)。 它以恒定的速度顺时针转动,每转动一步就指向下一个槽( 虚线指针指向的槽),每次转动称为一个滴答(tick)。一个滴答的时间称为时间轮的槽间隔si (slot interval), 它实际上就是心搏时间。该时间轮共有N个槽,因此它运转一周的时间是N*si每个槽指向一条定时器链表,每条链表上的定时器具有相同的特征:它们的定时时间相差N*si的整数倍。时间轮正是利用这个关系将定时器散列到不同的链表中。假如现在指针指向槽cs,我们要添加一个定时时间为ti的定时器,则该定时器将被插人槽ts (timer slot)对应的链表中:
ts=(cs+(ti/si))%N
基于排序链表的定时器使用唯一的一条链表来管理所有定时器,所以插人 *** 作的效率随着定时器数目的增多而降低。而时间轮使用哈希表的思想,将定时器散列到不同的链表上。这样每条链表上的定时器数目都将明显少于原来的排序链表上的定时器数目,插入 *** 作的效率基本不受定时器数目的影响。
很显然,对时间轮而言,要提高定时精度,就要使si值足够小:要提高执行效率,则要求N值足够大。
图描述的是一种简单的时间轮,因为它只有一个轮子。而复杂的时间轮可能有多个轮子,不同的轮子拥有不同的粒度。相邻的两个轮子,精度高的转一圈, 精度低的仅往前移动一槽,就像水表一样。
#ifndef TIME_WHEEL_TIMER #define TIME_WHEEL_TIMER #include时间堆#include #include #define BUFFER_SIZE 64 class tw_timer; struct client_data { sockaddr_in address; int sockfd; char buf[BUFFER_SIZE]; tw_timer *timer; }; class tw_timer { public: tw_timer(int rot, int ts) : next(NULL), prev(NULL), rotation(rot), time_slot(ts) {} public: int rotation; //记录定时器在时间轮转多少圈生效 int time_slot; //记录定时器在时间轮上哪个槽 void (*cb_func)(client_data *); client_data *user_data; tw_timer *next; tw_timer *prev; }; class time_wheel { public: time_wheel() : cur_slot(0) { for (int i = 0; i < N; ++i) { slots[i] = NULL; } } ~time_wheel() { for (int i = 0; i < N; ++i) { tw_timer *tmp = slots[i]; while (tmp) { slots[i] = tmp->next; delete tmp; tmp = slots[i]; } } } tw_timer *add_timer(int timeout) { if (timeout < 0) { return NULL; } int ticks = 0; if (timeout < TI) { ticks = 1; } else { ticks = timeout / TI; } int rotation = ticks / N; int ts = (cur_slot + (ticks % N)) % N; tw_timer *timer = new tw_timer(rotation, ts); if (!slots[ts]) { printf("add timer, rotation is %d, ts is %d, cur_slot is %dn", rotation, ts, cur_slot); slots[ts] = timer; } else { timer->next = slots[ts]; slots[ts]->prev = timer; slots[ts] = timer; } return timer; } void del_timer(tw_timer *timer) { if (!timer) { return; } int ts = timer->time_slot; if (timer == slots[ts]) { slots[ts] = slots[ts]->next; if (slots[ts]) { slots[ts]->prev = NULL; } delete timer; } else { timer->prev->next = timer->next; if (timer->next) { timer->next->prev = timer->prev; } delete timer; } } void tick() { tw_timer *tmp = slots[cur_slot]; printf("current slot is %dn", cur_slot); while (tmp) { printf("tick the timer oncen"); if (tmp->rotation > 0) { tmp->rotation--; tmp = tmp->next; } else { tmp->cb_func(tmp->user_data); if (tmp == slots[cur_slot]) { printf("delete header in cur_slotn"); slots[cur_slot] = tmp->next; delete tmp; if (slots[cur_slot]) { slots[cur_slot]->prev = NULL; } tmp = slots[cur_slot]; } else { tmp->prev->next = tmp->next; if (tmp->next) { tmp->next->prev = tmp->prev; } tw_timer *tmp2 = tmp->next; delete tmp; tmp = tmp2; } } } cur_slot = ++cur_slot % N; } private: static const int N = 60; //槽的个数 static const int SI = 1; //1s转动一次 tw_timer *slots[N]; //时间轮的槽,其中每个元素指向一个定时器无序 链表 int cur_slot; //时间轮的当前槽 }; #endif
前面讨论的定时方案都是以固定的频率调用心搏函数tick,并在其中依次检测到期的定时器,然后执行到期定时器上的回调函数。设计定时器的另外一种思路是:将所有定时器中超时时间最小的一个定时器的超时值作为心搏间隔。这样,一旦心搏函数tick被调用,超时时间最小的定时器必然到期,我们就可以在tick函数中处理该定时器。然后,再次从剩余的定时器中找出超时时间最小的一个,并将这段最小时间设置为下一次心搏间隔。如此反复,就实现了较为精确的定时。
树的基本 *** 作是插人节点和删除节点。对最小堆而言,它们都很简单。为了将一个元素X插人最小堆,我们可以在树的下一个空闲位置创建一个空穴。如果X可以放在空穴中而不破坏堆序,则插人完成。否则就执行上虑 *** 作,即交换空穴和它的父节点_上的元素。不断执行上述过程,直到X可以被放人空穴,则插人 *** 作完成。比如,我们要往图所示的最小堆中插入值为14的元素,则可以按照图 所示的步骤来 *** 作。
关于最小堆的性质可以看这篇文章
#ifndef intIME_HEAP #define intIME_HEAP #include#include #include using std::exception; #define BUFFER_SIZE 64 class heap_timer; struct client_data { sockaddr_in address; int sockfd; char buf[BUFFER_SIZE]; heap_timer *timer; }; class heap_timer { public: heap_timer(int delay) { expire = time(NULL) + delay; } public: time_t expire;//定时器生成的绝对时间 void (*cb_func)(client_data *); client_data *user_data; }; class time_heap { public: time_heap(int cap) throw(std::exception) : capacity(cap), cur_size(0) { array = new heap_timer *[capacity]; if (!array) { throw std::exception(); } for (int i = 0; i < capacity; ++i) { array[i] = NULL; } } time_heap(heap_timer **init_array, int size, int capacity) throw(std::exception) : cur_size(size), capacity(capacity) { if (capacity < size) { throw std::exception(); } array = new heap_timer *[capacity]; if (!array) { throw std::exception(); } for (int i = 0; i < capacity; ++i) { array[i] = NULL; } if (size != 0) { for (int i = 0; i < size; ++i) { array[i] = init_array[i]; } for (int i = (cur_size - 1) / 2; i >= 0; --i) { percolate_down(i); } } } ~time_heap() { for (int i = 0; i < cur_size; ++i) { delete array[i]; } delete[] array; } public: void add_timer(heap_timer *timer) throw(std::exception) { if (!timer) { return; } if (cur_size >= capacity) { resize(); } int hole = cur_size++; int parent = 0; for (; hole > 0; hole = parent) { parent = (hole - 1) / 2; if (array[parent]->expire <= timer->expire) { break; } array[hole] = array[parent]; } array[hole] = timer; } void del_timer(heap_timer *timer) { if (!timer) { return; } // lazy delelte timer->cb_func = NULL; } heap_timer *top() const { if (empty()) { return NULL; } return array[0]; } void pop_timer() { if (empty()) { return; } if (array[0]) { delete array[0]; array[0] = array[--cur_size]; percolate_down(0); } } void tick() { heap_timer *tmp = array[0]; time_t cur = time(NULL); while (!empty()) { if (!tmp) { break; } if (tmp->expire > cur) { break; } if (array[0]->cb_func) { array[0]->cb_func(array[0]->user_data); } pop_timer(); tmp = array[0]; } } bool empty() const { return cur_size == 0; } private: void percolate_down(int hole) { heap_timer *temp = array[hole]; int child = 0; for (; ((hole * 2 + 1) <= (cur_size - 1)); hole = child) { child = hole * 2 + 1; if ((child < (cur_size - 1)) && (array[child + 1]->expire < array[child]->expire)) { ++child; } if (array[child]->expire < temp->expire) { array[hole] = array[child]; } else { break; } } array[hole] = temp; } void resize() throw(std::exception) { heap_timer **temp = new heap_timer *[2 * capacity]; for (int i = 0; i < 2 * capacity; ++i) { temp[i] = NULL; } if (!temp) { throw std::exception(); } capacity = 2 * capacity; for (int i = 0; i < cur_size; ++i) { temp[i] = array[i]; } delete[] array; array = temp; } private: heap_timer **array; //堆数组 int capacity; //堆数组容量 int cur_size; //堆数组当前包含元素的个数 }; #endif
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)