Linux 中断子系统(四)-Workqueue

Linux 中断子系统(四)-Workqueue,第1张

Linux 中断子系统(四)-Workqueue 1、数据结构


        1)Workqueue 工作队列是利用内核线程来异步执行工作任务的通用机制;
        2)Workqueue 工作队列可以用作中断处理的Bottom-half机制,利用进程上下文来执行中断处理中耗时的任务,因此它允许睡眠,而Softirg和Tasklet在处理任务时不能睡眠;
        3)在中断处理过程中,或者其他子系统中,调用workqueue的调度或入队接口后,通过建立好的链接关系图逐级找到合适的worker,最终完成工作任务的执行
        4)关键的数据结构:
                work_struct:工作队列调度的最小单位,work item;
                workqueue_struct:工作队列,work item都挂入到工作队列中;
                worker:work item的处理者,每个worker对应一个内核线程;
                worker_pool:worker池(内核线程池),是一个共享资源池,提供不同的worker来对work item进行处理;
                pool_workqueue:充当桥梁纽带的作用,用于连接workqueue和worker_pool,建立链接关系


2、workqueue子系统初始化


workqueue 子系统的初始化分成两步来完成的:workqueue_init_early和workqueue_init


2.1.1 workqueue_init_early()
workqueue_init_early()                              //     kernelWorkqueue.c

         ----→KMEM_CACHE(pool_workqueue,SLAB_PANIC) //  为pool_workqueue 分配 slab 缓存

        ----→init_worker_pool(pool)                 //     为每个cpu 创建 worker_pool

        ----→worker_pool_assign_id(pool)            //      为worker_pool 分配id

        ----→alloc_workqueue_attrs()                //     为 unbound 工作队列创建默认属性

        ----→system_wq =alloc_workqueue();         //     kernel 创建默认的消息队列                     system_highpri_wq=alloc_workqueue();
             system_long_wq=alloc_workqueue();
             system_unbound_wq=alloc_workqueue(();
             system_freezable_wq =alloc_workqueue();


alloc_workqueue()                                                    //    includelinuxWorkqueue.h

alloc_workqueue()                               //    includelinuxWorkqueue.h

        ----→kzalloc(sizeof(*wq)+tbl_size,GFP_KERNEL)  //    分配内存

        ----→wq->flags=flags;                          //    初始化wq

                wq->saved_max_active = max_active; 

        ----→alloc_and_link_pwqs(wq)               //    分配 pool_workqueue 并建立连接

        ----→!(wq->flags & WQ_UNBOUND)

                ---Y--→alloc_percpu(struct pool_workqueue)   //    为每个CPU分配 pool_workqueue

                ------→init_pwq(pwq,wq,&cpu_pools[highpri])   //    初始化pool_workqueue
                ------→link_pwq(pwq)                   //    链接 pool_workqueue 和wq
                --N--→apply_workqueue_attrs(wq,ordered_wq_attrs(highpri])
                        //    将新的 workqueue_attrs 应用于未绑定的工作队列 
                    -------→apply_workqueue_attrs_locked(wq,attrs)
                        -------→apply_wqattrs_prepare(wq,attrs)
                            //    分配attrs和pwq(pool_workqueue)
                            -------→alloc_unbound_pwq(wq,tmp_attrs)
                                //    获取匹配@attr 的池并创建一个将池和@wq 相关联的pwq
                                -------→get_unbound_pool(attrs)
                                -------→init_pwq(pwq,wq,pool)
                        -------→apply_wqattrs_commit(ctx)
                            //    设置wq熟悉,并将wq和pwq建立连接
                            -------→numa_pwq_tbl_install(ctx->wq,node,ctx->pwq_tbl[node])
                                -------→link_pwq(pwq)
        ----→init_rescuer(wq)
                //    wq 内存紧张时的处理逻辑
        ----→list_add_tail_rcu(&wq->list,&workqueues);
                //    将创建的wq添加到全局链表中

alloc_workqueue 完成的主要工作包括:
        1)首先当然是要分配一个 struct workqueue_struct的数据结构,并且对该结构中的字段进行初始化 *** 作;
        2)workqueue 最终需要和worker_pool 关联起来,而这个纽带就是 pool_workqueue,alloc_and_link_pwqs函数就是完成这个功能:

                A)如果工作队列是绑定到CPU上的,则为每个CPU都分配pool_workqueue 并且初始化,通过link_pwq将工作队列与pool_workqueue 建立连接;

                B)如果工作队列不绑定到CPU上,则按内存节点(NUMA)来分配pool_workqueue,用 get_unbound_pool来实现,它会根据wq属性先去查找,如果没有找到相同的就创建一个新的pool_workqueue,并且添加到 unbound_pool_hash 哈希表中,最后也会调用link_pwq来建立连接;
                C)创建工作队列时,如果设置了WQ_MEM_RECLAIM标志,则会新建 rescuer worker,对应 rescuer_thread内核线程。当内存紧张时,新创建worker可能会失败,这时候由rescuer来处理这种情况;
                D)最终将新建好的工作队列添加到全局链表workqueues中;


