协程的基本原理

协程的基本原理,第1张

协程是用同步的编程方式达到异步的性能的框架。


现在较为成熟的框架有go语言实现的libgo和C++实现的libco。



同步为什么性能低?因为检测IO与读写IO在一个流程,当IO未就绪时需要进行等待,而异步的话由于不在一个流程,所以不需要等待。


但是,异步的流程不符合人的思维,而同步的好处就是逻辑清晰。


这里专门提示一下,mysql的连接池是同步 *** 作。


协程的实现

协程的实现主要是yield()让出CPU和resume()恢复运行。


有三种方式:
1、longjmp和setjmp,这个之前讲过,不再赘述。



2、linux提供的ucontext。



3、用汇编语言自己实现跳转。


相比1、2方法而言,此方法可读性较强。


利用switch()原语。


yield=switch(a,b);
resume=switch(a,b);

切换_switch()函数定义

int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);

参数 1:即将运行协程的上下文,是寄存器列表。



参数 2:正在运行协程的上下文,是寄存器列表。



其实协程的切换就是将CPU中各寄存器的值(上下文)进行更换,将正在运行中的CPU寄存器中的值保存到内存中,再将要运行的协程对应的寄存器值从内存拷贝到寄存器中。



我们 nty_cpu_ctx 结构体的定义,为了兼容 x86,结构体项命令采用的是 x86 的寄存器名字命名。


typedef struct _nty_cpu_ctx 
{
    void *esp;
    void *ebp;
    void *eip;
    void *edi;
    void *esi;
    void *ebx;
    void *r1;
    void *r2;
    void *r3;
    void *r4;
    void *r5;
} nty_cpu_ctx;

这里介绍两个寄存器。



eip指向入口函数func,协程根据这一指针执行任务,函数的参数放到对应位置即可。



esp栈指针,指向栈空间的首地址。



_switch 返回后,执行即将运行协程的上下文,实现了上下文的切换。


几个问题

1、还是一个CPU在运行,效率为什么提高了?
因为减少了阻塞等待的时间,使得性能与异步差不多。



2、如何知道IO已经就绪?使用epoll

epoll_ctl(add);
yield();
epoll_ctl(del);

在需要进行IO *** 作的地方执行yield()让出CPU,但是在让出之前先将fd加入epoll进行监听,当IO就绪后恢复运行,注意需要把epoll中的该节点删除。



3、与reactor的对比。


reactor是一种回调的方式,是异步 *** 作,没有协程直白,性能都差不多。


当然,reactor自己实现就可以,而协程却需要依赖库。



4、有栈协程与无栈协程。


区别在于有没有自己独立的栈空间。



有栈协程,每个协程都有自己独立的栈空间。



无栈协程,所有协程共享栈空间,需要计算某一个协程在栈的某个位置。



不推荐使用无栈协程,因为栈的管理复杂,实现复杂。


虽然无栈协程内存利用率较高,但是有栈协程的效率更高且实现容易,而且内存利用率也没有低多少,还可以根据需求自己定义栈空间的大小。


比如,如果服务器时传输文件或是较大的数据,栈开得大一些,1M,甚至10M都可以;如果只是单独的接收一些字符数据,就可以开小一些,1K、4K都可以。


所以,两者性能、利用率其实差不多,建议使用有栈协程。



5、协程相比于多线程的长处。


协程可读性强,实现相对简单。


对于多线程而言,如果同时 *** 作同一个fd,会很复杂,例如加锁之类。


因为如果一个线程正在用这个fd发送数据,而另一个线程直接关闭了这个fd,会出错。



6、协程一直不让出怎么办?
协程一直不让出,说明协程一直在进行处理的是一个计算密集型的任务。


协程是为了解决IO等待时挂起问题的框架,如果一直不让出,说明没有这种IO *** 作,此时用协程的意义就不大,不建议使用协程,可以用rpc。



7、协程的应用
一是文件 *** 作,如日志落盘。



二是对mysql等数据库的 *** 作。



三是网络IO。


协程的定义

一个协程处理一个IO,这样实现较为容易。


当然也可以处理多个IO,但是实现复杂。


