Netty源码分析专题[2]-消息队列MpscQueue分析

Netty源码分析专题[2]-消息队列MpscQueue分析,第1张

Netty源码分析专题[2]-消息队列MpscQueue分析 Netty源码分析专题[2]-消息队列MpscQueue分析

  在看Netty源码的时候看到了这个队列,之前都没见过,所以特地写个笔记记录一下

  MpscQueue的Mpsc的全称是Multi producer single consumer【多生成者单消费者】,我们先把单消费者单生成者的情况分析清楚,在多生产者的情况下,多了一步抢位置的动作,就是如果多个线程同时要往队列添加数据,需要先抢占一下最后一个位置,这就涉及多线程同步,可以加锁,但是更多的是CAS *** 作,线程同步问题就不在这介绍了

1、前序

  由于消费者消费数据和生产者生成数据其实流程差不多,所以着重分析一下生产数据的过程,在介绍MpscQueue之前,先回顾一下环形队列,假设现在有一个可以装8个元素的环形队列,结构如下【环形队列只是逻辑上的结构,实际内存只是一个线性结构的数组或者链表,我们以数组的形式展现】:

  这里假设消费者能及时消费掉数据,不考虑阻塞的情况,假设现在生产者已经生成到数组的末尾也就是7号位置,那么生产者生产的数据该放在什么位置?

  因为是环形结构,逻辑上0号位置和7号位置是首尾相接的,所以下一个位置应该就是0号位置,在数学上就是:(已生产的数据-1)%队列大小,所以需要两个变量

1)、pindex:生产者生产的数据个数,从0开始,生产一个数据+12)、offset:元素需要放置位置在数组的对应下标,其数值为pindex%队列大小

  优化一下,求余 *** 作在除数是2的整数次幂时可以转化为按位与 *** 作,这也是为什么很多队列或者内存在分配空间时会将传入的大小转换成2的整数次幂后再进行后续 *** 作的原因,以队列大小为8为例,计算结果如下【二进制位值显示低8位】

Pindex队列大小余数(offset)pindex二进制(队列大小-1)二进制按位与二进制按位与十进制0800x000000000x000001110x0000000001810x000000010x000001110x0000000112820x000000100x000001110x0000001023830x000000110x000001110x0000001134840x000001000x000001110x0000010045850x000001010x000001110x0000010156860x000001100x000001110x0000011067870x000001110x000001110x0000011178800x000010000x000001110x0000000009810x000010010x000001110x00000001110820x000010100x000001110x000000102

  可以看出余数和按位与后的十进制结果一样,在除法 *** 作除了余数还有商,在这个前提下,商同可以用位 *** 作,以上面为例9/8的商就是将9的二进制由移3位【3位就是0x00000111中1的个数】得到的结果就是1,具体可以自行验证

  为啥搞的这么复杂?

  提高效率,计算机内部都是二进制数据,位 *** 作比除法快的不要太多!

  既然说到环形队列,则不得不提Disruptor,其号称是单线程情况下的最强队列,内部的核心存储就是一个环形Buffer,有兴趣的可以取了解一哈

  好了,不跑偏了,开始介绍一个MpscQueue是个什么鬼

2、MpscQueue基本结构及扩容方式

  假设我们现在没有环形队列,同样以队列元素为8来进行说明,初始化时,队列的结构如下:

  这………………不是和之前一毛一样,先别急,假设现在生产者已经生产到位置7,由于该buffer正在逻辑上不是环形结构,要想接着生产位置8,只有一个solution,队列扩容,一般都是扩容成原大小的两倍,此时出现了两种扩容方式,下面分别介绍

2-1 朴素思想-创建新的Buffer

  这是比较朴素的思想,就是创建新的buffer,然后将数据拷贝过去,假设是两倍扩容,那么扩容后的数据结构如下:

  直接扩容的方式比较简单,只要空间足够就行,但是思考一个问题,假设现在消费者已经将队列的前8个元素都消费走了,下一次消费的是第9个元素,其实buffer中的位置0到位置7已经不会再被访问,换句话说就是占用的内存可以释放了,但是这种情况下似乎好像并不会,当然可以在数据拷贝是进行压缩处理,但是带来的就是要调整生产者生成位置指针和消费者消费指针,在多线程条件下想想就头大啊,当然你可以加把锁,lock一下,这势必又会影响效率,所以MpscQueue采用的肯定也不是这个思路

2-2 MpscQueue扩容方式

  空间不够了,那么第一步肯定是创建出一个新的空间,假设扩容的新Buffer的大小和初始大小一致,则两个Buffer如下图:

  那么问题来了,当消费者要消费第9个元素时,生成者该怎么知道下一个该消费的buffer在哪?

  所以需要在buffer1中找个空间将buffer2的位置存起来,MpscQueue里默认是存在buffer尾部,但是这个尾部位置是在初始化时增加的空间,也就是虽然我们要创建的队列元素个数是8,实际它会偷偷增加一个元素位置,存储下一个buffer的地址,所以队列会变成:

  是不是有点像链表了,扩容完成之后,就得计算第9个元素应该放在buffer2的哪个位置了,是不是正常思维就是从buffer2的0号位置开始,列个表

