netty底层源码探究:启动流程;EventLoop中的selector、线程、任务队列;监听处理accept、read事件流程;

netty底层源码探究:启动流程;EventLoop中的selector、线程、任务队列;监听处理accept、read事件流程;,第1张

netty底层源码探究:启动流程;EventLoop中的selector、线程、任务队列;监听处理accept、read事件流程;

文章目录
  • 探究源码
    • 启动流程
      • nio启动流程
      • 概述
      • init
      • Register
      • doBind0()
    • EventLoop
      • Selector何时被创建
      • 两个Selector成员变量
      • EventLoop的nio线程何时被启动
      • 提交普通任务会不会结束Selector阻塞
      • wakeup()方法
      • 何时进入Select分支进行阻塞
      • 会阻塞多久
      • nio空轮询bug
      • EventLoop---ioRatio
      • 执行io事件,在哪进行事件判断
    • accept流程
    • read流程

探究源码 启动流程 nio启动流程

因为netty的底层使用的是nio,所以先回忆一下nio的启动流程对于接下来要探究的netty启动流程也是有好处的。

  1. 创建一个选择器,监听多个channel发生的各类事件

    Selector selector = Selector.open();
    
  2. 创建一个ServerSocketChannel,并且设置非阻塞

    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);
    
  3. 将serverSocketChannel注册进选择器中

    SelectionKey selectionKey = serverSocketChannel.register(selector, 0, null);
    

    这是jdk原生的ServerSocketChannel,将来如果selector发生了事件,会将这个事件交给Nio相应的类去处理,这里就使用到了attachment附件,通过附件将serverSocketChannel与NioServerSocketChannel进行绑定。

    NioServerSocketChannel nioServerSocketChannel = new NioServerSocketChannel();
    SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);
    
  4. 绑定端口

    serverSocketChannel.bid(new InetSocketAddress(8080));
    
  5. 在selectionKey上注册一个它关心的事件类型

    selectionKey.interestOps(SelectionKey.OP_ACCEPT);
    
概述

上面nio的五个步骤是如何在netty中实现的?

下方代码是使用netty创建一个服务器的基本步骤

new ServerBootstrap()
        .group(new NioEventLoopGroup())
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer() {
            @Override
            protected void initChannel(NioSocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new StringDecoder());
            }
        })
        .bind(8080);

我们知道EventLoop包含了一个Selector和一个单线程执行器 ,也就是说.group(new NioEventLoopGroup()) 这行语句可以看为是完成Nio的第一步创建一个选择器的。

Nio剩下的四个步骤其实都是在.bind(8080); 这行语句完成的,然后我们点进bind()方法,接着会进入到第一个比较重要的方法doBind

private ChannelFuture doBind(final SocketAddress localAddress) {
    // initAndRegister()方法 所做的事情就是初始化和注册,相当于上面Nio的第二步和第三步
    // 它会将ServerSocketChannel创建好后注册进Selector中。该方法返回一个Future对象,就说明该方法是异步的,
    final ChannelFuture regFuture = this.initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
       // 这里就会利用future对象调用isDone()进行判断,如果上面initAndRegister()方法干的活比较快,就会执行if语句,
       // 但是一般情况下initAndRegister()方法中的nio线程将ServerSocketChannel和Selector进行绑定会比较慢 会执行else语句
    } else if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        // doBind0()方法是相当于Nio的第四步 绑定端口,监听事件
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        final AbstractBootstrap.PendingRegistrationPromise promise = new 			                                                                              AbstractBootstrap.PendingRegistrationPromise(channel);
        // 从这里可以看出future对象采用了异步的方式执行下面的语句,下方的doBind0()方法也就不是主线程调用了
        regFuture.addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        promise.setFailure(cause);
                    } else {
                        promise.registered();
                        // 进入到else语句后会在这里执行doBind0()方法
                        AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise);
                    }

                }
            });
        return promise;
    }
}

在正式开始之前需要了解ServerBootstrap.bind(8080);是主线程调用的,然后进入到doBind()方法 在进入到initAndRegister()方法中,直到创建ServerSocketChannel都是主线程做的事,包括register的前一部分都是主线程,但是在Register中会启动Nio线程,后续的 *** 作就不是在主线程中执行了,ServerSocketChannel注册进Selector中都是Nio线程做的事,如下图所示:

概述需要了解的就几件事

  • init是创建ServerSocketChannel
  • Register是将ServerSocketChannel注册进Selector中的,是nio线程执行的
  • initAndRegister()会返回一个future对象,然后使用该对象进行if判断,一般情况下都会进入到else语句
  • else语句中会利用future的异步方式,通过nio线程来执行doBind0()方法

init

接下来详细了解initAndRegister()方法中的init部分。