typedef struct _nty_coroutine 
{
    nty_cpu_ctx ctx;  //协程的上下文
    proc_coroutine func;  //协程的入口函数
    void *arg;  //入口函数的参数
    size_t stack_size;  //栈空间大小
    nty_coroutine_status status;  //协程的状态
    nty_schedule *sched;  //协程调度器
    uint64_t id;  //协程id
    void *stack;  //栈指针
    RB_ENTRY(_nty_coroutine) sleep_node;  //sleep集合
    RB_ENTRY(_nty_coroutine) wait_node;  //wait集合
    TAILQ_ENTRY(_nty_coroutine) ready_next;  //就绪集合
} nty_coroutine;

协程创建完成后,加入到就绪集合,等待调度器的调度。


协程在运行完成后,进行 IO *** 作,此时 IO 并未准备好,进入等待状态集合。


IO 准备就绪,协程开始运行,后续进行 sleep *** 作,此时进入到睡眠状态集合。



就绪(ready),睡眠(sleep),等待(wait)集合该采用如何数据结构来存储?
就绪(ready)集合并不没有设置优先级的选型,所有在协程优先级一致,所以可以使用队列来存储就绪的协程,简称为就绪队列(ready_queue)。



睡眠(sleep)集合需要按照睡眠时长进行排序,采用红黑树来存储,简称睡眠树(sleep_tree)红黑树在工程实用为, key 为睡眠时长后的时间戳,value 为对应的协程结点。



等待(wait)集合,其功能是在等待 IO 准备就绪,等待 IO 也是有时长的,所以等待(wait)集合采用红黑树的来存储,简称等待树(wait_tree),此处借鉴 nginx 的设计。


协程的启动

1、查看调度器是否存在,不存在则创建。


调度器作为全局的单例,将调度器的实例存储在线程的私有空间 pthread_setspecific。



2、分配一个 coroutine 的内存空间,分别设置 coroutine 的数据项,栈空间,栈大小,初始状态,创建时间,子过程回调函数,子过程的调用参数。



3、将新分配协程添加到就绪队列 ready_queue 中。


int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg) 
{

    assert(pthread_once(&sched_key_once, nty_coroutine_sched_key_creator) == 0);
    nty_schedule *sched = nty_coroutine_get_sched();
    if (sched == NULL) 
    {
        nty_schedule_create(0);
        sched = nty_coroutine_get_sched();
        if (sched == NULL) 
        {
            printf("Failed to create scheduler\n");
            return -1;
        }
    }
    nty_coroutine *co = calloc(1, sizeof(nty_coroutine));
    if (co == NULL) 
    {
        printf("Failed to allocate memory for new coroutine\n");
        return -2;
    }
    int ret = posix_memalign(&co->stack, getpagesize(), sched->stack_size);
    if (ret) 
    {
        printf("Failed to allocate stack for new coroutine\n");
        free(co);
        return -3;
    }
    co->sched = sched;
    co->stack_size = sched->stack_size;
    co->status = BIT(NTY_COROUTINE_STATUS_NEW); 
    co->id = sched->spawned_coroutines ++;
    co->func = func;
    co->fd = -1;
    co->events = 0;
    co->arg = arg;
    co->birth = nty_coroutine_usec_now();
    *new_co = co;
    TAILQ_INSERT_TAIL(&co->sched->ready, co, ready_next);  //尾插
    return 0;
}
协程的调度

先给出调度器的定义

