libhv学习路线 之 IO复用

libhv学习路线 之 IO复用,第1张

目录

一 I/O事件的创建

二 I/O事件的执行

三 I/O事件执行顺序


        以epoll为例,libhv在LINUX *** 作系统使用的是基于epoll的I/O复用.

         对于高性能服务器中,I/O复用事件的出现使得服务器能够同时监听多个端口,同时处理多个TCP,UDP请求;客户端也可以同时处理多个用户输入和网络连接,甚至可以处理非阻塞class="superseo">connect()(如果第一次调用非阻塞connect连接没有立即建立返回,则调用select()监听socket上面的写事件,如果存在写事件再判断错误码是否是0,如果为0,则表示连接成功建立).

一 I/O事件的创建

        first.调用hio_get()函数创建I/O事件(hio_t变量):

        1.首先判断loop回环事件中io事件数组ios(类比于loop->idles和loop->timers)的最大长度是否大于文件描述符fd大小,如果小于则将数组的最大长度扩大以可以容纳文件描述符fd;

        2.创建(存在则获取)该文件描述符所对应的I/O事件(hio_t变量),给该事件的事件类型设置为I/O事件,该事件对应的文件描述符为fd,再将其存于ios数组中,有则不用存取.

        3.设置该I/O事件准备完毕(设置io->ready=1,io->closed=0,io->close=0).

hio_t* hio_get(hloop_t* loop, int fd) {
    if (fd >= loop->ios.maxsize) {
        int newsize = ceil2e(fd);//向上取整,取得整数都是2的倍数,如,2,4,8等
        io_array_resize(&loop->ios, newsize > fd ? newsize : 2*fd);//判断向上取整后是否能够容纳文件描述符,如果不可以则直接将最大长度设为文件描述符值的两倍,然后设置ios数组的最大长度为该最大长度
    }

    hio_t* io = loop->ios.ptr[fd];//从ios数组中取得该文件描述符所对应的I/O事件
    if (io == NULL) {//如果该数组中不存在该描述符,则重新创建一个描述符
        HV_ALLOC_SIZEOF(io);
        hio_init(io);
        io->event_type = HEVENT_TYPE_IO;
        io->loop = loop;
        io->fd = fd;
        loop->ios.ptr[fd] = io;
    }

    if (!io->ready) {
        hio_ready(io);
    }

    return io;
}

        second.设置该I/O事件的回调函数,不同的I/O事件的回调函数都不一样,比如accept事件,connect事件,read事件,write事件,close事件的设置回调函数的函数都不一样,分别为:

void hio_setcb_accept(hio_t* io, haccept_cb accept_cb) {
    io->accept_cb = accept_cb;
}

void hio_setcb_connect(hio_t* io, hconnect_cb connect_cb) {
    io->connect_cb = connect_cb;
}

void hio_setcb_read(hio_t* io, hread_cb read_cb) {
    io->read_cb = read_cb;
}

void hio_setcb_write(hio_t* io, hwrite_cb write_cb) {
    io->write_cb = write_cb;
}

void hio_setcb_close(hio_t* io, hclose_cb close_cb) {
    io->close_cb = close_cb;
}

        third.将该I/O事件(hio_t变量)初始化为以上所对应的各种事件,以下我将以read事件为例:

        1.首先判断该I/O事件是否已经关闭,如果关闭则直接返回,不进行读取事件;

        2.否则调用hio_add()函数设置该I/O事件为读事件;

int hio_read (hio_t* io) {
    if (io->closed) {
        hloge("hio_read called but fd[%d] already closed!", io->fd);
        return -1;}
    hio_add(io, hio_handle_events, HV_READ);
    if (io->readbuf.tail > io->readbuf.head &&
        io->unpack_setting == NULL &&
        io->read_flags == 0) {
        hio_read_remain(io);
    }
    return 0;
}

        对于hio_add()函数,传入参数分别为I/O事件(hio_t类型变量),I/O事件回调函数cb(与之前用户定义的回调函数不一样,不过用户定义的回调函数也是由该回调函数cb进行间接调用的)以及I/O事件类型events,定义如下:

        1.由于开始为该I/O事件分配对应的事件(read,write等),则可以调用EVENT_ADD()函数将该I/O事件激活;

        2.调用hio_ready()函数将该I/O事件的ready参数置1(该步已经在之前的hio_get()函数中已经执行过了);

        3.设置该I/O事件的回调函数cb;

        4.判断要加入的read事件在该I/O已有事件中是否存在,不存在则添加进去,即调用iowatcher_add_event()函数(以epoll为例,iowatcher_add_event()函数调用epoll_create()函数创建一个事件表描述符,使用loop->iowatcher指向该事件文件描述符,调用epoll_ctl()函数注册该I/O文件描述符io->fd下面的events事件).