final ChannelFuture initAndRegister() {
    Channel channel = null;

    try {
        // 这行就是创建一个channel,创建的就是NioServerSocketChannel。
        // 再点进newChannel()方法就会发现里面是利用了反射调用无参构造方法获取的对象  constructor.newInstance()
        // 这里不仅仅会创建NioServerSocketChannel。还会创建jdk的ServerSocketChannel
        channel = this.channelFactory.newChannel();
        // 创建NioServerSocketChannel对象后就调用了init()方法,具体方法如下方代码所示
        this.init(channel);
    } catch (Throwable var3) {
        if (channel != null) {
            channel.unsafe().closeForcibly();
            return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
        }
     return (new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE)).setFailure(var3);
    }

    // 上面的init部分  从这里开始就是register部分了
    ChannelFuture regFuture = this.config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    return regFuture;
}

创建NioServerSocketChannel对象后就调用了init()方法

void init(Channel channel) throws Exception {
    。。。
    ChannelPipeline p = channel.pipeline();
   。。。
       // 这里就会发现创建NioServerSocketChannel后会往该channel的pipeline中添加一个Handler
       // 这个ChannelHandler和其他hander不同的地方在于该handler的initChannel()方法只会执行一次
       // 这里执行往pipeline中添加handler哦 还没有到执行的地步哦
    p.addLast(new ChannelHandler[]{new ChannelInitializer() {
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = ServerBootstrap.this.config.handler();
            if (handler != null) {
                pipeline.addLast(new ChannelHandler[]{handler});
            }

            ch.eventLoop().execute(new Runnable() {
                public void run() {
                    pipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});
                }
            });
        }
    }});
}

所以initAndRegister()方法中的init部分的作用就是

  • 创建了一个NioServerSocketChannel,
  • 并往该channel的pipeline中添加了一个Handler。

Register

接下来就轮到了Register部分

final ChannelFuture initAndRegister() {
    Channel channel = null;
    // init
    try {
        channel = this.channelFactory.newChannel();
        this.init(channel);
    } catch (Throwable var3) {
        if (channel != null) {
            channel.unsafe().closeForcibly();
            return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
        }
     return (new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE)).setFailure(var3);
    }

// 上面的init部分  从这里开始就是register部分了
ChannelFuture regFuture = this.config().group().register(channel);
if (regFuture.cause() != null) {
    if (channel.isRegistered()) {
        channel.close();
    } else {
        channel.unsafe().closeForcibly();
    }
}

return regFuture;

首先是register部分的第一行代码ChannelFuture regFuture = this.config().group().register(channel);该方法返回的是有个ChannelFuture对象,那么我们笃定该方法是异步的。然后我们点进该方法,多点几次就会进入到核心

经过的类如下图

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8bC8ZvbT-1628604267435)(E:Java笔记pictureimage-20210806125339453.png)]

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    } else if (AbstractChannel.this.isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
    } else if (!AbstractChannel.this.isCompatible(eventLoop)) {
        promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
    } else {
        AbstractChannel.this.eventLoop = eventLoop;
        // 到目前为止都是主线程在执行,下面这个if就是判断当前线程是否是nio线程,所以自然而然就进入到else语句中
        if (eventLoop.inEventLoop()) {
            this.register0(promise);
        } else {
            // 这里做的事情就是将正在做事的register0(promise)方法封装到了任务对象中,然后让eventLoop线程去执行
            try {
                // 这里第一次调用execute()方法会创建EventLoop线程,然后取执行run()方法,而不是早就创建好线程直接用。
                eventLoop.execute(new Runnable() {
                    public void run() {
                        AbstractUnsafe.this.register0(promise);
                    }
                });
            } catch (Throwable var4) {
                。。。
            }
        }

    }
}

现在进入到register0(promise)方法体中

private void register0(ChannelPromise promise) {
    try {
        if (!promise.setUncancellable() || !this.ensureOpen(promise)) {
            return;
        }

        boolean firstRegistration = this.neverRegistered;
        // 在netty源码中,一般do开头的方法就是真正做事的方法,具体功能如下方的代码
        AbstractChannel.this.doRegister();
       	this.neverRegistered = false;
        AbstractChannel.this.registered = true;
        // doRegister()方法结束后还有下面这个方法也比较重要。我们上面在init的步骤中往channel的pipeline中添加了一个Handler,但是还没有被调用。下面这个方法就是调用init添加的Handler。
        // 那个Handler的作用就是又往pipeline中添加另一个Handler,用来当acceptor事件发生后建立连接的
        AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded();
        // 在最开始的代码中主线程调用了initAndRegister()方法 返回了一个future对象,然后采用了异步的方式执行一些代码,异步的方法体需要等待Future对象中有数据了才会执行,那么谁来给数据嘞?就是下面这个safeSetSuccess()方法。
        // 下面方法体中的promise就是initAndRegister()方法返回的future对象 方法意思就是给这个promise设置一个安全的成功值
        // 这里给了结果 initAndRegister()方法返回的Future对象 异步方式的回调方法就会被执行 会执行方法体中的doBind0()方法。 回调方法体如下:
        this.safeSetSuccess(promise);
        AbstractChannel.this.pipeline.fireChannelRegistered();
        if (AbstractChannel.this.isActive()) {
            if (firstRegistration) {
                AbstractChannel.this.pipeline.fireChannelActive();
            } else if (AbstractChannel.this.config().isAutoRead()) {
                this.beginRead();
            }
        }
    } catch (Throwable var3) {
       。。。
    }
}

