linux内核之无锁缓冲队列kfifo原理(结合项目实践)

linux内核之无锁缓冲队列kfifo原理(结合项目实践),第1张

概述linux内核之无锁缓冲队列kfifo原理(结合项目实践)

无锁缓冲队列kfifo 1. kfifo概述2. kfifo功能描述3. __kfifo_put与__kfifo_get详解4. kfifo_get和kfifo_put无锁并发 *** 作5. 总结6. 项目使用介绍7. 其它 userspace 移植实现

linux kernel里面从来就不缺少简洁,优雅和高效的代码,只是我们缺少发现和品味的眼光。在linux kernel里面,简洁并不表示代码使用神出鬼没的超然技巧,相反,它使用的不过是大家非常熟悉的基础数据结构,但是kernel开发者能从基础的数据结构中,提炼出优美的特性。

kfifo就是这样的一类优美代码,它十分简洁,绝无多余的一行代码,却非常高效。

关于kfifo信息如下:

本文分析的源代码版本: 2.6.24.4
kfifo的定义文件: kernel/kfifo.c
kfifo的头文件: include/linux/kfifo.h

1. kfifo概述

kfifo是内核里面的一个First In First Out数据结构,它采用环形循环队列的数据结构来实现;它提供一个无边界的字节流服务,最重要的一点是,它使用并行无锁编程技术,即当它用于只有一个入队线程和一个出队线程的场情时,两个线程可以并发 *** 作,而不需要任何加锁行为,就可以保证kfifo的线程安全。

kfifo代码既然肩负着这么多特性,那我们先一敝它的代码:

struct kfifo {    unsigned char *buffer;    /* the buffer holding the data */    unsigned int size;    /* the size of the allocated buffer */    unsigned int in;    /* data is added at offset (in % size) */    unsigned int out;    /* data is extracted from off. (out % size) */    spinlock_t *lock;    /* protects concurrent modifications */};

这是kfifo的数据结构,kfifo主要提供了两个 *** 作,__kfifo_put(入队 *** 作)和__kfifo_get(出队 *** 作)。 它的各个数据成员如下:

buffer: 用于存放数据的缓存
size: buffer空间的大小,在初化时,将它向上扩展成2的幂
lock: 如果使用不能保证任何时间最多只有一个读线程和写线程,需要使用该lock实施同步。
in, out: 和buffer一起构成一个循环队列。 in指向buffer中队头,而且out指向buffer中的队尾,它的结构如示图如下:

+--------------------------------------------------------------+|            |<----------data---------->|                      |+--------------------------------------------------------------+             ^                          ^                      ^             |                          |                      |            out                        in                     size

当然,内核开发者使用了一种更好的技术处理了in, out和buffer的关系,我们将在下面进行详细分析。

2. kfifo功能描述 只支持一个读者和一个读者并发 *** 作无阻塞的读写 *** 作,如果空间不够,则返回实际访问空间

kfifo_alloc 分配kfifo内存和初始化工作

struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock){    unsigned char *buffer;    struct kfifo *ret;    /*     * round up to the next power of 2, since our 'let the indices     * wrap' tachnique works only in this case.     */    if (size & (size - 1)) {        BUG_ON(size > 0x80000000);        size = roundup_pow_of_two(size);    }    buffer = kmalloc(size, gfp_mask);    if (!buffer)        return ERR_PTR(-ENOMEM);    ret = kfifo_init(buffer, size, gfp_mask, lock);    if (IS_ERR(ret))        kfree(buffer);    return ret;}

1.判断一个数是不是2的次幂 :
kfifo要保证其缓存空间的大小为2的次幂,如果不是则向上取整为2的次幂。其对于2的次幂的判断方式也是很巧妙的。如果一个整数n是2的次幂,则二进制模式必然是1000…,而n-1的二进制模式则是0111…,也就是说n和n-1的每个二进制位都不相同,例如:8(1000)和7(0111);n不是2的次幂,则n和n-1的二进制必然有相同的位都为1的情况,例如:7(0111)和6(0110)。这样就可以根据 n & (n-1)的结果来判断整数n是不是2的次幂,实现如下:

static inline bool is_power_of_2(uint32_t n){    return (n != 0 && ((n & (n - 1)) == 0));}

2.将数字向上取整为2的次幂 :
如果设定的缓冲区大小不是2的次幂,则向上取整为2的次幂,例如:设定为5,则向上取为8。上面提到整数n是2的次幂,则其二进制模式为100…,故如果正数k不是n的次幂,只需找到其最高的有效位1所在的位置(从1开始计数)pos,然后1 << pos即可将k向上取整为2的次幂。实现如下:

static inline uint32_t roundup_power_of_2(uint32_t a){    if (a == 0)        return 0;    uint32_t position = 0;    for (int i = a; i != 0; i >>= 1)        position++;    return static_cast<uint32_t>(1 << position);}