2.1.2 workqueue_init()

workqueue_init()                    //  让wq子系统完全在线-kernel\Workqueue.c
    ---→wq_numa_init()             //   对于numa系统,初始化 cpumask和wq属性
    ---→list_for_each_entry(wq,&workqueues,list)
        wq_update_unbound_numa(wq,smp_processor_id(),true)    //    更新 numa affinity
    ---→for_each_online_cpu(cpu)
            for_each_cpu_worker_pool(pool,cpu)
                create_worker(pool)                  //    为worker_pool 创建初始 worker
    ---→hash_for_each(unbound_pool_hash,bkt,pool,hash_node)
        create_worker(pool)        //    create a new workqueue worker
            ----→alloc_worker(pool->node)        //    分配 worker 结构体
            ----→kthread_create_on_node(worker_thread,...)//    为worker 创建内核线程
            ----→worker_attach_to_pool(worker,pool)
                //   将 worker 添加到 worker_pool 中
            ----→worker_enter_idle(worker)
                //   worker 进入IDLE 状态
    ---→wq_watchdog_init()
                //   初始化工作队列监视程序

        create worker函数中,创建的内核线程名字为 kworker/xxcYY 或者kworker/uxx:YY,其中xx表示worker_pool的编号,YY表示worker的编号,u表示 unbound;workqueue子系统初始化完成后,基本就已经将数据结构的关联建立好了,当有work来进行调度的时候,就可以进行处理了


3、work 调度
3.1 schedule_work
schedule_work                        //  将work task提交到全局 workqueue
    ----→queue_work(system_wq,work)
        ----→queue_work_on
                 //  queue work on specific cpu-kernelWorkqueue.c
            ----→!test_and_set_bit(WORK_STRUCT_PENDING_BIT,work_data_bits(work))
                //  判断work是否添加到队列中
                 _queue_work(cpu,wq,work)

            ----→wq->flags & WQ_UNBOUND         //  根据工作队列类型来获取pwq
                 unbound_pwq_by_node(wq,cpu_to_node(cpu))
                 per_cpu_ptr(wq->cpu_pwqs,cpu)

            ----→get_work_pool(work)        //  返回work 对应的work_pool

            ----→last_pool && last_pool !=pwq->pool //  判断上次的pwq和这次pwq是否为同一个
            find_worker_executing_work(last_pool,work)//  寻找 worker 在执行的work

            ----→insert_work(pwq,work,worklist,work_flags) 
                        //  将work 插入到worklist 中
                ----→set_work_pwq(work, pwq,extra_flags)
                        //  设置work的data字段
                ----→wake_up_worker(pool)
                        //  唤醒 worker 内核线程


        A)schedule_work 完成的工作是将work添加到对应的链表中,而在添加的过程中,首先是需要确定 pool_workqueue;
        B) pool_workqueue 对应一个worker_pool,因此确定了 pool_workqueue 也就确定了worker_pool,进而可以将work添加到工作链表中;
        C)pool_workqueue的确定分为三种情况:

                1)bound类型的工作队列,直接根据CPU号获取;

                2)unbound类型的工作队列,根据node号获取,针对unbound类型工作队列,pool_workqueue的释放是异步执行的,需要判断refcnt的计数值,因此在获取 pool_workqueue 时可能要多次retry;

                3)根据缓存热度,优先选择正在被执行的worker_pool;


3.2 worker thread
worker_thread(void*_worker)                //  worker thread function-kernelWorkqueue.c
    ---→set_pf_worker(true)               //  告诉调度器这是 workqueue worker
    ---→woke_up:
            unlikely(worker->flags &WORKER_DIE)         //  判断work 是否消亡
                --Y--→ida_simple_remove(&pool->worker_ida,worker->id);
                    worker_detach_from_pool(worker);    //  若消亡则进行清理
                --N--→worker_leave_idle(worker)        //  work 离开IDLE状态

    ---→recheck:
            list_first_entry(&pool->worklist,struct work_struct,entry)
                process_one_work(worker,work)    //  遍历 worklist,执行work的回调函数
    ---→sleep:
            set_current_state(TASK_IDLE)             //  设置内核状态
            schedule()                               //  让出CPU
----→
----

在创建worker时,创建内核线程,执行函数为worker_thread:
        woker_thread都PF WQWORKER,调度器在进行调度处理时会对task进行判断,针对workerqueue worker有特殊处理;
        worker 对应的内核线程,在没有处理work的时候是睡眠状态,当被唤醒的时候,跳转到woke_up开始执行;
        woke_up 之后,如果此时 worker是需要销毁的,那就进行清理工作并返回。否则,离开IDLE状态,并进入recheck模块执行;
        recheck部分,首先判断是否需要更多的worker来处理,如果没有任务处理,跳转到sleep 地方进行睡眠。有任务需要处理时,会判断是否有空闲内核线程以及是否需要动态创建,再清除掉worker的标志位,然后遍历工作链表,对链表中的每个节点调用 process_one_worker来处理;
        sleep 部分比较好理解,没有任务处理时,worker进入空闲状态,并将当前的内核线程设置成睡眠状态,让出CPU;