再进入到doRegister()方法中 这里会进入到AbstractNioChannel类的doRegister()方法

protected void doRegister() throws Exception {
    boolean selected = false;

    while(true) {
        try {
            // this.javaChannel()就是拿到jdk原生的ServerSocketChannel 
            // 然后在用jdk原生的ServerSocketChannel调用register()方法,
            // 该方法之前参数1需要绑定的Selector就是从我们现在的EventLoop中得到,刚开始没有关注事件,
            // 最后的附件this就是NioServerSocketChannel
            // 所以下面就和nio时学的serverSocketChannel.register(selector, 0, attachment)一样
            this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
            // 这里的附件NioServerSocketChannel 之前讲过,如果jdk原生的ServerSocketChannel发生了事件,是由NioServerSocketChannel 调用一些对应的方法进行处理。
            return;
        } catch (CancelledKeyException var3) {
            。。
        }
    }
}

initAndRegister()方法返回的Future对象 等待Future异步回调的方法

regFuture.addListener(new ChannelFutureListener() {
    public void operationComplete(ChannelFuture future) throws Exception {
        Throwable cause = future.cause();
        if (cause != null) {
            promise.setFailure(cause);
        } else {
            promise.registered();
            // 进入到else语句后会在这里执行doBind0()方法
            AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise);
        }

    }
});

Register做的事情就是:

  • 从主线程切换到nio线程
  • 将ServerSocketChannel注册进Selector中,附件和NioServerSocketChannel进行绑定
  • 再执行了init时添加的Handler,又往pipeline中又添加了一个Handler,用来处理将来发生的accept事件
  • 并为promise赋值,也就是initAndRegister()方法返回的Future对象赋值,让future对象能够执行异步方法 在该方法中调用doBind0()方法

doBind0()

经过了initAndRegister()之后,就轮到执行Future异步方式的回调函数中 调用的doBind0()方法了

private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
    // netty的老套路,保存是EventLoop线程执行
    channel.eventLoop().execute(new Runnable() {
        public void run() {
            if (regFuture.isSuccess()) {
                // 继续往下面执行
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }

        }
    });
}

一直往里面点

最后就进入到AbstractChannel类中的bind()方法

public final void bind(SocketAddress localAddress, ChannelPromise promise) {
    this.assertEventLoop();
    if (promise.setUncancellable() && this.ensureOpen(promise)) {
        ...

        boolean wasActive = AbstractChannel.this.isActive();

        try {
            // 正真正做事的方法 do开头 它会执行ServerSocketChannel与端口号的绑定
            AbstractChannel.this.doBind(localAddress);
        } catch (Throwable var5) {
            this.safeSetFailure(promise, var5);
            this.closeIfClosed();
            return;
        }

        // 在doBind()之后运行,
        // 这里if就是判断就是 判断当前的ServerSocketChannel是否可以用了  是否是Active状态
        if (!wasActive && AbstractChannel.this.isActive()) {
            this.invokeLater(new Runnable() {
                public void run() {
                    // 如果当前channel是Active状态了就会执行下面的语句
                    // 作用是触发channel中pipeline里面的所有Handler的active事件
                    AbstractChannel.this.pipeline.fireChannelActive();
                }
            });
        }

        this.safeSetSuccess(promise);
    }
}

点进doBind()方法 选择 NioServerSocketChannel类中的doBind()方法:

protected void doBind(SocketAddress localAddress) throws Exception {
    // 首先判断jdk版本是否大于等于7
    if (PlatformDependent.javaVersion() >= 7) {
        // this.javaChannel() 就是jdk的ServerSocketChannel,后面指定端口与设置全连接队列的大小
        this.javaChannel().bind(localAddress, this.config.getBacklog());
    } else {
        this.javaChannel().socket().bind(localAddress, this.config.getBacklog());
    }
}

doBind()方法执行完后会执行pipeline.fireChannelActive()方法,触发channel中pipeline里面的所有Handler的active事件。当前pipeline中的handler是:head–>acceptor–>tail。后两个Handler即使触发了active也不会做什么事情,主要做事的还是Head这一个Handler ,然后点进pipeline.fireChannelActive()方法

public void channelActive(ChannelHandlerContext ctx) {
    ctx.fireChannelActive();
    // 真正让SelectorKey关注accept事件是下面的方法执行的
    this.readIfIsAutoRead();
}

再往下执行 最后会进入到AbstractNioChannel类的doBeginRead()方法