fifo->in & (fifo->size - 1) 再比如这种写法取模,获取已用的大小。这样用逻辑与的方式相较于加减法更有效率

3. __kfifo_put与__kfifo_get详解

linux内核中kfifo实现技巧,主要集中在放入数据的put方法和取数据的get方法

__kfifo_put是入队 *** 作,它先将数据放入buffer里面,最后才修改in参数;
__kfifo_get是出队 *** 作,它先将数据从buffer中移走,最后才修改out。

代码如下:

unsigned int __kfifo_put(struct kfifo *fifo,             unsigned char *buffer, unsigned int len){    unsigned int l;    len = min(len, fifo->size - fifo->in + fifo->out);    /*     * Ensure that we sample the fifo->out index -before- we     * start putting bytes into the kfifo.     */    smp_mb();    /* first put the data starting from fifo->in to buffer end */    l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));    memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);    /* then put the rest (if any) at the beginning of the buffer */    memcpy(fifo->buffer, buffer + l, len - l);    /*     * Ensure that we add the bytes to the kfifo -before-     * we update the fifo->in index.     */    smp_wmb();    fifo->in += len;    return len;}

6行,环形缓冲区的剩余容量为fifo->size - fifo->in + fifo->out,让写入的长度取len和剩余容量中较小的,避免写越界;
13行,加内存屏障,保证在开始放入数据之前,fifo->out取到正确的值(另一个cpu可能正在改写out值)
16行,前面讲到fifo->size已经2的次幂圆整,而且kfifo->in % kfifo->size 可以转化为 kfifo->in & (kfifo->size – 1),所以fifo->size - (fifo->in & (fifo->size - 1)) 即位 fifo->in 到 buffer末尾所剩余的长度,l取len和剩余长度的最小值,即为需要拷贝l 字节到fifo->buffer + fifo->in的位置上。
17行,拷贝l 字节到fifo->buffer + fifo->in的位置上,如果l = len,则已拷贝完成,第20行len – l 为0,将不执行,如果l = fifo->size - (fifo->in & (fifo->size - 1)) ,则第20行还需要把剩下的 len – l 长度拷贝到buffer的头部。
27行,加写内存屏障,保证in 加之前,memcpy的字节已经全部写入buffer,如果不加内存屏障,可能数据还没写完,另一个cpu就来读数据,读到的缓冲区内的数据不完全,因为读数据是通过 in – out 来判断的。
29行,注意这里 只是用了 fifo->in += len而未取模,这就是kfifo的设计精妙之处,这里用到了unsigned int的溢出性质,当in 持续增加到溢出时又会被置为0,这样就节省了每次in向前增加都要取模的性能,锱铢必较,精益求精,让人不得不佩服。

unsigned int __kfifo_get(struct kfifo *fifo,             unsigned char *buffer, unsigned int len){    unsigned int l;    len = min(len, fifo->in - fifo->out);    /*     * Ensure that we sample the fifo->in index -before- we     * start removing bytes from the kfifo.     */    smp_rmb();    /* first get the data from fifo->out until the end of the buffer */    l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));    memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);    /* then get the rest (if any) from the beginning of the buffer */    memcpy(buffer + l, fifo->buffer, len - l);    /*     * Ensure that we remove the bytes from the kfifo -before-     * we update the fifo->out index.     */    smp_mb();    fifo->out += len;    return len;}

6行,可去读的长度为fifo->in – fifo->out,让读的长度取len和剩余容量中较小的,避免读越界;
13行,加读内存屏障,保证在开始取数据之前,fifo->in取到正确的值(另一个cpu可能正在改写in值)
16行,前面讲到fifo->size已经2的次幂圆整,而且kfifo->out % kfifo->size 可以转化为 kfifo->out & (kfifo->size – 1),所以fifo->size - (fifo->out & (fifo->size - 1)) 即位 fifo->out 到 buffer末尾所剩余的长度,l取len和剩余长度的最小值,即为从fifo->buffer + fifo->in到末尾所要去读的长度。
17行,从fifo->buffer + fifo->out的位置开始读取l长度,如果l = len,则已读取完成,第20行len – l 为0,将不执行,如果l =fifo->size - (fifo->out & (fifo->size - 1)) ,则第20行还需从buffer头部读取 len – l 长。
27行,加内存屏障,保证在修改out前,已经从buffer中取走了数据,如果不加屏障,可能先执行了增加out的 *** 作,数据还没取完,令一个cpu可能已经往buffer写数据,将数据破坏,因为写数据是通过fifo->size - (fifo->in & (fifo->size - 1))来判断的 。
29行,注意这里 只是用了 fifo->out += len 也未取模,同样unsigned int的溢出性质,当out 持续增加到溢出时又会被置为0,如果in先溢出,出现 in < out 的情况,那么 in – out 为负数(又将溢出),in – out 的值还是为buffer中数据的长度。

图解一下 in 先溢出的情况,size = 64, 写入前 in = 4294967291, out = 4294967279 ,数据 in – out = 12;


