天重点对linux网络数据包的处理做下分析,但是并不关系到上层协议,仅仅到链路层。之前转载过一篇文章,对NAPI做了比较详尽的分析,本文结合Linux内核源代码,对当前网络数据包的处理进行梳理。根据NAPI的处理特性,对
设备提出一定的要求1、设备需要有足够的缓冲区,保存多个数据分组2、可以禁用当前设备
中断,然而不影响其他的 *** 作。当前大部分的设备都支持NAPI,但是为了对之前的保持兼容,内核还是对之前中断方式提供了兼容。我们先看下NAPI具体的处理方式。我们都知道中断分为中断上半部和下半部,上半部完成的任务很是简单,仅仅负责把数据保存下来;而下半部负责具体的处理。为了处理下半部,每个CPU有维护一个softnet_data结构。我们不对此结构做详细介绍,仅仅描述和NAPI相关的部分。结构中有一个poll_list字段,连接所有的轮询设备。还 维护了两个队列input_pkt_queue和process_queue。这两个用户传统不支持NAPI方式的处理。前者由中断上半部的处理
函数吧数据包入队,在具体的处理时,使用后者做中转,相当于前者负责接收,后者负责处理。最后是一个napi_struct的backlog,代表一个虚拟设备供轮询使用。在支持NAPI的设备下,每个设备具备一个缓冲队列,存放到来数据。每个设备对应一个napi_struct结构,该结构代表该设备存放在poll_list中被轮询。而设备还需要提供一个poll函数,在设备被轮询到后,会调用poll函数对数据进行处理。基本逻辑就是这样,下面看下具体流程。中断上半部:非NAPI:非NAPI对应的上半部函数为netif_rx,位于Dev.,c中int netif_rx(struct sk_buff *skb){int ret/* if netpoll wants it, pretend we never saw it *//*如果是net_poll想要的,则不作处理*/if (netpoll_rx(skb))return NET_RX_DROP/*检查时间戳*/net_timestamp_check(netdev_tstamp_prequeue, skb)trace_netif_rx(skb)#ifdef CONFIG_RPSif (static_key_false(&rps_needed)) {struct rps_dev_flow voidflow, *rflow = &voidflowint cpu/*禁用抢占*/preempt_disable()rcu_read_lock()cpu = get_rps_cpu(skb->dev, skb, &rflow)if (cpu <0)cpu = smp_processor_id()/*把数据入队*/ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail)rcu_read_unlock()preempt_enable()} else#endif{
unsigned int qtailret = enqueue_to_backlog(skb, get_cpu(), &qtail)put_cpu()}return ret}中间RPS暂时不关心,这里直接调用enqueue_to_backlog放入CPU的全局队列input_pkt_queuestatic int enqueue_to_backlog(struct sk_buff *skb, int cpu,unsigned int *qtail){struct softnet_data *sdunsigned long flags/*获取cpu相关的softnet_data变量*/sd = &per_cpu(softnet_data, cpu)/*关中断*/local_irq_save(flags)rps_lock(sd)/*如果input_pkt_queue的长度小于最大限制,则符合条件*/if (skb_queue_len(&sd->input_pkt_queue) <= netdev_max_backlog) {/*如果input_pkt_queue不为空,说明虚拟设备已经得到调度,此时仅仅把数据加入input_pkt_queue队列即可*/if (skb_queue_len(&sd->input_pkt_queue)) {enqueue:__skb_queue_tail(&sd->input_pkt_queue, skb)input_queue_tail_incr_save(sd, qtail)rps_unlock(sd)local_irq_restore(flags)return NET_RX_SUCCESS}/* Schedule NAPI for backlog device* We can use non atomic operation since we own the queue lock*//*否则需要调度backlog 即虚拟设备,然后再入队。napi_struct结构中的state字段如果标记了NAPI_STATE_SCHED,则表明该设备已经在调度,不需要再次调度*/if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {if (!rps_ipi_queued(sd))____napi_schedule(sd, &sd->backlog)}goto enqueue}/*到这里缓冲区已经不足了,必须丢弃*/sd->dropped++rps_unlock(sd)local_irq_restore(flags)atomic_long_inc(&skb->dev->rx_dropped)kfree_skb(skb)return NET_RX_DROP}该函数逻辑也比较简单,主要注意的是设备必须先添加调度然后才能接受数据,添加调度调用了____napi_schedule函数,该函数把设备对应的napi_struct结构插入到softnet_data的poll_list链表尾部,然后唤醒软中断,这样在下次软中断得到处理时,中断下半部就会得到处理。不妨看下源码static inline void ____napi_schedule(struct softnet_data *sd,struct napi_struct *napi){list_add_tail(&napi->poll_list, &sd->poll_list)__raise_softirq_irqoff(NET_RX_SOFTIRQ)}NAPI方式NAPI的方式相对于非NAPI要简单许多,看下e100网卡的中断处理函数e100_intr,核心部分if (likely(napi_schedule_prep(&nic->napi))) {e100_disable_irq(nic)//屏蔽当前中断__napi_schedule(&nic->napi)//把设备加入到轮训队列}if条件检查当前设备是否 可被调度,主要检查两个方面:1、是否已经在调度 2、是否禁止了napi pending.如果符合条件,就关闭当前设备的中断,调用__napi_schedule函数把设备假如到轮训列表,从而开启轮询模式。分析:结合上面两种方式,还是可以发现两种方式的异同。其中softnet_data作为主导结构,在NAPI的处理方式下,主要维护轮询链表。NAPI设备均对应一个napi_struct结构,添加到链表中;非NAPI没有对应的napi_struct结构,为了使用NAPI的处理流程,使用了softnet_data结构中的back_log作为一个虚拟设备添加到轮询链表。同时由于非NAPI设备没有各自的接收队列,所以利用了softnet_data结构的input_pkt_queue作为全局的接收队列。这样就处理而言,可以和NAPI的设备进行兼容。但是还有一个重要区别,在NAPI的方式下,首次数据包的接收使用中断的方式,而后续的数据包就会使用轮询处理了;而非NAPI每次都是通过中断通知。下半部:下半部的处理函数,之前提到,网络数据包的接发对应两个不同的软中断,接收软中断NET_RX_SOFTIRQ的处理函数对应net_rx_actionstatic void net_rx_action(struct softirq_action *h){struct softnet_data *sd = &__get_cpu_var(softnet_data)unsigned long time_limit = jiffies + 2int budget = netdev_budgetvoid *havelocal_irq_disable()/*遍历轮询表*/while (!list_empty(&sd->poll_list)) {struct napi_struct *nint work, weight/* If softirq window is exhuasted then punt.* Allow this to run for 2 jiffies since which will allow* an average latency of 1.5/HZ.*//*如果开支用完了或者时间用完了*/if (unlikely(budget <= 0 || time_after_eq(jiffies, time_limit)))goto softnet_breaklocal_irq_enable()/* Even though interrupts have been re-enabled, this* access is safe because interrupts can only add new* entries to the tail of this list, and only ->poll()* calls can remove this head entry from the list.*//*获取链表中首个设备*/n = list_first_entry(&sd->poll_list, struct napi_struct, poll_list)have = netpoll_poll_lock(n)weight = n->weight/* This NAPI_STATE_SCHED test is for avoiding a race* with netpoll's poll_napi(). Only the entity which* obtains the lock and sees NAPI_STATE_SCHED set will* actually make the ->poll() call. Therefore we avoid* accidentally calling ->poll() when NAPI is not scheduled.*/work = 0/*如果被设备已经被调度,则调用其处理函数poll函数*/if (test_bit(NAPI_STATE_SCHED, &n->state)) {work = n->poll(n, weight)//后面weight指定了一个额度trace_napi_poll(n)}WARN_ON_ONCE(work >weight)/*总额度递减*/budget -= worklocal_irq_disable()/* Drivers must not modify the NAPI state if they* consume the entire weight. In such cases this code* still "owns" the NAPI instance and therefore can* move the instance around on the list at-will.*//*如果work=weight的话。任务就完成了,把设备从轮询链表删除*/if (unlikely(work == weight)) {if (unlikely(napi_disable_pending(n))) {local_irq_enable()napi_complete(n)local_irq_disable()} else {if (n->gro_list) {/* flush too old packets* If HZ <1000, flush all packets.*/local_irq_enable()napi_gro_flush(n, HZ >= 1000)local_irq_disable()}/*每次处理完就把设备移动到列表尾部*/list_move_tail(&n->poll_list, &sd->poll_list)}}netpoll_poll_unlock(have)}out:net_rps_action_and_irq_enable(sd)#ifdef CONFIG_NET_DMA/** There may not be any more sk_buffs coming right now, so push* any pending DMA copies to hardware*/dma_issue_pending_all()#endifreturnsoftnet_break:sd->time_squeeze++__raise_softirq_irqoff(NET_RX_SOFTIRQ)goto out}这里有处理方式比较直观,直接遍历poll_list链表,处理之前设置了两个限制:budget和time_limit。前者限制本次处理数据包的总量,后者限制本次处理总时间。只有二者均有剩余的情况下,才会继续处理。处理期间同样是开中断的,每次总是从链表表头取设备进行处理,如果设备被调度,其实就是检查NAPI_STATE_SCHED位,则调用 napi_struct的poll函数,处理结束如果没有处理完,则把设备移动到链表尾部,否则从链表删除。NAPI设备对应的poll函数会同样会调用__netif_receive_skb函数上传协议栈,这里就不做分析了,感兴趣可以参考e100的poll函数e100_poll。而非NAPI对应poll函数为process_backlog。static int process_backlog(struct napi_struct *napi, int quota){int work = 0struct softnet_data *sd = container_of(napi, struct softnet_data, backlog)#ifdef CONFIG_RPS/* Check if we have pending ipi, its better to send them now,* not waiting net_rx_action() end.*/if (sd->rps_ipi_list) {local_irq_disable()net_rps_action_and_irq_enable(sd)}#endifnapi->weight = weight_plocal_irq_disable()while (work <quota) {struct sk_buff *skbunsigned int qlen/*涉及到两个队列process_queue和input_pkt_queue,数据包到来时首先填充input_pkt_queue,而在处理时从process_queue中取,根据这个逻辑,首次处理process_queue必定为空,检查input_pkt_queue如果input_pkt_queue不为空,则把其中的数据包迁移到process_queue中,然后继续处理,减少锁冲突。*/while ((skb = __skb_dequeue(&sd->process_queue))) {local_irq_enable()/*进入协议栈*/__netif_receive_skb(skb)local_irq_disable()input_queue_head_incr(sd)if (++work >= quota) {local_irq_enable()return work}}rps_lock(sd)qlen = skb_queue_len(&sd->input_pkt_queue)if (qlen)skb_queue_splice_tail_init(&sd->input_pkt_queue,&sd->process_queue)if (qlen <quota - work) {/** Inline a custom version of __napi_complete().* only current cpu owns and manipulates this napi,* and NAPI_STATE_SCHED is the only possible flag set on backlog.* we can use a plain write instead of clear_bit(),* and we dont need an smp_mb() memory barrier.*/list_del(&napi->poll_list)napi->state = 0quota = work + qlen}rps_unlock(sd)}local_irq_enable()return work}函数还是比较简单的,需要注意的每次处理都携带一个配额,即本次只能处理quota个数据包,如果超额了,即使没处理完也要返回,这是为了保证处理器的公平使用。处理在一个while循环中完成,循环条件正是work <quota,首先会从process_queue中取出skb,调用__netif_receive_skb上传给协议栈,然后增加work。当work即将大于quota时,即++work >= quota时,就要返回。当work还有剩余额度,但是process_queue中数据处理完了,就需要检查input_pkt_queue,因为在具体处理期间是开中断的,那么期间就有可能有新的数据包到来,如果input_pkt_queue不为空,则调用skb_queue_splice_tail_init函数把数据包迁移到process_queue。如果剩余额度足够处理完这些数据包,那么就把虚拟设备移除轮询队列。这里有些疑惑就是最后为何要增加额度,剩下的额度已经足够处理这些数据了呀?根据此流程不难发现,其实执行的是在两个队列之间移动数据包,然后再做处理。
以前每次看到中断,就觉得很神秘,后面随着学习和了解,有了大概的了解,来浅谈下中断。
我们来想下,计算机中的各个设备的运行速度差异非常大,特别是CPU和外设,为了整体性能的提升,CPU在工作的时候显然不能停下来等着外部设备 *** 作完毕。比如我们CPU读磁盘文件的时候,不可能等数据读取结束了,而是CPU切换到其他线程上继续执行。
那当外部磁盘完成了读数据之后,一般是将磁盘数据拷贝到内核缓存之后,需要通知CPU,告速它,”嘿,你要的数据准备好了,可以继续了!“ 这种通知机制,就是中断。
磁盘控制器,网络适配器和定时芯片,都可以发生中断,这些芯片通过向CPU的一个管脚发信号,并将异常号放到系统总线上。当CPU执行完毕当前的一条指令后,发现管脚的电压变高了,就从系统总线上读取异常信号,然后调用中断的处理程序,处理完成后返回原来的指令流,继续执行。
这样理解中断,即是将正在执行的指令暂停,执行相关的中断处理程序,有了中断,我们就可以让CPU相应外部的输入。
关于软中断和硬中断,极客时间《Linux性能优化和实战》中有一个非常恰当的例子,就是收外卖。
比如你定了一份外卖,你不知道什么时候来,而且没有公共放外卖的地方。如果没有”中断“,那就需要一直门口等着,是不是看看外卖来了没有。
如果你和外卖员约定好,你到小区门口就给我打电话,这样你就可以暂时不管外卖继续做你该做的事情,然后外卖人员到门口给你打电话(发生了中断) ,你就可以下来拿外卖了。所以 中断其实是一种异步的事件处理机制,可以提高系统的并发处理能力。
这里面,外卖人员给你打电话的时候就相当于发生了中断,接电话的时间相当于中断处理时间,在接电话的时候,如果你同时定了其他外卖,其他外卖人员也给你打电话的时候,你就无法接了这个外卖人员的电话,这就相当于我们在处理中断的时候,同时关闭了中断响应(防止破坏上下文)。
所以中断处理程序必须快,但是有的中断, 需要很长的时间来处理中断,对于这种情况,将中断分成上半部分和下半部分。
在网络收包的过程中,如果包都是很小的包,又是高吞吐量的环境,导致短时间内有大量的包,显然,会发生大量的硬中断和软中断,这些中断都会占用CPU,限制了网络的吞吐量,对于这种情况linux的NAPI,每次中断时候不只处理一个网络包,而是不断轮询设备队列,直到队列为空。更极端的例子是DPDK这个高性能网络包处理库,则是采用一直轮询的方式来达到不需要中断,快速进行包处理的目的,这也造成了DPDK的轮询模式下CPU占据100%的现象。
上次在top命令介绍中,我们可以从top命令中看到软中断和硬中断所占的CPU:
查看Linux下软中断情况:
查看linux下硬中断情况:
如果仅仅这样看,是看不到变化的,可以采用以下命令查看:
查看下硬中断:
其中:
网络收包的时候,会发生硬中断和软中断,如果设置不好,也会影响到收包的性能。这种问题可以这样去查:
通过此命令查看irq和soft 即硬中断和软中断在各个cpu占用上是否均衡。
如果irq的不均衡,集中在特定的cpu上,需要查看下具体的硬中断类型。
通过以上命令,来查看cpu上不同的硬中断的数量上,配合 mpstat 命令就知道是哪个中断号引起的了。
假设是44号中断引起的,我们就可以看下 此中断号对应的处理cpu:
通过这个值,我们就可以知道这个中断是在2号cpu上处理的,smp_affinity值哪一位为一,就是标识哪个cpu处理,可以同时被多个cpu处理,这就达到中断均衡的目的。
假如我们先让2个cpu,1号cpu和2号cpu同时处理此中断用以下命令即可达到目的
如果是多个cpu,比如64个,我们先让网络的收包中断均衡到这些cpu上,可以在网上找一个set_irq_affinity 的脚本(找不到可以联系我)执行下面命令,将0,2,4,6等32个cpu绑定到p1p1的中断上。
注意这种指定cpu方式,需要关闭中断自动均衡程序:
一、查看网卡型号和机器位数
1、查看网卡型号
linux系统下通过以下命令,可以查看当前的网卡驱动信息;
[box color="white" icon="none"]
[root@localhost zhangy]# lspci |grep -i eth
03:00.0 Ethernet controller: Realtek Semiconductor Co., Ltd. RTL8111/8168B PCI Express Gigabit Ethernet controller (rev 06)
[/box]
2、查看机器位数
驱动程序是要区分系统是32位系统还是64位系统的, 所以通过以下linux命令,就可以知道 *** 作系统的位数了;
[box color="white" icon="none"]
[root@localhost ~]# uname -a
Linux localhost.localdomain 2.6.18-308.13.1.el5PAE #1 SMP Tue Aug 21 17:50:26 EDT 2012 i686 i686 i386 GNU/Linux
[/box]
下载前先看一下你的网卡驱动,如果是最新的就不用在重新装了。
[box color="white" icon="none"]
[root@localhost zhangy]# ethtool -i eth0
driver: r8169
version: 2.3LK-1-NAPI
firmware-version:
bus-info: 0000:03:00.0
[/box]
RTL8111/8168B就是网卡的型号,这样你可以网卡的型号来找一下网卡驱动的官方网站,然后下载最新的网卡驱动,驱动分64位和32位的,i386,i686是32位的机器,x86_64表示是64位的机器,不要选错驱动了。
二、下载驱动,并安装
1、解压
[box color="white" icon="none"]
[root@localhost download]# tar jxvf r8168-8.032.00.tar.bz2
[/box]
2、安装
[box color="white" icon="none"]
[root@localhost r8168-8.032.00]# make &&make install
[/box]
如果报以下错误
make -C /lib/modules/2.6.18-308.8.2.el5PAE/build SUBDIRS=/home/zhangy/r8168-8.032.00/src INSTALL_MOD_DIR=kernel/drivers/net modules_install
make: *** /lib/modules/2.6.18-308.8.2.el5PAE/build: 没有那个文件或目录。 停止。
make: *** [install] 错误 2
说明你kernel源码没有安装。安装kernel源码
[box color="white" icon="none"]
[root@localhost r8168-8.032.00]# yum install kernel-xen kernel-xen-devel kernel \
>kernel-PAE kernel-PAE-devel kernel-devel kernel-headers
[/box]
安装完以后,一定要重启机器。不然下面 *** 作就过不去了,会报FATAL: Module r8168 not found.
[box color="white" icon="none"]
[root@localhost r8168-8.032.00]# depmod -a
[root@localhost r8168-8.032.00]# modprobe r8168
[/box]
编辑配置文件/etc/modprobe.cof,如果以前没有添加alias eth0 r8168,如果已经有了修改一下成alias eth0 r8168。
查一下驱动是不是加载了,如果有以下内容说明驱动安装成功了。
[box color="white" icon="none"]
[root@localhost r8168-8.032.00]# lsmod |grep r8168
r8168 231132 0
[/box]
必须重新启动一下机器,用ethtool查看驱动才会改变,不然还是老样子。下面是新的驱动。
[box color="white" icon="none"]
[root@localhost ~]# ethtool -i eth0
driver: r8168
version: 8.032.00-NAPI
firmware-version:
bus-info: 0000:02:00.0
[/box]
评论列表(0条)