protected void doBeginRead() throws Exception {
    // 这里就是SelectionKey
    SelectionKey selectionKey = this.selectionKey;
    if (selectionKey.isValid()) {
        this.readPending = true;
        int interestOps = selectionKey.interestOps();
        // 这里首先判断SelectionKey是否已经关注Accept事件,如果没有才会执行下面的方法
        if ((interestOps & this.readInterestOp) == 0) {
            // 这里的 | 就是+号  this.readInterestOp的值就是16 
            // nio中 selectionKey.interestOps(SelectionKey.OP_ACCEPT); 这里的SelectionKey.OP_ACCEPT也是16
            selectionKey.interestOps(interestOps | this.readInterestOp);
        }
    }
}

到目前为止,nio中的五个步骤在netty中都执行到了


EventLoop

NioEventloop的重要组成:selector、线程、任务队列。我们现在找到这三个在源码中的位置

// selector
public final class NioEventLoop extends SingleThreadEventLoop {
    ...
    private Selector selector;
    private Selector unwrappedSelector;
    private SelectedSelectionKeySet selectedKeys;
}
// 线程和任务队列在NioEventLoop的父类的父类中
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    ...
    private final Queue taskQueue; // 任务队列
    private volatile Thread thread; // 线程
    ...
    private final Executor executor;// 这个就是一个单线程的线程池 也就是上面的那个线程
    // 因为NioEventloop只有一个线程,但我们可能会提交多个任务,单线程同一时刻就只能执行一个任务,多出来的任务就放在任务队列中
    // 然后由线程从队列中依次取出任务来执行
}

再进入到曾祖父类AbstractScheduledEventExecutor

public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
    ...
    // 处理定时任务的任务队列    
    PriorityQueue> scheduledTaskQueue;
}

NioEventLoop即会处理io任务,也会处理普通任务,还有定时任务。


Selector何时被创建

在NioEventloop类的构造方法中

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) {
    super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    } else if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    } else {
        this.provider = selectorProvider;
        // 这里会调用openSelector()方法
        NioEventLoop.SelectorTuple selectorTuple = this.openSelector();
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
        this.selectStrategy = strategy;
    }
}

进入到openSelector()

private NioEventLoop.SelectorTuple openSelector() {
    final AbstractSelector unwrappedSelector;
    try {
        // 这里实际上就是创建一个Selector对象
        unwrappedSelector = this.provider.openSelector();
    } catch (IOException var7) {
        throw new ChannelException("failed to open a new selector", var7);
    }
    。。。
}

以前我们在nio中使用的是Selector.open()方法创建是Selector,这里对比一下这两种方式有什么区别

public abstract class Selector implements Closeable {
    protected Selector() { }
    public static Selector open() throws IOException {
        // 可以看到 nio调用的open()方法内部也是和netty调用的一样的方法
        return SelectorProvider.provider().openSelector();
    }
    。。。
}

Selector何时被创建?

在NioEventLoop的构造方法中被创建。


两个Selector成员变量

为什么在NioEventLoop中会两个Selector成员变量

// selector
public final class NioEventLoop extends SingleThreadEventLoop {
    ...
    // 在源码中也就是下面这两个Selector,各自有什么作用
    private Selector selector;
    private Selector unwrappedSelector;
    private SelectedSelectionKeySet selectedKeys;
}

在上面创建Selector时,调用nio底层方法provider.openSelector()创建的Selector其实是赋值给了unwrappedSelector。为什么netty还要再加一个Selector嘞?因为在nio原生的Selector会有一个SelectionKeys集合,将来发生了事件我们要从个这里面获取事件的信息。这个集合的实现默认使用的set集合,我们都知道set遍历的性能并不高,因为它的底层是一个hash表,遍历hash表会先去遍历每个hash桶,然后再去遍历每个链表。

因为遍历的性能并不高,所以netty做了这样一个优化,将nio内部SelectionKeys集合给替换掉了,换为了基于数组的实现。