Pindexbuffer1_index0011223344556677buffer2_index8091102

  这在逻辑上不就和之前的环形队列是一样的,所以可以说MpscQueue其实在逻辑上也是一个环形队列

  似乎这样已经可以了,但是又会有一个问题,消费者怎么知道消费到buffer1的哪个位置时需要跳转到buffer2消费,所以需要有个地方能明确标识出当消费者消费到该位置时,跳转到下一个buffer消费【下一个buffer的位置从当前buffer的最后 一个位置取出即可】在MpscQueue中这个标识被称为Jump对象,当遇到Jump对象后,消费者就会到下一个buffer读取数据,那么生成者应该将Jump对象放置于数组的什么地方呢?但是实际的放置位置有点小复杂

2-2-1 Jump对象放置策略1

  和下一个buffer的位置一样,最终也需要多申请一个位置放Jump的对象,假设buffer容量是8,则多申请两个位置,buffer[8]存储Jump,buffer[9]存储下一个buffer的地址,存储的位置如下图所示:

  简单分析一下,当消费者消费第9个数据是,会直接得到序号是0,可以检测一下此时下一个元素时不是JUMP对象,如果是,则跳到buffer2,否则从buffer1读取,似乎好像没啥问题

  如果扩容的buffer和现有buffer不一样大,假设是现在的两倍,会出现什么情况?
自己补个图吧,扩容之后buffer2的长度变成16,第9个元素进行按位与运算后得到的数值还是9,说明无论生产者还是消费者,都会从buffer2的第9个元素位置开始消费或者生成数据,会造成空间的浪费,并且也不符合环形缓冲区的结构了,所以就有了另外一种策略

2-2-2 Jump对象放置策略2

  不在增加新的位置,也就是容纳个数为8的buffer最多申请9个位置,最后一个位置存储下一个buffer的地址,Jump对象存储在正常的数据区,也就是实际的buffer智能存储8-1=7个元素,整体的流程如下:

1)、使用一个变量存储当前队列所允许的生产者生成的最大数据量,假设为plimit2)、当生产的数据量达到plimit-1时,进行队列扩容,在当前buffer的plimit对应的位置存储Jump对象,在下一个buffer对应的位置存储元素,有点抽象,画个图吧

初始化,队列容量为8,允许放置的最大元素个数
Plimit=8-1=7,队列结构图:

添加第8个元素时,此时8>7,需要进行队列的扩容,扩容后的结构如下,同时将buffer2的地址存到buffer1的末尾,buffer1的第7号位置放置Jump对象,队列结构图

  第8个元素本来是要放置在buffer1的第7号位置,但是此时第7号位置放了JUMP对象,那么第八个元素只能放置在buffer2的第7号位置,因为求余的结果就是7,程序必须到某个buffer的第7号位置取数据,放置数据后队列结构如下:

  添加第9个元素时,此时9<14,buffer长度为8,求余(9-1)%8后得到的位置号是0,意味着第9个元素要放在buffer2的第0号位置,和预期的一致,放置数据后队列结构如下:

  添加第15个元素时,此时16>14,同样进行队列扩容,得到的结果如下,队列结构如下【图片编号:image-20211227184931384.png】:

是否解决了策略1的问题?

  假设此时buffe2的长度是16,buffer1是8,此时队列最大的放置原数个数是7+15=22,当放置第16个元素时,由于16<22,会直接将数据存储在buffer2的0号位置,不会开辟新的空间

  消费者读取数据的流程也类似,只要遇到Jump对象,就跳到下个Buffer对应的位置取数据即可,不用额外的 *** 作

3 MpscQueue特点

  1)、在消费者消费完某个buffer的数据后,会在buffer的末尾【原来存储下一个buffer地址的位置】存储一个标记对象,代表该buffer的所有数据已经被消费,可以重用空间或者回收空间

  2)、MpscQueue在生产和消费数据是,生产者的index【也就是已经生产的数据量】并不是每次都加1,而是加2,主要的原因是如果每次都加2,在正常生产消费的情况下,指针肯定都是偶数,对应在二进制中的现象就是最低位永远是0,所以可以使用index的最低位表示状态:0 代表正常,1 代表正在扩容;虽然index每次都加2,实际存储的位置很容易可以换算得出

  3)、MpscQueue的Buffer读写 *** 作使用了Unsafe类直接对内存进行 *** 作,提升了整体的效率,可以调试下看看,Unsafe类 *** 作的都是Byte,存储位置相对于Buffer首地址的偏移是在modifiedCalcCircularRefElementOffset()函数中完成

  5)、缓存行对齐,前面说的Disruptor页采用了这个优化策略

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5719413.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存