IO多路复用机制——Select

IO多路复用机制——Select,第1张

IO多路复用机制——Seleclass="superseo">ct 服务器端使用select机制监听可读的文件描述符(客户端)的一般流程如下:下面将说下 如何将select来实现多并发的双向通信:
select函数族如下:

服务器端使用select机制监听可读的文件描述符(客户端)的一般流程如下: 执行fd_set myset; 实例化一个fd_set对象。FD_ZERO(&myset); 把myset的所有位置为0,例如 0000 0000.若新连接的客户端的文件描述符为2,用一个变量fd来记录该文件描述符,执行FD_SET(fd , &myset); 使myset的从右往左的第三位置为1,因此myset变为 0010 0000.若此时又有两个新客户端连接,例如fd=3、fd=5,则再次调用FD_SET函数使myset变为 0011 0100.执行select(6 , &myset ,NULL, NULL , 0); 第一个参数是此时的最大文件描述符再加1,因为我这里步骤4新增的客户端种fd最大为5,所以第一个参数应该填为6;第二个参数是表示用去监听可读文件描述符的fd_set对象,这里就是myset;第三个参数是监听可写文件描述符的fd_set对象,因为我这里只监听可读的文件描述符,所以这里填NULL;第四个参数是监听异常的,这里也不使用,所以填NULL;第五个参数是阻塞的时间,填0表示永久阻塞,知道有某个文件描述符可读,如果不想永久阻塞,可以填入一个stuct timeval 对象。执行完select后,myset的所有位被置为0。此时如果只有客户端fd=2发生了可读事件,则第五步中select的返回值大于0,同时myset从左往右第三位被重新置为1( 0010 0000);如果没有客户端文件描述符发生可读事件,同时又阻塞的时间到,则返回值为0;如果返回值小于0,则表示发生了错误。select返回之后,最后调用FD_ISSET( 2 , &myset); 来判断fd=2 是否可读,也就是判断myset的第三位是否被置为1。如果fd=2可读(也就是myset的第三位被置为1),则 FD_ISSET返回值为 非零,否则则返回0。至此select的使用流程结束。 下面将说下 如何将select来实现多并发的双向通信:

1. 首先主要的思想是启动三个线程。
2. 第一个线程用select阻塞并监听客户端的fd,判断该客户端是否可读。
3. 第二个线程从标准输入获取要发送的信息,并将信息发送给各个客户端。
4. 第三个线程是主线程,用来accpet阻塞,时刻监听是否有客户端发起tcp连接请求。

代码如下:

#include 
#include 
#include 
#include  //POSIX  *** 作系统API
#include 
#include 
#include 
#include 
#include 
#include 

using namespace std;

pthread_mutex_t mutex;             //锁
unordered_set<int> Client_Fd_Set;  //存放客户端文件描述符的SET

time_t t = time(0);          //记录发送和接收数据的时间


[[noreturn]] void *recv_thread(void *arg) //接收线程
{
    struct timeval tv;
    tv.tv_sec=10;
    tv.tv_usec=0;
    while(1)
    {
        pthread_mutex_lock(&mutex);
        while(Client_Fd_Set.size()==0);
        fd_set Fd_set;
        FD_ZERO(&Fd_set);
        int max_fd=0;
        unordered_set<int>::iterator it;
        for(it = Client_Fd_Set.begin() ; it!=Client_Fd_Set.end() ;++it)
        {
            FD_SET(*it,&Fd_set);
            max_fd=max(max_fd,*it);
        }
        int res = select(max_fd+1,&Fd_set , NULL ,NULL,&tv);
        if(res<0)
        {
            perror("select error");
        }
        else if(res==0)
        {
            cout<<"no new message"<<endl;
        }
        else
        {
            char rx_buf[1024];
            cout<<"have new message"<<endl;
            for(it=Client_Fd_Set.begin() ; it!=Client_Fd_Set.end() ; ++it)
            {
                if( FD_ISSET(*it,&Fd_set)!=0)
                {
                    memset(rx_buf,0,sizeof(rx_buf));
                    int len = recv(*it,rx_buf,sizeof(rx_buf),0);//最后一个参数可改为MSG_WAITALL,会导致一直阻塞直到读取到指定字节
                    if(len<=0){
                        cout<<"len:"<<len<<endl;
                        close(*it);
                        Client_Fd_Set.erase(*it);
                        close(*(int*)arg);
                        exit(0);
                    }
                    t = time(0);
                    struct tm *timemsg = localtime(&t);
                    string time;
                    time+="(";time+= to_string(timemsg->tm_hour);time+=':';time+=to_string(timemsg->tm_min);time+=':';time+=to_string(timemsg->tm_sec);time+=")";
                    cout<<time<<"recv from "<<*it<<":"<<rx_buf<<endl;
                    if(strcmp(rx_buf,"end")==0)
                    {
                        close(*it);
                        cout<<"Close Client fd:"<<*it<<endl;
                        Client_Fd_Set.erase(*it);
//                        close(*(int*)arg);
                        cout<<"recv_thread exit"<<endl;
                        pthread_exit(0);
                    }
                    //数据回传,将接收到的数据回传给客户端,有需要的话把0改成1
                    #if 0
                    send(*it,rx_buf,len ,0);
                    cout<<time<<"send to "<<*it<<":"<<rx_buf<<endl;
                    #endif
                }
            }
        }
        pthread_mutex_unlock(&mutex);
        sleep(0.01);
    }
}