// 具体的实现还是在NioEventloop类的openSelector()方法中
private NioEventLoop.SelectorTuple openSelector() {
    final AbstractSelector unwrappedSelector;
    try {
        // 创建selector
        unwrappedSelector = this.provider.openSelector();
    } catch (IOException var7) {
        throw new ChannelException("failed to open a new selector", var7);
    }
	。。。
    // 内部基于数组的实现
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    。。。
        
        try {
            // 这里是利用反射 先拿到Selector的实现类,然后获得私有成员变量
            Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
            Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
            。。。
            // 使用netty提供的一个反射工具类,将这个私有成员变量 暴力反射 然后可以调用    
            Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
            if (cause != null) {
                return cause;
            } else {
                // 暴力反射
                cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                if (cause != null) {
                    return cause;
                } else {
                    // 反射调用这两个成员变量,然后将原始的Selector对象用netty提供的替换掉
                    selectedKeysField.set(unwrappedSelector, selectedKeySet);
                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                    return null;
                }
            }
            。。。
}

为什么在NioEventLoop中会两个Selector成员变量?

为了在遍历SelectinKeys时提高效率

  • private Selector selector; 这个是包装后的 内部SelectinKeys基于数组实现的Selector
  • private Selector unwrappedSelector; 这个是原始的Selector

EventLoop的nio线程何时被启动

第一次调用execute()方法时会创建EventLoop线程

EventLoop eventLoop = new NioEventLoopGroup().next();
eventLoop.execute(()->{
	System.out.println("第一次调用execute()方法时会创建EventLoop线程");
})

主线程调用execute()方法,底层执行流程如下

public void execute(Runnable task) {
    // 首先判断方法的参数 Runnable对象是否为null
    if (task == null) {
        throw new NullPointerException("task");
    } else {
        // 然后在判断当前线程是否为nio线程,这里是主线程调用的execute()方法 所以这里会返回fasle
        boolean inEventLoop = this.inEventLoop();
        this.addTask(task); // 把这个任务加入到任务队列中
        if (!inEventLoop) { // false在取反就为true 就会进入到if里面
            this.startThread(); // 然后就会执行startThread()方法首次开启这个线程
            。。。
        }

下面为SingleThreadEventExecutor类的startThread()方法

private void startThread() {
    // 这里第一次进来时 state的值为1 所以第一次这里的条件满足  if的第二个条件就是将state从1改为2 
    // 所以 当以后再次调用该方法 if的条件就不会成立了,只有第一次会成立
    if (this.state == 1 && STATE_UPDATER.compareAndSet(this, 1, 2)) {
        boolean success = false;

        try {
            // 这里就是开启线程 , 如果开启成功就会将success变量变为true,所以findlly语句中也就不会执行了
            // 如果开启线程抛异常了,finally语句就又会将state从2变为1
            this.doStartThread();
            success = true;
        } finally {
            if (!success) {
                STATE_UPDATER.compareAndSet(this, 2, 1);
            }
        }
    }

}

我们在点进doStartThread();方法

private void doStartThread() {
    // EventLoop中的线程还为null
    assert this.thread == null;

    // 这里的executor就是单线程的线程池 这里使用这里面的nio线程执行一个任务
    this.executor.execute(new Runnable() {
        public void run() {
            // 执行的任务就是把当前的nio线程赋值为 EventLoop中的thread成员变量。到这里一步,thread也就有值了
            SingleThreadEventExecutor.this.thread = Thread.currentThread();
            if (SingleThreadEventExecutor.this.interrupted) {
                    SingleThreadEventExecutor.this.thread.interrupt();
                }
                ...
                label1907: {
                    try {
                        var112 = true;
                        // 这里的run方法也比较重要,它做的事是 一个死循环, 不断的去找任务、定时任务、io事件去执行
                        SingleThreadEventExecutor.this.run();
                        ...
                    }
        }
    }
}

EventLoop的nio线程何时被启动

首次调用execute方法时启动,重复调用该方法也不会启动多次线程,因为底层有一个if判断,第一次启动有一个状态为statu值为1,当线程启动成功后就会将值变为2,所以再次调用该方法if判断也不会成立。


提交普通任务会不会结束Selector阻塞

上面说过,首次调用execute方法启动nio线程,还会调用一个run()方法,启动一个死循环,代码如下

 protected void run() {
        for (;;) {
            try {
                try {
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                       
					// 当有事件发生的时候就会调用下方的select()方法
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
      。。。

在点进select()方法

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
     。。。

         // 这里就是nio时我们见过的阻塞方法,避免线程一直在运行死循环,和我们使用时不一样的是这里不是调用的无参的select()方法
         // netty调用的是有参的select()方法,当这里面的超时时间到了之后就会解除阻塞,之所以采用有参的select()方法
         // 这是因为EventLoop不仅仅要处理io事件,还要处理一些其他任务,它不能一直阻塞。
         // 这里当超时时间到了会解除阻塞,或者是有新任务提交也会唤醒它 以便及时的处理io事件之外的任务
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;

当有普通任务提交了,nio线程这里的超时时间还没到这里还是阻塞的,那到底是如何唤醒这里的嘞?

这是因为当主线程调用execute()方法,然后再从里面调用startThread()方法,该方法会判断status是否值为1,也就是首次被调用,然后在调用doStartThread()方法,这个方法里面就会启动Executor单线程池里面的nio线程,主线程到这暂时结束,nio线程为thread赋值,然后调用我们这里的死循环run()方法,再进入switch分支调用一个方法 进而导致selector.select(timeoutMillis)阻塞。

当有普通任务了,通过eventLoop.execute(()->{..})添加普通任务,主线程又会执行一次execute()方法,又会调用startThread()方法,这时已经不是第一次调用该方法了所以不会做什么事情,然后startThread()方法执行完后接着执行execute()方法会调用一个wakeup()方法来唤醒nio阻塞的线程去执行普通任务。

提交普通任务会不会结束Selector阻塞

主线程会调用wakeup()方法唤醒阻塞的nio线程来执行普通任务。


wakeup()方法

通过上面引出了wakeup()方法,这里详细介绍该方法的执行条件

在NioEventLoop类中

protected void wakeup(boolean inEventLoop) {
    if (!inEventLoop && this.wakenUp.compareAndSet(false, true)) {
        this.selector.wakeup();
    }
}

首先这里对if判断条件进行解读,前面一部分是表示只有EventLoop线程之外的线程提交任务才有机会执行wakeup()方法。如果的EventLoop线程自己提交的任务就会走其他的逻辑。wakenUP变量在定义时的类型是AtomicBoolean,它是采用cas的方式去设置值,多个线程在同一个时刻修改它的值只会有一个成功。它的作用是什么?

因为this.selector.wakeup();是一个比较耗费性能的 *** 作,所以我们应该避免对它的频繁调用。将来会有这样一个情况,有多个线程都来提交任务,都走到了上面这个地方,那么selector需要唤醒几次嘞,其实1次就足够了。所以就用到了wakenUp这个原子变量。


何时进入Select分支进行阻塞

在上面我们知道主线程首次调用eventLoop.execute(Runnable run) —>调用startThread()方法 —>判断statu是否为1 是否为首次调用execute()方法 —>调用doStartThread()方法 nio线程接手主线程运行—>SingleThreadEventExecutor.this.run() —> 死循环

protected void run() {
    for (;;) {
        try {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;

                case SelectStrategy.BUSY_WAIT:

                case SelectStrategy.SELECT:
                    // 这里才会让EventLoop线程 selector.select(timeoutMillis) 进入阻塞
                    // 那么什么条件下才会进入到这条分支嘞?
                    select(wakenUp.getAndSet(false));

                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
                }
            } catch (IOException e) {
                rebuildSelector0();
                handleLoopException(e);
                continue;
            }

            。。。
}

决定进入select分支的条件就是switch中的代码selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())。

点进方法:

public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
    // 这里会根据上面传递过来的boolean变量决定,如果为false时就会走select阻塞分支
    // 这个Boolean变量的作用就是当前是否有任务,如果没有任务就会进入select阻塞分支
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}

如果有任务 selectSupplier.get()又会有什么作用。get()方法中又调用了selectNow()方法

int selectNow() throws IOException {
    try {
        // 之前nio讲select()方法时讲了三个使用,一个是空参的select,第二个就是带超时时间的select 第三个就是selectNow()
        // 该方法与前两个不同的地方是它不会阻塞,它会在selector上立刻查看是否有事件发生,如果没有就返回0
        return selector.selectNow();
    } finally {
        // restore wakeup state if needed
        if (wakenUp.get()) {
            selector.wakeup();
        }
    }
}

所以,当没有任务时,才会进入Select分支,进行阻塞。如果有任务时 最终会调用selectNow()方法返回当前的任务数,并跳出switch多分支语句,执行switch下面的代码来处理任务。


会阻塞多久

当没有事件发生时,进入了select分支,最终执行selector.select(timeoutMillis); 那么这个超时时间为多久嘞,源码如下:

long currentTimeNanos = System.nanoTime(); // 当前时间
// 截止时间=当前时间+1秒       如果有定时任务 截止时间=当前时间+下一个定时任务开始的事件
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); 
for (;;) {
	// 超时时间 这里又减去了当前时间,后面又加上0.5毫秒,最后的是将纳秒转换为毫秒 
	long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; 
    。。。
}

所以当没有定时任务的情况下selector.select(timeoutMillis); 只会阻塞1秒左右。

那么什么时候会跳出select分支进入的select()方法的死循环

  1. 当前时间超过了截止时间,因为每一次循环都会重新为当前时间变量赋值
  2. 有任务发生了也会退出死循环
  3. 有事件发生了也会推出死循环
private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime(); // 当前时间
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); // 截止时间

        for (;;) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            // 当前时间超过了截止时间 会退出死循环
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

            // hasTasks()是否有任务
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }
		
            // 阻塞 当有事件发生会解除阻塞 selectedKeys也不为0
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;

            // 所以又事件发生这里也会退出死循环
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                break;
            }
            
            。。。

            long time = System.nanoTime();
			。。。
            currentTimeNanos = time;
        }
        // for end
        。。
    } catch (CancelledKeyException e) {
        ..
    }
}