int hio_add(hio_t* io, hio_cb cb, int events) {
    printd("hio_add fd=%d io->events=%d events=%d\n", io->fd, io->events, events);
#ifdef OS_WIN
    // Windows iowatcher not work on stdio
    if (io->fd < 3) return -1;
#endif
    hloop_t* loop = io->loop;
    if (!io->active) {
        EVENT_ADD(loop, io, cb);
        loop->nios++;
    }

    if (!io->ready) {
        hio_ready(io);
    }

    if (cb) {
        io->cb = (hevent_cb)cb;
    }

    if (!(io->events & events)) {
        iowatcher_add_event(loop, io->fd, events);
        io->events |= events;
    }
    return 0;
}
二 I/O事件的执行

        对于任何事件的执行,都是要调用hloop_process_events()函数间接执行的,对于I/O事件在该函数的相关执行代码如下所示,通过如下程序我们可以得知,I/O事件的执行函数hloop_process_ios()也是由hloop_process_events()函数间接调用的:

if (loop->nios) {
        nios = hloop_process_ios(loop, blocktime);
}

        接下来简单分析以下hloop_process_ios()函数都做了些什么工作,通过如下程序我们可以看到,该按树还是在里面间接调用了iowatcher_poll_events()函数:

static int hloop_process_ios(hloop_t* loop, int timeout) {
    // That is to call IO multiplexing function such as select, poll, epoll, etc.
    int nevents = iowatcher_poll_events(loop, timeout);
    if (nevents < 0) {
        hlogd("poll_events error=%d", -nevents);
    }
    return nevents < 0 ? 0 : nevents;
}

        所以我们再来看一下该函数内部是什么样的(在此我们以epoll为例,该方式是在LINUX *** 作系统下使用的):

        1.调用epoll_wait()函数阻塞监听该事件描述符下面的事件,如果存在I/O事件,则返回触发的I/O事件个数;

        2.循环判断这些事件都属于哪些类型的事件(读或者写);

        3.将这些事件加入待处理事件集合中.       

int iowatcher_poll_events(hloop_t* loop, int timeout) {
    epoll_ctx_t* epoll_ctx = (epoll_ctx_t*)loop->iowatcher;
    if (epoll_ctx == NULL)  return 0;
    if (epoll_ctx->events.size == 0) return 0;
    int nepoll = epoll_wait(epoll_ctx->epfd, epoll_ctx->events.ptr, epoll_ctx->events.size, timeout);
    if (nepoll < 0) {
        if (errno == EINTR) {
            return 0;
        }
        perror("epoll");
        return nepoll;
    }
    if (nepoll == 0) return 0;
    int nevents = 0;
    for (int i = 0; i < epoll_ctx->events.size; ++i) {
        struct epoll_event* ee = epoll_ctx->events.ptr + i;
        int fd = ee->data.fd;
        uint32_t revents = ee->events;
        if (revents) {
            ++nevents;
            hio_t* io = loop->ios.ptr[fd];
            if (io) {
                if (revents & (EPOLLIN | EPOLLHUP | EPOLLERR)) {
                    io->revents |= HV_READ;
                }
                if (revents & (EPOLLOUT | EPOLLHUP | EPOLLERR)) {
                    io->revents |= HV_WRITE;
                }
                EVENT_PENDING(io);
            }
        }
        if (nevents == nepoll) break;
    }
    return nevents;
}

        最后,在hloop_process_events()函数的末尾调用hloop_process_pendings()函数对待处理事件集合中的所有事件进行处理(对于I/O事件,则执行响应回调函数).

        接下来,我们再看看这些回调函数都是怎么执行的:

        之前我们说过两种回调函数,一种是用户定义的回调函数,一种是hio_handle_events()函数,我们首先来看hio_handle_events()函数:

        1.这个函数首先会判断是哪一种类型的I/O事件,如果是读事件且发生了,进一步判断io->accept是否被置位,如果置位,则表示该I/O事件所对应的文件描述符属于监听socket,调用nio_accept()来处理accept事件;如果没有被置位,说明该描述符指的是连接socket,调用nio_read()函数来读取客户端发来的数据;

        2.如果该I/O事件是写事件且发生了,则判断io->connect是否被置位,如果置位了,说明该socket想要与服务器建立连接,调用nio_connect();如果未被置位,说明该socket要与建立连接的服务器发送数据nio_write().