[[noreturn]] void *send_thread(void *arg)  //发送线程
{
    while(true)
    {
        string temp_buf;
        getline(cin,temp_buf);
        if(temp_buf=="end")             //当标准输入 输入end时,关闭嵌套字,导致主线程里会跳出while
        {
            cout<<"send_thread exit"<<endl;
            close(*(int *)arg);
            pthread_exit(0);
        }
        unordered_set<int>::iterator it;
        for(it = Client_Fd_Set.begin();it!=Client_Fd_Set.end();++it) //这里是往所有客户端发信息,可以再优化
        {
            char *send_buf;
            send_buf = (char *)temp_buf.data();
            send(*it,send_buf,strlen(send_buf) ,0);

            t = time(0);
            struct tm *timemsg = localtime(&t);
            string time;
            time+="(";time+= to_string(timemsg->tm_hour);time+=':';time+=to_string(timemsg->tm_min);time+=':';time+=to_string(timemsg->tm_sec);time+=")";
            cout<<time<<"send to "<<*it<<":"<<send_buf<<endl;
        }
        sleep(0.01);
    }
}


int main() {
    pthread_t thread_write,thread_read;
    std::cout << "Server start!" << std::endl;
    int server_fd = socket(AF_INET, SOCK_STREAM , 0);
    std::cout << "Server fd="<<server_fd << std::endl;
    struct sockaddr_in server_addr , client_addr;
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(995);
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 字符串转网络字节序
//    server_addr.sin_addr.s_addr = inet_addr("0,0,0,0"); 跟上面一句等价
    if(-1 == bind(server_fd,(struct  sockaddr*)&server_addr ,sizeof(server_addr)))
    {
        perror("bind error");
        exit(1);
    }

    if(-1 == listen(server_fd,20))
    {
        perror("listen error");
        exit(1);
    }
    pthread_mutex_init(&mutex,NULL);
    pthread_create(&thread_write ,NULL , send_thread ,&server_fd);
    pthread_detach(thread_write);
    pthread_create(&thread_read ,NULL , recv_thread ,&server_fd);
    pthread_detach(thread_read);

    socklen_t client_addr_len = sizeof(client_addr);
    while(true)
    {
        int client_fd = accept(server_fd,(struct sockaddr*)&client_addr ,&client_addr_len);
        if(client_fd < 0 ){
            perror("accept error");
            break;
        }
        cout<<"New Client fd="<<client_fd<<endl;
        pthread_mutex_lock(&mutex);
        Client_Fd_Set.insert(client_fd);
        pthread_mutex_unlock(&mutex);
    }

    unordered_set<int>::iterator  it;
    for(it = Client_Fd_Set.begin() ; it!=Client_Fd_Set.end();++it)
    {
        close(*it);
    }
    close(server_fd);
    cout<<"main_thread exit"<<endl;
    exit(0);
    return 0;
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/995092.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存