nio空轮询bug

nio中selector.select()方法有一个bug(jdk在linux环境下),本来正常情况下如果是无参的select()方法,只有在有事件发生时才会解除阻塞,如果是有超时时间的select()方法,没有事件发生需要等到超时时间才会解除阻塞。而这个bug有很小的几率会出现,那就是即使没有事件发生,超时时间也没到,也不会阻塞,特别是当好几个EventLoop线程都空轮询这就会很占用CPU资源。

netty解决了nio的空轮询bug,它解决的方法是采用一个循环计数的方式

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        // 就是这个selectCnt变量  初始值为0
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime(); 
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); 

        for (;;) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

            
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }
		
            // 阻塞 当有事件发生会解除阻塞 selectedKeys也不为0
            int selectedKeys = selector.select(timeoutMillis);
            // 每循环一次就会让计数++
            selectCnt ++;

            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                break;
            }
            
            。。。

            long time = System.nanoTime();
			if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    
                    selectCnt = 1;
                // 这里首先判断有没有设置一个 期望值 大于0  并且如果 selectCnt大于了这个期望值就会退出死循环
                // 这个期望值默认 会读取运行时的环境变量io.netty.selectorAutoRebuildThreshold
                // 如果我们自己设置了值就以我们设置的为准,如果没有设置默认是512。
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // 当发生了空轮询bug如果出现了就会调用下面的方法,作用是重新创建一个selector,替换掉旧的selector,
                // 还会把旧的selector中的一些信息赋值个新的,内部的实现还是比较复杂的。
                    selector = selectRebuildSelector(selectCnt);
                    selectCnt = 1;
                    break;
                }
            currentTimeNanos = time;
        }
        // for end
        。。
    } catch (CancelledKeyException e) {
        ..
    }
}
EventLoop—ioRatio
@Override
protected void run() {
    for (;;) {
        try {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.BUSY_WAIT:
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));

                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
                }
            } catch (IOException e) {
                rebuildSelector0();
                handleLoopException(e);
                continue;
            }
			// 当阻塞解除后,就会继续执行switch语句下面的代码来处理任务或者是发生的事件
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                // 如果ioRatio的值设置为100 就会先运行所有的io事件,然后在运行所有的普通任务
                try {
                    processSelectedKeys();
                } finally {
                    
                    runAllTasks();
                }
            } else {// else 会做两件事
                // 首先获取当前时间
                final long ioStartTime = System.nanoTime();
                try {
                    // 1. 处理所有的io事件
                    processSelectedKeys();
                } finally { 
                    // 处理完io事件后在获取一次当前时间,在和之前的事件相减,得到处理io事件的时间
                    final long ioTime = System.nanoTime() - ioStartTime;
                    // 2. 运行普通任务,普通任务能执行的事件就是 ioTime * (100 - ioRatio) / ioRatio 来控制的
                    // 当普通任务运行的时间超过了这里传递的事件 它就不会从任务队列中拿普通任务执行了
                    // 会等下一次循环 处理完io事件后再来执行普通任务,这样就实现了优先处理io事件
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        。。
    }
}

