在介绍多路复用之前,简单介绍一下socket。
socketsocket是套接字,中文意思是插座,这个词体现在哪里?因为socket有一个fd,对应了一个网络IO,而这个IO由一个五元组(sip,dip,sport,dport,proto(传输层))决定。这个fd与五元组就像是插座的关系。
*** 作系统在接受到网卡通知有数据到来时,会通知相应的进程去接收数据,通知的方法是发出SIGIO信号。在应用UDP的场景中,可以利用SIGIO信号收发数据,而TCP不行,原因在于使用TCP的时候,会有很多“冗余数据”,如三次握手等,而这些数据也会产生SIGIO。
这里将select和poll一起介绍,因为它们很类似,都是通过定期轮询的方式检验IO是否有数据或准备好(可读可写),区别在于select是使用了三个集合来分别检验可读可写和出错状态,poll是将这三个集合整合为一个来检验。这里直接贴出select和poll应用的代码。
先贴出作为服务器的初始设置。
int sockfd = socket(AF_INET, SOCK_STREAM, 0); if (sockfd < 0) { perror("socket"); return -1; } struct sockaddr_in addr; memset(&addr, 0, sizeof(struct sockaddr_in)); addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = INADDR_ANY; if (bind(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) { perror("bind"); return -2; } if (listen(sockfd, 5) < 0) { perror("listen"); return -3; }
使用select
fd_set rfds, rset;//rfds用来存储,rset用于检测 FD_ZERO(&rfds); FD_SET(sockfd, &rfds); int max_fd = sockfd; int i = 0; while (1) { rset = rfds; int nready = select(max_fd+1, &rset, NULL, NULL, NULL); if (nready < 0) { printf("select error : %dn", errno); continue; } if (FD_ISSET(sockfd, &rset)) { //accept struct sockaddr_in client_addr; memset(&client_addr, 0, sizeof(struct sockaddr_in)); socklen_t client_len = sizeof(client_addr); int clientfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_len); if (clientfd <= 0) continue; char str[INET_ADDRSTRLEN] = {0}; printf("recvived from %s at port %d, sockfd:%d, clientfd:%dn", inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)),ntohs(client_addr.sin_port), sockfd, clientfd); if (max_fd == FD_SETSIZE) { printf("clientfd --> out rangen"); break; } FD_SET(clientfd, &rfds); if (clientfd > max_fd) max_fd = clientfd; printf("sockfd:%d, max_fd:%d, clientfd:%dn", sockfd, max_fd, clientfd); if (--nready == 0) continue; } for (i = sockfd + 1; i <= max_fd; i ++) { if (FD_ISSET(i, &rset)) { char buffer[BUFFER_LENGTH] = {0}; int ret = recv(i, buffer, BUFFER_LENGTH, 0); if (ret < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { printf("read all data"); } FD_CLR(i, &rfds); close(i); } else if (ret == 0) { printf(" disconnect %dn", i); FD_CLR(i, &rfds); close(i); break; } else { printf("Recv: %s, %d Bytesn", buffer, ret); } if (--nready == 0) break; } } }
这里INET_ADDRSTRLEN以及BUFFER_LENGTH都是宏定义。
下面是poll代码,同样使用宏定义。
struct pollfd fds[POLL_SIZE] = {0}; fds[0].fd = sockfd; fds[0].events = POLLIN; int max_fd = 0;//值最大的fd,用于遍历 int i = 0; for (i = 1; i < POLL_SIZE; i ++) { //初始化 fds[i].fd = -1; fds[i].events=POLLIN; } while (1) { int nready = poll(fds, max_fd+1, 5); if (nready <= 0) continue; if ((fds[0].revents & POLLIN) == POLLIN) //监听fd有数据,说明有新连接 { struct sockaddr_in client_addr; memset(&client_addr, 0, sizeof(struct sockaddr_in)); socklen_t client_len = sizeof(client_addr); int clientfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_len); if (clientfd <= 0) continue; char str[INET_ADDRSTRLEN] = {0}; printf("recvived from %s at port %d, sockfd:%d, clientfd:%dn",inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)),ntohs(client_addr.sin_port), sockfd, clientfd); fds[clientfd].fd = clientfd; fds[clientfd].events = POLLIN; if (clientfd > max_fd) max_fd = clientfd; if (--nready == 0) continue; } for (i = sockfd + 1; i <= max_fd; i ++) { if (fds[i].revents & (POLLIN|POLLERR)) { char buffer[BUFFER_LENGTH] = {0}; int ret = recv(i, buffer, BUFFER_LENGTH, 0); if (ret < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { printf("read all data"); } //close(i); fds[i].fd = -1; } else if (ret == 0) { printf(" disconnect %dn", i); close(i); fds[i].fd = -1; break; } else { printf("Recv: %s, %d Bytesn", buffer, ret); } if (--nready == 0) break; } } }
这里还要提示一下,select和poll函数的阻塞,是阻塞在函数内部。另外,在连接断开之后,要及时把集合中对应的已断开fd删去,避免浪费空间。
epollepoll与select/poll不同,内部用红黑树维护。这里有一点先说明,epoll不是任何时候效率都比select/poll高,当IO数少时,select和poll的效率反而更高。
int epoll_fd = epoll_create(EPOLL_SIZE); struct epoll_event ev, events[EPOLL_SIZE] = {0}; ev.events = EPOLLIN; ev.data.fd = sockfd; epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev); while (1) { int nready = epoll_wait(epoll_fd, events, EPOLL_SIZE, -1); if (nready == -1) { printf("epoll_waitn"); break; } int i = 0; for (i = 0;i < nready;i ++) { if (events[i].data.fd == sockfd) { struct sockaddr_in client_addr; memset(&client_addr, 0, sizeof(struct sockaddr_in)); socklen_t client_len = sizeof(client_addr); int clientfd = accept(sockfd, (struct sockaddr*)&client_addr, &client_len); if (clientfd <= 0) continue; char str[INET_ADDRSTRLEN] = {0}; printf("recvived from %s at port %d, sockfd:%d, clientfd:%dn", inet_ntop(AF_INET, &client_addr.sin_addr, str, sizeof(str)),ntohs(client_addr.sin_port), sockfd, clientfd); ev.events = EPOLLIN | EPOLLET; ev.data.fd = clientfd; epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientfd, &ev); } else { int clientfd = events[i].data.fd; char buffer[BUFFER_LENGTH] = {0}; int ret = recv(clientfd, buffer, BUFFER_LENGTH, 0); if (ret < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { printf("read all data"); } close(clientfd); ev.events = EPOLLIN | EPOLLET; ev.data.fd = clientfd; epoll_ctl(epoll_fd, EPOLL_CTL_DEL, clientfd, &ev); } else if (ret == 0) { printf(" disconnect %dn", clientfd); close(clientfd); ev.events = EPOLLIN | EPOLLET; ev.data.fd = clientfd; epoll_ctl(epoll_fd, EPOLL_CTL_DEL, clientfd, &ev); break; } else { printf("Recv: %s, %d Bytesn", buffer, ret); } } } }
有几点需要说明一下。一是sockfd没必要设为非阻塞,因为sockfd是否阻塞与epoll没关系。虽然设为非阻塞会快一点,但是那是在epoll检测完成之后,recv/send的过程时由于非阻塞快了一点。第二,epoll_wait()的阻塞相当于是带时间的条件等待,时间到或是条件满足(即检测到有IO)就返回。第三,events数组的值没必要设置得很大,经验值是连接数的1%即可。第四,注意要及时close(),否则服务端会出现大量CLOSE_WAIT。同时也要记得及时从epoll红黑树结构中及时删除节点,避免epoll中存在大量僵尸节点。
最后,还要说明一下水平触发和边沿触发。epoll默认水平触发。边沿触发是当socket接收到数据时会触发一次epoll_wait(),之后不管数据是否读完都不会再触发epoll_wait();水平触发则是socket中只要有数据就一直触发。在实际应用中,对于大数据一般用水平触发,小数据用边沿触发。这里还有两点要注意。一是虽然ET+循环读也可以做到LT的效果,但是对于大数据读写,一直循环会导致无法响应其他IO。另外,对于listenfd,它适用LT,这样可以避免多个连接同时到达而漏掉部分IO的问题,当然也可以ET+循环,但不建议。
当然,这里epoll代码还有可改进的地方,例如改成reactor写法,这里不再介绍。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)