Netty学习之旅------NioSocketChannel源码分析之读事件处理逻辑

Netty学习之旅------NioSocketChannel源码分析之读事件处理逻辑,第1张

@Override

protected int doReadBytes(ByteBuf byteBuf) throws Exception {

return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());//javaChannel()方法返回的就是java.nio.SocketChannel

}

这里涉及到ByteBuf与通道打交道,最终还是会落到java.nio.Channel和java.nio.ByteBuffer上去,所以下文我也觉得有必要深究一下整个API的调用,增强对java.nio原生api的使用。

代码@7,如果没有读到可用数据,则回收刚申请的ByteBuf,如果读到的数据小于0,则说明需要将通道关闭。设置close=true,并跳出循环。

代码@8,将读到的内容(ByteBuf)通过管道传播到各个Handler,此时Handler的处理,默认都会在IO线程中处理。

代码@9,如果本次读到的字节数小于分配ByteBuf的可写字节数,说明该通道已经没有数据可读,结束本次循环。

代码@10,再次从通道中读取,直到读取次数已经超过配置的最大预读取次数,maxMessagesPreRead,如果是ServerChannel或AbstractNioChannel,则默认为16,其他的默认为1。

代码@11,触发读完成事件。

代码@12,该方法非常重要,IO线程将本次读取的字节数反馈给 接收ByteBuf分配器,方便下一次分配合理的ByteBuf,足够用,但不能超过太多。

代码@13,如果发生异常触发异常事件。

对整个通道读事件处理,上述是主要的处理流程,接下来重点分析如下两个方面代码@4: RecvByteBufAllocator和代码@8

2.1 RecvByteBuffAllocator系列类分析

/**

  • Allocates a new receive buffer whose capacity is probably large enough to read all inbound data and small enough

  • not to waste its space.

*/

public interface RecvByteBufAllocator {

/**

  • Creates a new handle. The handle provides the actual operations and keeps the internal information which is

  • required for predicting an optimal buffer capacity.

*/

Handle newHandle();

interface Handle {

/**

  • Creates a new receive buffer whose capacity is probably large enough to read all inbound data and small

  • enough not to waste its space.

*/

ByteBuf allocate(ByteBufAllocator alloc);

/**

  • Similar to {@link #allocate(ByteBufAllocator)} except that it does not allocate anything but just tells the

  • capacity.

*/

int guess();

/**

  • Records the the actual number of read bytes in the previous read operation so that the allocator allocates

  • the buffer with potentially more correct capacity.

  • @param actualReadBytes the actual number of read bytes in the previous read operation

*/

void record(int actualReadBytes);

}

}

该接口主要要解决的问题就是:在处理通道读事件的时候,如何确定需要分配多大的ByteBuf呢?

对如下英文进行通俗化的理解:

  1. Allocates a new receive buffer whose capacity is probably large enough to read all inbound data and s

mall enough

* not to waste its space.

分配一个接收ByteBuf,希望这个容量足够大,能够容纳通道中可读数据,但又尽量少,够用就好,别浪费空间。

2)guess 方法,只返回猜测,建议的容量(capacity),不执行实际的内存申请。

3)Records the the actual number of read bytes in the previous read operation so that the allocator allocates the buffer with potentially more correct capacity.

记录上传读到的实际字节的大小,以便分配器更加准确的分配正确的容量。

2.1.1 FixedRecvByteBufAllocator  固定容量分配,该方式简单,但忽略了IO线程的反馈。

/**

* The {@link RecvByteBufAllocator} that always yields the same buffer

* size prediction.  This predictor ignores the feed back from the I/O thread.

*/

public class FixedRecvByteBufAllocator implements RecvByteBufAllocator

2.1.2 AdaptiveRecvByteBufAllocator 自适应分配算法

1)概述

/**

* The {@link RecvByteBufAllocator} that automatically increases and

* decreases the predicted buffer size on feed back.

* It gradually increases the expected number of readable bytes if the previous

* read fully filled the allocated buffer.  It gradually decreases the expected

* number of readable bytes if the read operation was not able to fill a certain

* amount of the allocated buffer two times consecutively.  Otherwise, it keeps

* returning the same prediction.

*/

AdaptiveRecvByteBufAllocator 根据IO线程的反馈自动增加或减少预期的buffer size。

如果上一次读填满了分配的缓存区,则增大猜测缓存区的大小,如果上一次读没有填满分配的缓冲区,则减少。

2)AdaptiveRecvByteBufAllocator 源码分析