如果普通任务的耗时比较长,那势必会影响到io事件的执行,毕竟EventLoop中只是一个单线程,netty为了避免因为普通任务的耗时较长影响到io事件,netty会做一个参数的控制——ioRatio 这个参数是控制处理io事件所占用的事件比例,它默认是50%


执行io事件,在哪进行事件判断

就从上面的源码中 ioRatio 参数的判断中 执行所有的io事件的方法processSelectedKeys(); 跟进这个方法,就会进入到下面的方法中

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        // 这里首先是拿到所有的SelectionKey
        final SelectionKey k = selectedKeys.keys[i];
        selectedKeys.keys[i] = null;

        // 然后获取附件,也就是NioServerSocketChannel ,这里为什么要拿到这个对象嘞,
        // 因为接下来要对SelectionKey进行各种各样的处理,也就是Handler,所以需要通过channel得到pipeline 在得到Handler
        final Object a = k.attachment();

        // 拿到channel了就进行判断是否是NioChannel
        if (a instanceof AbstractNioChannel) {
            //然后就会进入到这个方法
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask task = (NioTask) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            selectedKeys.reset(i + 1);

            selectAgain();
            i = -1;
        }
    }
}

processSelectedKey(k, (AbstractNioChannel) a);方法的源码,就是在这里面进行各类事件的判断,然后进行相应的处理

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            return;
        }
        if (eventLoop != this || eventLoop == null) {
            return;
        }
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // 可连接事件 客户端的事件
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // 可写事件
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }

        // 可读事件和连接事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}


accept流程

首先会议nio中accept的流程

  1. selector.select()阻塞
  2. 遍历SelectionKeys
  3. 判断事件类型
  4. 创建SocketChannel
  5. 注册进Selector中
  6. 利用SelectionKey监听read事件

其实前面三步已经在上面学习EventLoop的源码中已经实现了,接下来重点关注后面的三步

在processSelectedKey(k, (AbstractNioChannel) a);方法中进行事件判断

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    ...
    try {
        int readyOps = k.readyOps();
        // 可连接事件 客户端的事件
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // 可写事件
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }

        // 可读事件和连接事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

当服务器启动,客户端启动就会进入到unsafe.read();方法中

