1)Workqueue 工作队列是利用内核线程来异步执行工作任务的通用机制;
2)Workqueue 工作队列可以用作中断处理的Bottom-half机制,利用进程上下文来执行中断处理中耗时的任务,因此它允许睡眠,而Softirg和Tasklet在处理任务时不能睡眠;
3)在中断处理过程中,或者其他子系统中,调用workqueue的调度或入队接口后,通过建立好的链接关系图逐级找到合适的worker,最终完成工作任务的执行
4)关键的数据结构:
work_struct:工作队列调度的最小单位,work item;
workqueue_struct:工作队列,work item都挂入到工作队列中;
worker:work item的处理者,每个worker对应一个内核线程;
worker_pool:worker池(内核线程池),是一个共享资源池,提供不同的worker来对work item进行处理;
pool_workqueue:充当桥梁纽带的作用,用于连接workqueue和worker_pool,建立链接关系
2、workqueue子系统初始化
workqueue 子系统的初始化分成两步来完成的:workqueue_init_early和workqueue_init
2.1.1 workqueue_init_early()
workqueue_init_early() // kernelWorkqueue.c ----→KMEM_CACHE(pool_workqueue,SLAB_PANIC) // 为pool_workqueue 分配 slab 缓存 ----→init_worker_pool(pool) // 为每个cpu 创建 worker_pool ----→worker_pool_assign_id(pool) // 为worker_pool 分配id ----→alloc_workqueue_attrs() // 为 unbound 工作队列创建默认属性 ----→system_wq =alloc_workqueue(); // kernel 创建默认的消息队列 system_highpri_wq=alloc_workqueue(); system_long_wq=alloc_workqueue(); system_unbound_wq=alloc_workqueue((); system_freezable_wq =alloc_workqueue();
alloc_workqueue() // includelinuxWorkqueue.h
alloc_workqueue() // includelinuxWorkqueue.h ----→kzalloc(sizeof(*wq)+tbl_size,GFP_KERNEL) // 分配内存 ----→wq->flags=flags; // 初始化wq wq->saved_max_active = max_active; ----→alloc_and_link_pwqs(wq) // 分配 pool_workqueue 并建立连接 ----→!(wq->flags & WQ_UNBOUND) ---Y--→alloc_percpu(struct pool_workqueue) // 为每个CPU分配 pool_workqueue ------→init_pwq(pwq,wq,&cpu_pools[highpri]) // 初始化pool_workqueue ------→link_pwq(pwq) // 链接 pool_workqueue 和wq --N--→apply_workqueue_attrs(wq,ordered_wq_attrs(highpri]) // 将新的 workqueue_attrs 应用于未绑定的工作队列 -------→apply_workqueue_attrs_locked(wq,attrs) -------→apply_wqattrs_prepare(wq,attrs) // 分配attrs和pwq(pool_workqueue) -------→alloc_unbound_pwq(wq,tmp_attrs) // 获取匹配@attr 的池并创建一个将池和@wq 相关联的pwq -------→get_unbound_pool(attrs) -------→init_pwq(pwq,wq,pool) -------→apply_wqattrs_commit(ctx) // 设置wq熟悉,并将wq和pwq建立连接 -------→numa_pwq_tbl_install(ctx->wq,node,ctx->pwq_tbl[node]) -------→link_pwq(pwq) ----→init_rescuer(wq) // wq 内存紧张时的处理逻辑 ----→list_add_tail_rcu(&wq->list,&workqueues); // 将创建的wq添加到全局链表中
alloc_workqueue 完成的主要工作包括:
1)首先当然是要分配一个 struct workqueue_struct的数据结构,并且对该结构中的字段进行初始化 *** 作;
2)workqueue 最终需要和worker_pool 关联起来,而这个纽带就是 pool_workqueue,alloc_and_link_pwqs函数就是完成这个功能:
A)如果工作队列是绑定到CPU上的,则为每个CPU都分配pool_workqueue 并且初始化,通过link_pwq将工作队列与pool_workqueue 建立连接;
B)如果工作队列不绑定到CPU上,则按内存节点(NUMA)来分配pool_workqueue,用 get_unbound_pool来实现,它会根据wq属性先去查找,如果没有找到相同的就创建一个新的pool_workqueue,并且添加到 unbound_pool_hash 哈希表中,最后也会调用link_pwq来建立连接;
C)创建工作队列时,如果设置了WQ_MEM_RECLAIM标志,则会新建 rescuer worker,对应 rescuer_thread内核线程。当内存紧张时,新创建worker可能会失败,这时候由rescuer来处理这种情况;
D)最终将新建好的工作队列添加到全局链表workqueues中;
2.1.2 workqueue_init()
workqueue_init() // 让wq子系统完全在线-kernel\Workqueue.c ---→wq_numa_init() // 对于numa系统,初始化 cpumask和wq属性 ---→list_for_each_entry(wq,&workqueues,list) wq_update_unbound_numa(wq,smp_processor_id(),true) // 更新 numa affinity ---→for_each_online_cpu(cpu) for_each_cpu_worker_pool(pool,cpu) create_worker(pool) // 为worker_pool 创建初始 worker ---→hash_for_each(unbound_pool_hash,bkt,pool,hash_node) create_worker(pool) // create a new workqueue worker ----→alloc_worker(pool->node) // 分配 worker 结构体 ----→kthread_create_on_node(worker_thread,...)// 为worker 创建内核线程 ----→worker_attach_to_pool(worker,pool) // 将 worker 添加到 worker_pool 中 ----→worker_enter_idle(worker) // worker 进入IDLE 状态 ---→wq_watchdog_init() // 初始化工作队列监视程序
create worker函数中,创建的内核线程名字为 kworker/xxcYY 或者kworker/uxx:YY,其中xx表示worker_pool的编号,YY表示worker的编号,u表示 unbound;workqueue子系统初始化完成后,基本就已经将数据结构的关联建立好了,当有work来进行调度的时候,就可以进行处理了
3、work 调度
3.1 schedule_work
schedule_work // 将work task提交到全局 workqueue ----→queue_work(system_wq,work) ----→queue_work_on // queue work on specific cpu-kernelWorkqueue.c ----→!test_and_set_bit(WORK_STRUCT_PENDING_BIT,work_data_bits(work)) // 判断work是否添加到队列中 _queue_work(cpu,wq,work) ----→wq->flags & WQ_UNBOUND // 根据工作队列类型来获取pwq unbound_pwq_by_node(wq,cpu_to_node(cpu)) per_cpu_ptr(wq->cpu_pwqs,cpu) ----→get_work_pool(work) // 返回work 对应的work_pool ----→last_pool && last_pool !=pwq->pool // 判断上次的pwq和这次pwq是否为同一个 find_worker_executing_work(last_pool,work)// 寻找 worker 在执行的work ----→insert_work(pwq,work,worklist,work_flags) // 将work 插入到worklist 中 ----→set_work_pwq(work, pwq,extra_flags) // 设置work的data字段 ----→wake_up_worker(pool) // 唤醒 worker 内核线程
A)schedule_work 完成的工作是将work添加到对应的链表中,而在添加的过程中,首先是需要确定 pool_workqueue;
B) pool_workqueue 对应一个worker_pool,因此确定了 pool_workqueue 也就确定了worker_pool,进而可以将work添加到工作链表中;
C)pool_workqueue的确定分为三种情况:
1)bound类型的工作队列,直接根据CPU号获取;
2)unbound类型的工作队列,根据node号获取,针对unbound类型工作队列,pool_workqueue的释放是异步执行的,需要判断refcnt的计数值,因此在获取 pool_workqueue 时可能要多次retry;
3)根据缓存热度,优先选择正在被执行的worker_pool;
3.2 worker thread
worker_thread(void*_worker) // worker thread function-kernelWorkqueue.c ---→set_pf_worker(true) // 告诉调度器这是 workqueue worker ---→woke_up: unlikely(worker->flags &WORKER_DIE) // 判断work 是否消亡 --Y--→ida_simple_remove(&pool->worker_ida,worker->id); worker_detach_from_pool(worker); // 若消亡则进行清理 --N--→worker_leave_idle(worker) // work 离开IDLE状态 ---→recheck: list_first_entry(&pool->worklist,struct work_struct,entry) process_one_work(worker,work) // 遍历 worklist,执行work的回调函数 ---→sleep: set_current_state(TASK_IDLE) // 设置内核状态 schedule() // 让出CPU ----→ ----
在创建worker时,创建内核线程,执行函数为worker_thread:
woker_thread都PF WQWORKER,调度器在进行调度处理时会对task进行判断,针对workerqueue worker有特殊处理;
worker 对应的内核线程,在没有处理work的时候是睡眠状态,当被唤醒的时候,跳转到woke_up开始执行;
woke_up 之后,如果此时 worker是需要销毁的,那就进行清理工作并返回。否则,离开IDLE状态,并进入recheck模块执行;
recheck部分,首先判断是否需要更多的worker来处理,如果没有任务处理,跳转到sleep 地方进行睡眠。有任务需要处理时,会判断是否有空闲内核线程以及是否需要动态创建,再清除掉worker的标志位,然后遍历工作链表,对链表中的每个节点调用 process_one_worker来处理;
sleep 部分比较好理解,没有任务处理时,worker进入空闲状态,并将当前的内核线程设置成睡眠状态,让出CPU;
work的执行函数为 process_one_worker:
process_one_work(worker,work) ---→find_worker_executing_work(pool,work) // 找到正在执行work的worker ---→move_linked_works(work,&collision->scheduled,NULL) // 将正在执行的work移动到scheduled 链表中 ---→hash_add(pool->busy_hash,&worker->hentry,(unsigned long)work); worker->current_work =work; // 设置 worker 字段 ---→if(need_more_worker(pool)) wake_up_worker(pool); // 若需要更多worker,则唤醒idle work ---→set_work_pool_and_clear_pending(work,pool->id) // 清除 pending 状态 ---→worker->current_func(work); // 执行work的回调函数 ---→hash_del(&worker->hentry); // 执行完后的清理工作 worker->current_work =NULL;3.3 worker 动态管理
3.3.1 worker 状态机变化
1)worker_pool 通过 nr_running 字段来在不同的状态机之间进行切换;
2)worker_pool 中有work需要处理时,需要至少保证有一个运行状态的worker,当nr_running大于1时,将多余的worker进入IDLE状态,没有work需要处理时,所有的worker都会进入IDLE状态;
3)执行work时,如果回调函数阻塞运行,那么会让 worker 进入睡眠状态,此时调度器会进行判断是否需要唤醒另一个worker;
4)IDLE状态的worker 都存放在idle_list链表中,如果空闲时间超过了300秒,则会将其进行销毁;
1、Worker Running---→Suspend
_schedule()
// 当worker进入睡眠状态时,如果该worker_pool 没有其他的worker处于运行状态,那么是需要唤醒一个空闲的worker来维持并发处理的能力;
2、Worker Suspend---→Running
wake_up_worker // 唤醒idle worker-\kernelWorkqueue.c ---→wake_up_process(worker->task) ---→try_to_wake_up(p,TASK_NORMAL,0) // 唤醒一个线程-Kernel\sched\Core.c ---→ttwu_queue(p,cpu, wake_flags) ---→ttwu_do_activate(rq, p, wake_flags,&rf)
睡眠状态可以通过 wake_up_worker 来进行唤醒处理,最终判断如果该worker 不在运行状态,则增加worker_pool的nr_running 值
3.3.2 worker的动态添加和删除
1、动态删除
init_worker_pool // 初始化一个worker_pool ————kernel\Workqueue.c ---→timer_setup(&pool->idle_timer,idle_worker_timeout,TIMER_DEFERRABLE) ---→while (too_many_workers(pool)) list_entry(pool->idle_list.prev,struct worker, entry) destroy_worker(worker) // 遍历 idle 空闲链表,若worker 空闲时间超过IDLE_WORKER_TIMEOUT 则进行销毁
worker_pool 初始化时,注册了 timer回调函数,用于定时对空闲链表上的worker进行处理,如果worker太多,且空闲时间太长,超过了5分钟,那么就直接进行销毁处理了。
2、动态添加
Worker_thread ---→manage_workers(worker) // 管理 worker pool ---→maybe_create_worker(pool) // 创建更多的worker
内核线程执行worker_thread函数时,如果没有空闲的worker,会调用manage_workers接口来创建更多的worker来处理工作
https://www.cnblogs.com/wahaha02/p/6341095.html
APIC介绍:
APIC相较于PIC来说,最大的优点是能适用于MP平台,当然,管脚多是它另一个优点。APIC由两部分组成,一个称为LAPIC(Local APIC,本地高级中断控制器),一个称IOAPIC (I/O APCI,I/O高级中断控制器)。前者位于CPU中,在MP平台,每个CPU都有一个自己的LAPIC。后者通常位于南桥上,像PIC一样,连接各个产生中断的设备。在一个典型的具有多个处理器的PC平台,通常有一个IOAPIC和多个LAPIC,它们相互配合,形成一个中断的分发网络。
图中的中断控制器通信总线,是IOAPIC和LAPIC通信的桥梁,在Intel的P6架构和Pentium系列CPU中,它是一条单独的APIC总线。时代在进步,Pentium4和Xeon系列CPU出现后APIC Bus已经不存在,系统的前端总线代替了它。
话分两头,让我们先来看看IOAPIC。和PIC对比,IOAPIC最大的作用在于中断分发。根据其内部的PRT(Programmable Redirection Table)表,IOAPIC可以格式化出一条中断消息,发送给某个CPU的LAPIC,由LAPIC通知CPU进行处理。目前典型的IOAPIC具有24个中断管脚,每个管脚对应一个RTE(Redirection Table Entry,PRT表项)。与PIC不同的是,IOAPIC的管脚没有优先级,也就是说,连接在管脚上的设备是平等的。
PCIE之 MSI/MSI-X中断:
https://blog.csdn.net/yhb1047818384/article/details/106676560
https://www.cnblogs.com/dream397/p/13614666.html
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)