typedef struct _nty_schedule 
{
    nty_cpu_ctx ctx;  //当前协程的上下文
    struct _nty_coroutine *curr_thread;  //当前运行的协程。


yield()和resume()的参数由此而来 int page_size; int poller_fd; int eventfd; //所有协程共用一个epoll,由调度器管理 struct epoll_event eventlist[NTY_CO_MAX_EVENTS]; int nevents; int num_new_events; nty_coroutine_queue ready; //就绪队列 nty_coroutine_rbtree_sleep sleeping; //sleep集合 nty_coroutine_rbtree_wait waiting; //wait集合 } nty_schedule;

sleep集合中如果key相同,可以给后加入的节点key值稍加一点点(如1毫秒),这样就可以做到红黑树key值唯一,且没有太大影响,因为一般协程的实时性要求不会那么高。


当然,也可以不对key进行处理,直接插在相同key值节点的右子树上也可以。


协程的运行

简单介绍一下运行过程,调度器遍历各个集合,从中找出IO已就绪的协程进行resume(),该协程在运行中,IO如果未准备好,再yield()让回到调度器。


所以,协程在运行的过程中,大量的时间是运行在调度器上的。


while (1) 
{
    //遍历睡眠集合,使用 resume 恢复 expired 的协程运行权
    nty_coroutine *expired = NULL;
    while ((expired = sleep_tree_expired(sched)) != 0) 
    {
        resume(expired);
    }
    //遍历等待集合,使用 resume 恢复 wait 的协程运行权
    nty_coroutine *wait = NULL;
    int nready = epoll_wait(sched->epfd, events, EVENT_MAX, 1);
    for (i = 0;i < nready;i ++) 
    {
        wait = wait_tree_search(events[i].data.fd);
        resume(wait);
    }
    // 使用 resume 恢复 ready 的协程运行权
    while (!TAILQ_EMPTY(sched->ready)) 
    {
        nty_coroutine *ready = TAILQ_POP(sched->ready);
        resume(ready);
    }
}

运行的过程图如下

协程多核问题

主要是用多进程或多线程实现,每个进程或线程亲和一个CPU。


多进程实现

每个进程内部都是单线程,每个进程一个调度器,好处是实现简单,对于协程代码不需要做过多修改。


多线程实现

所有线程共用一个调度器,需要对调度器加锁,锁定义在调度器中(红黑树用mutex,队列用spinlock,遍历用mutex),调度时加锁(如查找sleep红黑树中节点,要对sleep红黑树加锁),需要对协程本身的代码进行修改,实现较为复杂。


当然,如果加锁,要考虑死锁的问题。


其实,这里不太需要注意。


因为如果不掺杂业务,线程共用的就一个东西——调度器,不太可能会产生死锁。


X86指令实现

这里不单独讲解。


协程的接口

协程需要封装若干接口,一类是协程本身的,二类是 posix 的异步封装协程 API。


协程自身

1、协程创建

int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg);

2、启动协程调度器

void nty_schedule_loop(void);

3、协程休眠

void nty_sleep();
posix接口

这里要注意,有些接口要做成异步的,有些接口不需要。



需要做成异步的接口有accept()、connect()、send()、write()、sendto()等。


这些接口需要判断IO是否就绪,不成功就等待。



不需要做成异步的接口有socket()、close()、listen()等。


这些接口不会引起不正常的关系,但是可以做成非阻塞的。


封装策略

有两种封装方法。


一是自己另外定义一套接口,如__read()。


这种方法有个弊端,对于mysql等这样的组件,它们内部源码调用了原来系统的接口(如read()),这样就需要修改mysql这些组件的源码。



二是做成与系统一样的接口,使用hook。


connect_f = dlsym(RLTD_NEXT, "connect");

这里有两个宏定义,dlsym()针对系统调用,dlopen()针对第三方库。



使用hook之后,虽然函数中仍然还是使用的原来的接口(connect),但是实际调用的是自己在应用层定义的函数入口。


这个过程就相当于在调用系统接口时被截获了,调用了自己实现的函数,原来的函数变成了返回值。


这样达到的效果,在使用协程连接mysql时,可以不用改动mysql的源码就实现这些功能。



另外,malloc()、free()也可以这么用,可以用来解决内存泄漏的问题。


在malloc()、free()时加一个打印 *** 作,就可以发现什么地方有malloc没有free。


jemalloc和tcmalloc就可以这样,不需要改系统代码就可以用。



写了不少,最后提示一下协程的几个关键问题。


协程到底是解决什么问题的?同步改成异步怎么做?同步、异步的性能差异?yield、resume、调度、切换?协程API接口?hook?掌握了这些,协程的大概原理就清楚了。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/565270.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-06
下一篇 2022-04-06

发表评论

登录后才能评论

评论列表(0条)

保存