1、核心属性详解

static final int DEFAULT_MINIMUM = 64; // 分配的最小容量

static final int DEFAULT_INITIAL = 1024; // 初始容量

static final int DEFAULT_MAXIMUM = 65536; // 最大容量64k

private static final int INDEX_INCREMENT = 4; //增长索引数,在SIZE_TABLE下的索引

private 《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】 static final int INDEX_DECREMENT = 1; // 减少索引数

private static final int[] SIZE_TABLE; //小于512个字节,按16个字节递增,大于512字节,成倍增长

// SIZE_TABLE的初始化,其实这里有效使用的最大内存为65536, 64K

static {

List sizeTable = new ArrayList();

for (int i = 16; i < 512; i += 16) { //小于512字节,从16开始,以16递增

sizeTable.add(i);

}

for (int i = 512; i > 0; i <<= 1) { //大于512字节,成倍增长,该循环在超过int最大值时退出

sizeTable.add(i);

}

SIZE_TABLE = new int[sizeTable.size()];

for (int i = 0; i < SIZE_TABLE.length; i ++) {

SIZE_TABLE[i] = sizeTable.get(i);

}

}

private final int minIndex; // DEFAULT_MINIMUM 所在SIZE_TABLE中的下标

private final int maxIndex; // DEFAULT_MAXIMUM 所在SIZE_TABLE中的下标

private final int initial; // 初始时,分配的buffer大小,默认为1K

2、构造方法

/**

  • Creates a new predictor with the default parameters. With the default

  • parameters, the expected buffer size starts from {@code 1024}, does not

  • go down below {@code 64}, and does not go up above {@code 65536}.

*/

private AdaptiveRecvByteBufAllocator() {

this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);

}

/**

  • Creates a new predictor with the specified parameters.

  • @param minimum the inclusive lower bound of the expected buffer size

  • @param initial the initial buffer size when no feed back was received

  • @param maximum the inclusive upper bound of the expected buffer size

*/

public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {

if (minimum <= 0) {

throw new IllegalArgumentException("minimum: " + minimum);

}

if (initial < minimum) {

throw new IllegalArgumentException("initial: " + initial);

}

if (maximum < initial) {

throw new IllegalArgumentException("maximum: " + maximum);

}

int minIndex = getSizeTableIndex(minimum); //@1

if (SIZE_TABLE[minIndex] < minimum) {

this.minIndex = minIndex + 1;

} else {

this.minIndex = minIndex;

}

int maxIndex = getSizeTableIndex(maximum);

if (SIZE_TABLE[maxIndex] > maximum) {

this.maxIndex = maxIndex - 1;

} else {

this.maxIndex = maxIndex;

}

this.initial = initial;

}

构造方法,主要是初始化minIndex、maxIndex,initial。重点看一下getSizeTableIndex方法的详解:

private static int getSizeTableIndex(final int size) {

for (int low = 0, high = SIZE_TABLE.length - 1;😉 { //典型的二分查找算法

if (high < low) { //@1

return low;

}

if (high == low) { //@2

return high;

}

int mid = low + high >>> 1;

int a = SIZE_TABLE[mid];

int b = SIZE_TABLE[mid + 1];

if (size > b) {

low = mid + 1;

} else if (size < a) {

high = mid - 1;

} else if (size == a) {

return mid;

} else {

return mid + 1;

}

}

}

二分查找的原理,对于一个有序排序,用中间的数与需要查找的数进行比较,然后在另外一半里进行再分半查询,每次将范围缩小一半。

二分查找,对于high的初始值有学问,使用的是SIZE_TABLE.length - 1,就是为了确保越界。(min = low + hign >>> 1),如果在后半部分,确保该值最大为该值。

3、内部Handler实现类

private static final class HandleImpl implements Handle {

private final int minIndex;

private final int maxIndex;

private int index;

private int nextReceiveBufferSize;

private boolean decreaseNow;

HandleImpl(int minIndex, int maxIndex, int initial) {

this.minIndex = minIndex;

this.maxIndex = maxIndex;

index = getSizeTableIndex(initial);

nextReceiveBufferSize = SIZE_TABLE[index];

}

@Override

public ByteBuf allocate(ByteBufAllocator alloc) {

return alloc.ioBuffer(nextReceiveBufferSize);

}

@Override

public int guess() {

return nextReceiveBufferSize;

}

@Override

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/796099.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-06
下一篇 2022-05-06

发表评论

登录后才能评论

评论列表(0条)

保存