work的执行函数为 process_one_worker:

process_one_work(worker,work)
    ---→find_worker_executing_work(pool,work)       //  找到正在执行work的worker
    ---→move_linked_works(work,&collision->scheduled,NULL)
                //  将正在执行的work移动到scheduled 链表中
    ---→hash_add(pool->busy_hash,&worker->hentry,(unsigned long)work);
            worker->current_work =work;            //  设置 worker 字段
    ---→if(need_more_worker(pool)) wake_up_worker(pool);
                                                   //  若需要更多worker,则唤醒idle work
    ---→set_work_pool_and_clear_pending(work,pool->id)     //  清除 pending 状态
    ---→worker->current_func(work);            //  执行work的回调函数
    ---→hash_del(&worker->hentry);            //  执行完后的清理工作
        worker->current_work =NULL;
3.3 worker 动态管理

3.3.1 worker 状态机变化

        1)worker_pool 通过 nr_running 字段来在不同的状态机之间进行切换;
        2)worker_pool 中有work需要处理时,需要至少保证有一个运行状态的worker,当nr_running大于1时,将多余的worker进入IDLE状态,没有work需要处理时,所有的worker都会进入IDLE状态;
        3)执行work时,如果回调函数阻塞运行,那么会让 worker 进入睡眠状态,此时调度器会进行判断是否需要唤醒另一个worker;
        4)IDLE状态的worker 都存放在idle_list链表中,如果空闲时间超过了300秒,则会将其进行销毁;


1、Worker Running---→Suspend
_schedule()
        //    当worker进入睡眠状态时,如果该worker_pool 没有其他的worker处于运行状态,那么是需要唤醒一个空闲的worker来维持并发处理的能力;


2、Worker Suspend---→Running

wake_up_worker                               //  唤醒idle worker-\kernelWorkqueue.c
    ---→wake_up_process(worker->task)
        ---→try_to_wake_up(p,TASK_NORMAL,0)  //  唤醒一个线程-Kernel\sched\Core.c
            ---→ttwu_queue(p,cpu, wake_flags)
                ---→ttwu_do_activate(rq, p, wake_flags,&rf)

        睡眠状态可以通过 wake_up_worker 来进行唤醒处理,最终判断如果该worker 不在运行状态,则增加worker_pool的nr_running 值


3.3.2 worker的动态添加和删除


1、动态删除

init_worker_pool            //  初始化一个worker_pool     ————kernel\Workqueue.c
    ---→timer_setup(&pool->idle_timer,idle_worker_timeout,TIMER_DEFERRABLE)
        ---→while (too_many_workers(pool))
                list_entry(pool->idle_list.prev,struct worker, entry)
                destroy_worker(worker)
                 //  遍历 idle 空闲链表,若worker 空闲时间超过IDLE_WORKER_TIMEOUT 则进行销毁

        worker_pool 初始化时,注册了 timer回调函数,用于定时对空闲链表上的worker进行处理,如果worker太多,且空闲时间太长,超过了5分钟,那么就直接进行销毁处理了。


2、动态添加

Worker_thread
    ---→manage_workers(worker)                //  管理 worker pool
    ---→maybe_create_worker(pool)            //  创建更多的worker

        内核线程执行worker_thread函数时,如果没有空闲的worker,会调用manage_workers接口来创建更多的worker来处理工作


https://www.cnblogs.com/wahaha02/p/6341095.html


APIC介绍:
        APIC相较于PIC来说,最大的优点是能适用于MP平台,当然,管脚多是它另一个优点。APIC由两部分组成,一个称为LAPIC(Local APIC,本地高级中断控制器),一个称IOAPIC (I/O APCI,I/O高级中断控制器)。前者位于CPU中,在MP平台,每个CPU都有一个自己的LAPIC。后者通常位于南桥上,像PIC一样,连接各个产生中断的设备。在一个典型的具有多个处理器的PC平台,通常有一个IOAPIC和多个LAPIC,它们相互配合,形成一个中断的分发网络。


        图中的中断控制器通信总线,是IOAPIC和LAPIC通信的桥梁,在Intel的P6架构和Pentium系列CPU中,它是一条单独的APIC总线。时代在进步,Pentium4和Xeon系列CPU出现后APIC Bus已经不存在,系统的前端总线代替了它。


        话分两头,让我们先来看看IOAPIC。和PIC对比,IOAPIC最大的作用在于中断分发。根据其内部的PRT(Programmable Redirection Table)表,IOAPIC可以格式化出一条中断消息,发送给某个CPU的LAPIC,由LAPIC通知CPU进行处理。目前典型的IOAPIC具有24个中断管脚,每个管脚对应一个RTE(Redirection Table Entry,PRT表项)。与PIC不同的是,IOAPIC的管脚没有优先级,也就是说,连接在管脚上的设备是平等的。


PCIE之 MSI/MSI-X中断:
        https://blog.csdn.net/yhb1047818384/article/details/106676560
        https://www.cnblogs.com/dream397/p/13614666.html

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5694332.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存