public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                // 这里就会创建SocketChannel 并将它设置为非阻塞的 , 这里的readBuf就是创建的NioSocketChannel
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }

                allocHandle.incMessagesRead(localRead);
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }

        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            // 这里就是把刚刚建立的连接当成一个消息 给pipeline中的Handler去处理。
            // 目前的handler共有:head--> accept ---> tail,所以这里肯定是accept这个handler来进行处理
            // 其实接下来的两步 将SocketChannel注册进select并且监听read事件都是这个handler做的事
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        。。
    } finally {
        。。。
    }
}

创建SocketChannel 并将它设置为非阻塞的方法

protected int doReadMessages(List buf) throws Exception {
    // 这里面就是获得SocketChannel ,这里调用了一个工具类的方法,方法的具体实现就是ServerSocketChannel.accept() 
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
            // 得到了SocketChannel 还需要创建NioSocketChannel并为这两个建立联系
            // 所以这里将就SocketChannel作为构造方法的参数传递给了NioSocketChannel
            // 在构造方法中也会将SocketChannel设置为非阻塞
            // 创建好后就将NioSocketChannel添加进list集合中 , 也就是一个消息,将来要给pipeline中的Handler去处理
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        。。
    }

    return 0;
}
 

接下来就不一步一步走了,最终它会到accept 这个Handler 中的方法中。最后会进入到ServerBootstrap类中的静态内部类ServerBootstrapAcceptor中的channelRead()方法

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // 这里的msg就是上面创建的NioSocketChannel
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    // 这里会对NioSocketChannel 设置一些参数
    setChannelOptions(child, childOptions, logger);

    for (Entry, Object> e: childAttrs) {
        child.attr((AttributeKey) e.getKey()).set(e.getValue());
    }

    try {
        // 比较重要的方法就是这里的register(child) 它会做的事件就是把这个channel和EventLoop中的一个selector进行绑定
        // 这个register()方法和之前 初始化ServerSocketChannel时调用的register()方法很类似
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
 

接着在进入register(child)方法,最终会进入到AbstractChannel类的register()方法

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    } else if (AbstractChannel.this.isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
    } else if (!AbstractChannel.this.isCompatible(eventLoop)) {
        promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
    } else {
        AbstractChannel.this.eventLoop = eventLoop;
        // 这里会把当前线程和EventLoop线程进行判断,现在的线程虽然是EventLoop线程,但还是会进入到else语句中
        // 这里因为目前的线程是NioServerSocketChannel的EventLoop线程,但是线程新建了一个NioSocketChannel,这两个channel肯定不能共用一个线程,所以就会进入到else中新开一个线程来执行register0(promise)方法
        if (eventLoop.inEventLoop()) {
            this.register0(promise);
        } else {
            try {
                // 这里会拿到新的EventLoop,用新的EventLoop中的线程
                eventLoop.execute(new Runnable() {
                    public void run() {
                        // 程序会走到这里。
                        AbstractUnsafe.this.register0(promise);
                    }
                });
            } catch (Throwable var4) {
                。。。
            }
        }

    }
}

AbstractUnsafe.this.register0(promise);这行代码最后会调用doRegister()方法,

doRegister()方法就会先拿到jdk原生的SocketChannel,注册进当前EventLoop中的Selector中。并且把当前NioSocketChannel作为附件绑定上去

doRegister()方法结束后 , register0(promise)方法继续运行,调用pipeline.invokeHandlerAddedIfNeeded();方法,其实就是给现在的NioSocketChannel加一个Handler,这些handler就是我们自己在代码中写的 ,这里会把我们写的handler 加到channel中。

register0(promise)方法继续运行, 调用pipeline.fireChannelActive(); 这里的作用就是拿到SelectionKey,然后关注read事件,这最后会在AbstractNioChannel类的doBeginRead()方法中添加关注read事件


read流程

还是在这个地方处理读事件

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
}

将断电打在这里,当第一次触发断点是连接,直接放行,再一次触发就是read事件了。

public final void read() {
    final ChannelConfig config = config();
    if (shouldBreakReadReady(config)) {
        clearReadPending();
        return;
    }
    final ChannelPipeline pipeline = pipeline();
    // 获取Bytebuf的分配器,决定Bytebuf是池化还非池化
    final ByteBufAllocator allocator = config.getAllocator();
    // 可以动态调整Bytebuf的大小,并强制使用直接内存
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            // 这里就是分配具体的Bytebuf
            byteBuf = allocHandle.allocate(allocator);
            // 客户端发送了数据,服务器端这里调用doReadBytes()方法后救护我那个Bytebuf中填充内容
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            if (allocHandle.lastBytesRead() <= 0) {
                // nothing was read. release the buffer.
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                if (close) {
                    readPending = false;
                }
                break;
            }

            allocHandle.incMessagesRead(1);
            readPending = false;
            // 找到当前NioServerSocketChannel上的pipeline,然后触发一个读事件,
            // 就是将这个Bytebuf依次传给入站Handler 依次去处理。
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());

        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

最后祝大家学有所成,所愿皆所得。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5590445.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)