写入 数据16个字节,则 in + 16 = 4294967307,溢出为 11,此时 in – out = –4294967268,溢出为28,数据长度仍然正确,由此可见,在这种特殊情况下,这种计算仍然正确,是不是让人叹为观止,妙不可言?

4. kfifo_get和kfifo_put无锁并发 *** 作

计算机科学家已经证明,当只有一个读经程和一个写线程并发 *** 作时,不需要任何额外的锁,就可以确保是线程安全的,也即kfifo使用了无锁编程技术,以提高kernel的并发。

kfifo使用in和out两个指针来描述写入和读取游标,对于写入 *** 作,只更新in指针,而读取 *** 作,只更新out指针,可谓井水不犯河水,示意图如下:

                                               |<--写入-->|+--------------------------------------------------------------+|                        |<----------data----->|               |+--------------------------------------------------------------+                         |<--读取-->|                         ^                     ^               ^                         |                     |               |                        out                   in              size

为了避免读者看到写者预计写入,但实际没有写入数据的空间,写者必须保证以下的写入顺序:

1.往[kfifo->in, kfifo->in + len]空间写入数据
2.更新kfifo->in指针为 kfifo->in + len

在 *** 作1完成时,读者是还没有看到写入的信息的,因为kfifo->in没有变化,认为读者还没有开始写 *** 作,只有更新kfifo->in之后,读者才能看到。

那么如何保证1必须在2之前完成,秘密就是使用内存屏障:smp_mb(),smp_rmb(), smp_wmb(),来保证对方观察到的内存 *** 作顺序。

5. 总结

kfifo优点:

实现单消费者和单生产者的无锁并发访问。多消费者和多生产者的时候还是需要加锁的。使用与运算in & (size-1)代替模运算在更新in或者out的值时不做模运算,而是让其自动溢出。这应该是kfifo实现最牛叉的地方了,利用溢出后的值参与运算,并且能够保证结果的正确。溢出运算保证了以下几点: in - out为缓冲区中的数据长度size - in + out 为缓冲区中空闲空间in == out时缓冲区为空size == (in - out)时缓冲区满了

读完kfifo代码,令我想起那首诗“众里寻他千百度,默然回首,那人正在灯火阑珊处”。不知你是否和我一样,总想追求简洁,高质量和可读性的代码,当用尽各种方法,江郞才尽之时,才发现linux kernel里面的代码就是我们寻找和学习的对象。

6. 项目使用介绍

思想和上面差不多,只不过把队列内容改为了指针,是为了我们项目存储数据(使用时malloc)用到。废话不多说,直接上源码:

struct kfifo {	voID  **data;    /* the buffer holding the data */	unsigned int size;    /* the count of the data */	unsigned int pod;    /* productor */	unsigned int cons;    /* consumer */};static inline uint32_t roundup_power_of_two(uint32_t a){    if (a == 0)        return 0;    uint32_t position = 0;    for (int i = a; i != 0; i >>= 1)        position++;    return static_cast<uint32_t>(1 << position);}struct kfifo* kfifo_alloc(uint32_t size){	struct kfifo* fifo;	if (size & (size - 1))	{		size = roundup_pow_of_two(size);	}	FORCE_DEBUG(w_log_fp, "kfifo_alloc file stream queue len:%d n", size);	fifo = (struct kfifo* )malloc(sizeof(struct kfifo));	fifo->data = (voID**)malloc(size * sizeof(voID*));	fifo->size = size;	fifo->pod = 0;	fifo->cons = 0;	return fifo;}voID kfifo_free(struct kfifo* fifo){	uint32_t i = 0;	for ( i = 0; i < fifo->size; i++ )	{		free(fifo->data[i]);	}	free(fifo);}int kfifo_put(struct kfifo* fifo, voID * data){	if ( data == NulL )		return 0;   if (fifo->pod - fifo->cons < fifo->size)   {      fifo->data[fifo->pod & (fifo->size - 1) ] = data;      fifo->pod++;      return 1;   }   else      return 0;}voID* kfifo_get(struct kfifo* fifo){   voID * result = NulL;   if (fifo->pod != fifo->cons)   {      result = fifo->data[fifo->cons & (fifo->size - 1) ];      fifo->cons++;   }   return result;}
7. 其它 userspace 移植实现

kfifo的移植:
https://blog.csdn.net/liigo/article/details/100993236

参考:
https://www.cnblogs.com/wangshaowei/p/11559522.HTML
https://blog.csdn.net/linyt/article/details/53355355
https://blog.csdn.net/chen19870707/article/details/39899743

总结

以上是内存溢出为你收集整理的linux内核之无锁缓冲队列kfifo原理(结合项目实践)全部内容,希望文章能够帮你解决linux内核之无锁缓冲队列kfifo原理(结合项目实践)所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/yw/1013104.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-22
下一篇 2022-05-22

发表评论

登录后才能评论

评论列表(0条)