static void hio_handle_events(hio_t* io) {
    if ((io->events & HV_READ) && (io->revents & HV_READ)) {
        if (io->accept) {
            nio_accept(io);
        }
        else {
            nio_read(io);
        }
    }

    if ((io->events & HV_WRITE) && (io->revents & HV_WRITE)) {
        // NOTE: del HV_WRITE, if write_queue empty
        hrecursive_mutex_lock(&io->write_mutex);
        if (write_queue_empty(&io->write_queue)) {
            iowatcher_del_event(io->loop, io->fd, HV_WRITE);
            io->events &= ~HV_WRITE;
        }
        hrecursive_mutex_unlock(&io->write_mutex);
        if (io->connect) {
            // NOTE: connect just do once
            // ONESHOT
            io->connect = 0;

            nio_connect(io);
        }
        else {
            nio_write(io);
        }
    }

    io->revents = 0;
}

        接下来以nio_read()函数为例,讲解该函数是做了哪些工作:

        1.该函数首先调用__nio_read()函数(该函数主要是在其内部判断是TCP协议还是UDP协议,然后调用响应的读取函数,再将读取函数的返回作为该函数的返回值返回出来)读取I/O数据.

        2.判断返回值如果小于零,看能有两个方面的原因:a.对非阻塞socket调用阻塞读取函数,如果暂时没有数据会返回小于零的值,且返回错误EAGAIN;b.其他错误.如果返回的是0,说明客户端断开连接.如果返回的值大于0,说明读取到数据.

        3.根据返回的值,如果小于0且错误码不是EAGAIN或者等于0,关闭socket;否则不关闭socket.并且执行用户回调函数(由__read_cb()函数间接调用).

static void nio_read(hio_t* io) {
    // printd("nio_read fd=%d\n", io->fd);
    void* buf;
    int len = 0, nread = 0, err = 0;
read:
    buf = io->readbuf.base + io->readbuf.tail;
    if (io->read_flags & HIO_READ_UNTIL_LENGTH) {
        len = io->read_until_length - (io->readbuf.tail - io->readbuf.head);
    } else {
        len = io->readbuf.len - io->readbuf.tail;
    }
    assert(len > 0);
    nread = __nio_read(io, buf, len);
    // printd("read retval=%d\n", nread);
    if (nread < 0) {
        err = socket_errno();
        if (err == EAGAIN) {
            // goto read_done;
            return;
        } else if (err == EMSGSIZE) {
            // ignore
            return;
        } else {
            // perror("read");
            io->error = err;
            goto read_error;
        }
    }
    if (nread == 0) {
        goto disconnect;
    }
    io->readbuf.tail += nread;
    __read_cb(io, buf, nread);
    // if (nread == len) goto read;
    return;
read_error:
disconnect:
    hio_close(io);
}
三 I/O事件执行顺序

       1.首先调用hio_get()函数获取I/O事件;

        2.绑定用户回调函数;

        3.调用hio_accept(),hio_connect(),hio_read()或者io_write()对该I/O事件所对应的文件描述符赋予响应的事件,并且利用epoll注册相应的事件表.

        4.调用hloop_process_events()函数间接阻塞等待响应的I/O事件,并且将其绑定至待处理事件队列之中,从而处理相应的事件.

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/web/993142.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存