- 探究源码
- 启动流程
- nio启动流程
- 概述
- init
- Register
- doBind0()
- EventLoop
- Selector何时被创建
- 两个Selector成员变量
- EventLoop的nio线程何时被启动
- 提交普通任务会不会结束Selector阻塞
- wakeup()方法
- 何时进入Select分支进行阻塞
- 会阻塞多久
- nio空轮询bug
- EventLoop---ioRatio
- 执行io事件,在哪进行事件判断
- accept流程
- read流程
因为netty的底层使用的是nio,所以先回忆一下nio的启动流程对于接下来要探究的netty启动流程也是有好处的。
-
创建一个选择器,监听多个channel发生的各类事件
Selector selector = Selector.open();
-
创建一个ServerSocketChannel,并且设置非阻塞
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false);
-
将serverSocketChannel注册进选择器中
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, null);
这是jdk原生的ServerSocketChannel,将来如果selector发生了事件,会将这个事件交给Nio相应的类去处理,这里就使用到了attachment附件,通过附件将serverSocketChannel与NioServerSocketChannel进行绑定。
NioServerSocketChannel nioServerSocketChannel = new NioServerSocketChannel(); SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);
-
绑定端口
serverSocketChannel.bid(new InetSocketAddress(8080));
-
在selectionKey上注册一个它关心的事件类型
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
上面nio的五个步骤是如何在netty中实现的?
下方代码是使用netty创建一个服务器的基本步骤
new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(NioSocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new StringDecoder()); } }) .bind(8080);
我们知道EventLoop包含了一个Selector和一个单线程执行器 ,也就是说.group(new NioEventLoopGroup()) 这行语句可以看为是完成Nio的第一步创建一个选择器的。
Nio剩下的四个步骤其实都是在.bind(8080); 这行语句完成的,然后我们点进bind()方法,接着会进入到第一个比较重要的方法doBind
private ChannelFuture doBind(final SocketAddress localAddress) { // initAndRegister()方法 所做的事情就是初始化和注册,相当于上面Nio的第二步和第三步 // 它会将ServerSocketChannel创建好后注册进Selector中。该方法返回一个Future对象,就说明该方法是异步的, final ChannelFuture regFuture = this.initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; // 这里就会利用future对象调用isDone()进行判断,如果上面initAndRegister()方法干的活比较快,就会执行if语句, // 但是一般情况下initAndRegister()方法中的nio线程将ServerSocketChannel和Selector进行绑定会比较慢 会执行else语句 } else if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); // doBind0()方法是相当于Nio的第四步 绑定端口,监听事件 doBind0(regFuture, channel, localAddress, promise); return promise; } else { final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel); // 从这里可以看出future对象采用了异步的方式执行下面的语句,下方的doBind0()方法也就不是主线程调用了 regFuture.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { promise.setFailure(cause); } else { promise.registered(); // 进入到else语句后会在这里执行doBind0()方法 AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
在正式开始之前需要了解ServerBootstrap.bind(8080);是主线程调用的,然后进入到doBind()方法 在进入到initAndRegister()方法中,直到创建ServerSocketChannel都是主线程做的事,包括register的前一部分都是主线程,但是在Register中会启动Nio线程,后续的 *** 作就不是在主线程中执行了,ServerSocketChannel注册进Selector中都是Nio线程做的事,如下图所示:
概述需要了解的就几件事
- init是创建ServerSocketChannel
- Register是将ServerSocketChannel注册进Selector中的,是nio线程执行的
- initAndRegister()会返回一个future对象,然后使用该对象进行if判断,一般情况下都会进入到else语句
- else语句中会利用future的异步方式,通过nio线程来执行doBind0()方法
init
接下来详细了解initAndRegister()方法中的init部分。
final ChannelFuture initAndRegister() { Channel channel = null; try { // 这行就是创建一个channel,创建的就是NioServerSocketChannel。 // 再点进newChannel()方法就会发现里面是利用了反射调用无参构造方法获取的对象 constructor.newInstance() // 这里不仅仅会创建NioServerSocketChannel。还会创建jdk的ServerSocketChannel channel = this.channelFactory.newChannel(); // 创建NioServerSocketChannel对象后就调用了init()方法,具体方法如下方代码所示 this.init(channel); } catch (Throwable var3) { if (channel != null) { channel.unsafe().closeForcibly(); return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3); } return (new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE)).setFailure(var3); } // 上面的init部分 从这里开始就是register部分了 ChannelFuture regFuture = this.config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
创建NioServerSocketChannel对象后就调用了init()方法
void init(Channel channel) throws Exception { 。。。 ChannelPipeline p = channel.pipeline(); 。。。 // 这里就会发现创建NioServerSocketChannel后会往该channel的pipeline中添加一个Handler // 这个ChannelHandler和其他hander不同的地方在于该handler的initChannel()方法只会执行一次 // 这里执行往pipeline中添加handler哦 还没有到执行的地步哦 p.addLast(new ChannelHandler[]{new ChannelInitializer() { public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = ServerBootstrap.this.config.handler(); if (handler != null) { pipeline.addLast(new ChannelHandler[]{handler}); } ch.eventLoop().execute(new Runnable() { public void run() { pipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)}); } }); } }}); }
所以initAndRegister()方法中的init部分的作用就是
- 创建了一个NioServerSocketChannel,
- 并往该channel的pipeline中添加了一个Handler。
Register
接下来就轮到了Register部分
final ChannelFuture initAndRegister() { Channel channel = null; // init try { channel = this.channelFactory.newChannel(); this.init(channel); } catch (Throwable var3) { if (channel != null) { channel.unsafe().closeForcibly(); return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3); } return (new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE)).setFailure(var3); } // 上面的init部分 从这里开始就是register部分了 ChannelFuture regFuture = this.config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture;
首先是register部分的第一行代码ChannelFuture regFuture = this.config().group().register(channel);该方法返回的是有个ChannelFuture对象,那么我们笃定该方法是异步的。然后我们点进该方法,多点几次就会进入到核心
经过的类如下图
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8bC8ZvbT-1628604267435)(E:Java笔记pictureimage-20210806125339453.png)]
public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } else if (AbstractChannel.this.isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); } else if (!AbstractChannel.this.isCompatible(eventLoop)) { promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); } else { AbstractChannel.this.eventLoop = eventLoop; // 到目前为止都是主线程在执行,下面这个if就是判断当前线程是否是nio线程,所以自然而然就进入到else语句中 if (eventLoop.inEventLoop()) { this.register0(promise); } else { // 这里做的事情就是将正在做事的register0(promise)方法封装到了任务对象中,然后让eventLoop线程去执行 try { // 这里第一次调用execute()方法会创建EventLoop线程,然后取执行run()方法,而不是早就创建好线程直接用。 eventLoop.execute(new Runnable() { public void run() { AbstractUnsafe.this.register0(promise); } }); } catch (Throwable var4) { 。。。 } } } }
现在进入到register0(promise)方法体中
private void register0(ChannelPromise promise) { try { if (!promise.setUncancellable() || !this.ensureOpen(promise)) { return; } boolean firstRegistration = this.neverRegistered; // 在netty源码中,一般do开头的方法就是真正做事的方法,具体功能如下方的代码 AbstractChannel.this.doRegister(); this.neverRegistered = false; AbstractChannel.this.registered = true; // doRegister()方法结束后还有下面这个方法也比较重要。我们上面在init的步骤中往channel的pipeline中添加了一个Handler,但是还没有被调用。下面这个方法就是调用init添加的Handler。 // 那个Handler的作用就是又往pipeline中添加另一个Handler,用来当acceptor事件发生后建立连接的 AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded(); // 在最开始的代码中主线程调用了initAndRegister()方法 返回了一个future对象,然后采用了异步的方式执行一些代码,异步的方法体需要等待Future对象中有数据了才会执行,那么谁来给数据嘞?就是下面这个safeSetSuccess()方法。 // 下面方法体中的promise就是initAndRegister()方法返回的future对象 方法意思就是给这个promise设置一个安全的成功值 // 这里给了结果 initAndRegister()方法返回的Future对象 异步方式的回调方法就会被执行 会执行方法体中的doBind0()方法。 回调方法体如下: this.safeSetSuccess(promise); AbstractChannel.this.pipeline.fireChannelRegistered(); if (AbstractChannel.this.isActive()) { if (firstRegistration) { AbstractChannel.this.pipeline.fireChannelActive(); } else if (AbstractChannel.this.config().isAutoRead()) { this.beginRead(); } } } catch (Throwable var3) { 。。。 } }
再进入到doRegister()方法中 这里会进入到AbstractNioChannel类的doRegister()方法
protected void doRegister() throws Exception { boolean selected = false; while(true) { try { // this.javaChannel()就是拿到jdk原生的ServerSocketChannel // 然后在用jdk原生的ServerSocketChannel调用register()方法, // 该方法之前参数1需要绑定的Selector就是从我们现在的EventLoop中得到,刚开始没有关注事件, // 最后的附件this就是NioServerSocketChannel // 所以下面就和nio时学的serverSocketChannel.register(selector, 0, attachment)一样 this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this); // 这里的附件NioServerSocketChannel 之前讲过,如果jdk原生的ServerSocketChannel发生了事件,是由NioServerSocketChannel 调用一些对应的方法进行处理。 return; } catch (CancelledKeyException var3) { 。。 } } }
initAndRegister()方法返回的Future对象 等待Future异步回调的方法
regFuture.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { promise.setFailure(cause); } else { promise.registered(); // 进入到else语句后会在这里执行doBind0()方法 AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise); } } });
Register做的事情就是:
- 从主线程切换到nio线程
- 将ServerSocketChannel注册进Selector中,附件和NioServerSocketChannel进行绑定
- 再执行了init时添加的Handler,又往pipeline中又添加了一个Handler,用来处理将来发生的accept事件
- 并为promise赋值,也就是initAndRegister()方法返回的Future对象赋值,让future对象能够执行异步方法 在该方法中调用doBind0()方法
doBind0()
经过了initAndRegister()之后,就轮到执行Future异步方式的回调函数中 调用的doBind0()方法了
private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // netty的老套路,保存是EventLoop线程执行 channel.eventLoop().execute(new Runnable() { public void run() { if (regFuture.isSuccess()) { // 继续往下面执行 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
一直往里面点
最后就进入到AbstractChannel类中的bind()方法
public final void bind(SocketAddress localAddress, ChannelPromise promise) { this.assertEventLoop(); if (promise.setUncancellable() && this.ensureOpen(promise)) { ... boolean wasActive = AbstractChannel.this.isActive(); try { // 正真正做事的方法 do开头 它会执行ServerSocketChannel与端口号的绑定 AbstractChannel.this.doBind(localAddress); } catch (Throwable var5) { this.safeSetFailure(promise, var5); this.closeIfClosed(); return; } // 在doBind()之后运行, // 这里if就是判断就是 判断当前的ServerSocketChannel是否可以用了 是否是Active状态 if (!wasActive && AbstractChannel.this.isActive()) { this.invokeLater(new Runnable() { public void run() { // 如果当前channel是Active状态了就会执行下面的语句 // 作用是触发channel中pipeline里面的所有Handler的active事件 AbstractChannel.this.pipeline.fireChannelActive(); } }); } this.safeSetSuccess(promise); } }
点进doBind()方法 选择 NioServerSocketChannel类中的doBind()方法:
protected void doBind(SocketAddress localAddress) throws Exception { // 首先判断jdk版本是否大于等于7 if (PlatformDependent.javaVersion() >= 7) { // this.javaChannel() 就是jdk的ServerSocketChannel,后面指定端口与设置全连接队列的大小 this.javaChannel().bind(localAddress, this.config.getBacklog()); } else { this.javaChannel().socket().bind(localAddress, this.config.getBacklog()); } }
doBind()方法执行完后会执行pipeline.fireChannelActive()方法,触发channel中pipeline里面的所有Handler的active事件。当前pipeline中的handler是:head–>acceptor–>tail。后两个Handler即使触发了active也不会做什么事情,主要做事的还是Head这一个Handler ,然后点进pipeline.fireChannelActive()方法
public void channelActive(ChannelHandlerContext ctx) { ctx.fireChannelActive(); // 真正让SelectorKey关注accept事件是下面的方法执行的 this.readIfIsAutoRead(); }
再往下执行 最后会进入到AbstractNioChannel类的doBeginRead()方法
protected void doBeginRead() throws Exception { // 这里就是SelectionKey SelectionKey selectionKey = this.selectionKey; if (selectionKey.isValid()) { this.readPending = true; int interestOps = selectionKey.interestOps(); // 这里首先判断SelectionKey是否已经关注Accept事件,如果没有才会执行下面的方法 if ((interestOps & this.readInterestOp) == 0) { // 这里的 | 就是+号 this.readInterestOp的值就是16 // nio中 selectionKey.interestOps(SelectionKey.OP_ACCEPT); 这里的SelectionKey.OP_ACCEPT也是16 selectionKey.interestOps(interestOps | this.readInterestOp); } } }
到目前为止,nio中的五个步骤在netty中都执行到了
EventLoop
NioEventloop的重要组成:selector、线程、任务队列。我们现在找到这三个在源码中的位置
// selector public final class NioEventLoop extends SingleThreadEventLoop { ... private Selector selector; private Selector unwrappedSelector; private SelectedSelectionKeySet selectedKeys; }
// 线程和任务队列在NioEventLoop的父类的父类中 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { ... private final QueuetaskQueue; // 任务队列 private volatile Thread thread; // 线程 ... private final Executor executor;// 这个就是一个单线程的线程池 也就是上面的那个线程 // 因为NioEventloop只有一个线程,但我们可能会提交多个任务,单线程同一时刻就只能执行一个任务,多出来的任务就放在任务队列中 // 然后由线程从队列中依次取出任务来执行 }
再进入到曾祖父类AbstractScheduledEventExecutor
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor { ... // 处理定时任务的任务队列 PriorityQueue> scheduledTaskQueue; }
NioEventLoop即会处理io任务,也会处理普通任务,还有定时任务。
Selector何时被创建
在NioEventloop类的构造方法中
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) { super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } else if (strategy == null) { throw new NullPointerException("selectStrategy"); } else { this.provider = selectorProvider; // 这里会调用openSelector()方法 NioEventLoop.SelectorTuple selectorTuple = this.openSelector(); this.selector = selectorTuple.selector; this.unwrappedSelector = selectorTuple.unwrappedSelector; this.selectStrategy = strategy; } }
进入到openSelector()
private NioEventLoop.SelectorTuple openSelector() { final AbstractSelector unwrappedSelector; try { // 这里实际上就是创建一个Selector对象 unwrappedSelector = this.provider.openSelector(); } catch (IOException var7) { throw new ChannelException("failed to open a new selector", var7); } 。。。 }
以前我们在nio中使用的是Selector.open()方法创建是Selector,这里对比一下这两种方式有什么区别
public abstract class Selector implements Closeable { protected Selector() { } public static Selector open() throws IOException { // 可以看到 nio调用的open()方法内部也是和netty调用的一样的方法 return SelectorProvider.provider().openSelector(); } 。。。 }
Selector何时被创建?
在NioEventLoop的构造方法中被创建。
两个Selector成员变量
为什么在NioEventLoop中会两个Selector成员变量
// selector public final class NioEventLoop extends SingleThreadEventLoop { ... // 在源码中也就是下面这两个Selector,各自有什么作用 private Selector selector; private Selector unwrappedSelector; private SelectedSelectionKeySet selectedKeys; }
在上面创建Selector时,调用nio底层方法provider.openSelector()创建的Selector其实是赋值给了unwrappedSelector。为什么netty还要再加一个Selector嘞?因为在nio原生的Selector会有一个SelectionKeys集合,将来发生了事件我们要从个这里面获取事件的信息。这个集合的实现默认使用的set集合,我们都知道set遍历的性能并不高,因为它的底层是一个hash表,遍历hash表会先去遍历每个hash桶,然后再去遍历每个链表。
因为遍历的性能并不高,所以netty做了这样一个优化,将nio内部SelectionKeys集合给替换掉了,换为了基于数组的实现。
// 具体的实现还是在NioEventloop类的openSelector()方法中 private NioEventLoop.SelectorTuple openSelector() { final AbstractSelector unwrappedSelector; try { // 创建selector unwrappedSelector = this.provider.openSelector(); } catch (IOException var7) { throw new ChannelException("failed to open a new selector", var7); } 。。。 // 内部基于数组的实现 final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); 。。。 try { // 这里是利用反射 先拿到Selector的实现类,然后获得私有成员变量 Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); 。。。 // 使用netty提供的一个反射工具类,将这个私有成员变量 暴力反射 然后可以调用 Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true); if (cause != null) { return cause; } else { // 暴力反射 cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true); if (cause != null) { return cause; } else { // 反射调用这两个成员变量,然后将原始的Selector对象用netty提供的替换掉 selectedKeysField.set(unwrappedSelector, selectedKeySet); publicSelectedKeysField.set(unwrappedSelector, selectedKeySet); return null; } } 。。。 }
为什么在NioEventLoop中会两个Selector成员变量?
为了在遍历SelectinKeys时提高效率
- private Selector selector; 这个是包装后的 内部SelectinKeys基于数组实现的Selector
- private Selector unwrappedSelector; 这个是原始的Selector
EventLoop的nio线程何时被启动
第一次调用execute()方法时会创建EventLoop线程
EventLoop eventLoop = new NioEventLoopGroup().next(); eventLoop.execute(()->{ System.out.println("第一次调用execute()方法时会创建EventLoop线程"); })
主线程调用execute()方法,底层执行流程如下
public void execute(Runnable task) { // 首先判断方法的参数 Runnable对象是否为null if (task == null) { throw new NullPointerException("task"); } else { // 然后在判断当前线程是否为nio线程,这里是主线程调用的execute()方法 所以这里会返回fasle boolean inEventLoop = this.inEventLoop(); this.addTask(task); // 把这个任务加入到任务队列中 if (!inEventLoop) { // false在取反就为true 就会进入到if里面 this.startThread(); // 然后就会执行startThread()方法首次开启这个线程 。。。 }
下面为SingleThreadEventExecutor类的startThread()方法
private void startThread() { // 这里第一次进来时 state的值为1 所以第一次这里的条件满足 if的第二个条件就是将state从1改为2 // 所以 当以后再次调用该方法 if的条件就不会成立了,只有第一次会成立 if (this.state == 1 && STATE_UPDATER.compareAndSet(this, 1, 2)) { boolean success = false; try { // 这里就是开启线程 , 如果开启成功就会将success变量变为true,所以findlly语句中也就不会执行了 // 如果开启线程抛异常了,finally语句就又会将state从2变为1 this.doStartThread(); success = true; } finally { if (!success) { STATE_UPDATER.compareAndSet(this, 2, 1); } } } }
我们在点进doStartThread();方法
private void doStartThread() { // EventLoop中的线程还为null assert this.thread == null; // 这里的executor就是单线程的线程池 这里使用这里面的nio线程执行一个任务 this.executor.execute(new Runnable() { public void run() { // 执行的任务就是把当前的nio线程赋值为 EventLoop中的thread成员变量。到这里一步,thread也就有值了 SingleThreadEventExecutor.this.thread = Thread.currentThread(); if (SingleThreadEventExecutor.this.interrupted) { SingleThreadEventExecutor.this.thread.interrupt(); } ... label1907: { try { var112 = true; // 这里的run方法也比较重要,它做的事是 一个死循环, 不断的去找任务、定时任务、io事件去执行 SingleThreadEventExecutor.this.run(); ... } } } }
EventLoop的nio线程何时被启动
首次调用execute方法时启动,重复调用该方法也不会启动多次线程,因为底层有一个if判断,第一次启动有一个状态为statu值为1,当线程启动成功后就会将值变为2,所以再次调用该方法if判断也不会成立。
提交普通任务会不会结束Selector阻塞
上面说过,首次调用execute方法启动nio线程,还会调用一个run()方法,启动一个死循环,代码如下
protected void run() { for (;;) { try { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: // 当有事件发生的时候就会调用下方的select()方法 case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); 。。。
在点进select()方法
private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; 。。。 // 这里就是nio时我们见过的阻塞方法,避免线程一直在运行死循环,和我们使用时不一样的是这里不是调用的无参的select()方法 // netty调用的是有参的select()方法,当这里面的超时时间到了之后就会解除阻塞,之所以采用有参的select()方法 // 这是因为EventLoop不仅仅要处理io事件,还要处理一些其他任务,它不能一直阻塞。 // 这里当超时时间到了会解除阻塞,或者是有新任务提交也会唤醒它 以便及时的处理io事件之外的任务 int selectedKeys = selector.select(timeoutMillis); selectCnt ++;
当有普通任务提交了,nio线程这里的超时时间还没到这里还是阻塞的,那到底是如何唤醒这里的嘞?
这是因为当主线程调用execute()方法,然后再从里面调用startThread()方法,该方法会判断status是否值为1,也就是首次被调用,然后在调用doStartThread()方法,这个方法里面就会启动Executor单线程池里面的nio线程,主线程到这暂时结束,nio线程为thread赋值,然后调用我们这里的死循环run()方法,再进入switch分支调用一个方法 进而导致selector.select(timeoutMillis)阻塞。
当有普通任务了,通过eventLoop.execute(()->{..})添加普通任务,主线程又会执行一次execute()方法,又会调用startThread()方法,这时已经不是第一次调用该方法了所以不会做什么事情,然后startThread()方法执行完后接着执行execute()方法会调用一个wakeup()方法来唤醒nio阻塞的线程去执行普通任务。
提交普通任务会不会结束Selector阻塞
主线程会调用wakeup()方法唤醒阻塞的nio线程来执行普通任务。
wakeup()方法
通过上面引出了wakeup()方法,这里详细介绍该方法的执行条件
在NioEventLoop类中
protected void wakeup(boolean inEventLoop) { if (!inEventLoop && this.wakenUp.compareAndSet(false, true)) { this.selector.wakeup(); } }
首先这里对if判断条件进行解读,前面一部分是表示只有EventLoop线程之外的线程提交任务才有机会执行wakeup()方法。如果的EventLoop线程自己提交的任务就会走其他的逻辑。wakenUP变量在定义时的类型是AtomicBoolean,它是采用cas的方式去设置值,多个线程在同一个时刻修改它的值只会有一个成功。它的作用是什么?
因为this.selector.wakeup();是一个比较耗费性能的 *** 作,所以我们应该避免对它的频繁调用。将来会有这样一个情况,有多个线程都来提交任务,都走到了上面这个地方,那么selector需要唤醒几次嘞,其实1次就足够了。所以就用到了wakenUp这个原子变量。
何时进入Select分支进行阻塞
在上面我们知道主线程首次调用eventLoop.execute(Runnable run) —>调用startThread()方法 —>判断statu是否为1 是否为首次调用execute()方法 —>调用doStartThread()方法 nio线程接手主线程运行—>SingleThreadEventExecutor.this.run() —> 死循环
protected void run() { for (;;) { try { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: case SelectStrategy.SELECT: // 这里才会让EventLoop线程 selector.select(timeoutMillis) 进入阻塞 // 那么什么条件下才会进入到这条分支嘞? select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: } } catch (IOException e) { rebuildSelector0(); handleLoopException(e); continue; } 。。。 }
决定进入select分支的条件就是switch中的代码selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())。
点进方法:
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception { // 这里会根据上面传递过来的boolean变量决定,如果为false时就会走select阻塞分支 // 这个Boolean变量的作用就是当前是否有任务,如果没有任务就会进入select阻塞分支 return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT; }
如果有任务 selectSupplier.get()又会有什么作用。get()方法中又调用了selectNow()方法
int selectNow() throws IOException { try { // 之前nio讲select()方法时讲了三个使用,一个是空参的select,第二个就是带超时时间的select 第三个就是selectNow() // 该方法与前两个不同的地方是它不会阻塞,它会在selector上立刻查看是否有事件发生,如果没有就返回0 return selector.selectNow(); } finally { // restore wakeup state if needed if (wakenUp.get()) { selector.wakeup(); } } }
所以,当没有任务时,才会进入Select分支,进行阻塞。如果有任务时 最终会调用selectNow()方法返回当前的任务数,并跳出switch多分支语句,执行switch下面的代码来处理任务。
会阻塞多久
当没有事件发生时,进入了select分支,最终执行selector.select(timeoutMillis); 那么这个超时时间为多久嘞,源码如下:
long currentTimeNanos = System.nanoTime(); // 当前时间 // 截止时间=当前时间+1秒 如果有定时任务 截止时间=当前时间+下一个定时任务开始的事件 long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { // 超时时间 这里又减去了当前时间,后面又加上0.5毫秒,最后的是将纳秒转换为毫秒 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; 。。。 }
所以当没有定时任务的情况下selector.select(timeoutMillis); 只会阻塞1秒左右。
那么什么时候会跳出select分支进入的select()方法的死循环
- 当前时间超过了截止时间,因为每一次循环都会重新为当前时间变量赋值
- 有任务发生了也会退出死循环
- 有事件发生了也会推出死循环
private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int selectCnt = 0; long currentTimeNanos = System.nanoTime(); // 当前时间 long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); // 截止时间 for (;;) { long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; // 当前时间超过了截止时间 会退出死循环 if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } // hasTasks()是否有任务 if (hasTasks() && wakenUp.compareAndSet(false, true)) { selector.selectNow(); selectCnt = 1; break; } // 阻塞 当有事件发生会解除阻塞 selectedKeys也不为0 int selectedKeys = selector.select(timeoutMillis); selectCnt ++; // 所以又事件发生这里也会退出死循环 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break; } 。。。 long time = System.nanoTime(); 。。。 currentTimeNanos = time; } // for end 。。 } catch (CancelledKeyException e) { .. } }
nio空轮询bug
nio中selector.select()方法有一个bug(jdk在linux环境下),本来正常情况下如果是无参的select()方法,只有在有事件发生时才会解除阻塞,如果是有超时时间的select()方法,没有事件发生需要等到超时时间才会解除阻塞。而这个bug有很小的几率会出现,那就是即使没有事件发生,超时时间也没到,也不会阻塞,特别是当好几个EventLoop线程都空轮询这就会很占用CPU资源。
netty解决了nio的空轮询bug,它解决的方法是采用一个循环计数的方式
private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { // 就是这个selectCnt变量 初始值为0 int selectCnt = 0; long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } if (hasTasks() && wakenUp.compareAndSet(false, true)) { selector.selectNow(); selectCnt = 1; break; } // 阻塞 当有事件发生会解除阻塞 selectedKeys也不为0 int selectedKeys = selector.select(timeoutMillis); // 每循环一次就会让计数++ selectCnt ++; if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break; } 。。。 long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { selectCnt = 1; // 这里首先判断有没有设置一个 期望值 大于0 并且如果 selectCnt大于了这个期望值就会退出死循环 // 这个期望值默认 会读取运行时的环境变量io.netty.selectorAutoRebuildThreshold // 如果我们自己设置了值就以我们设置的为准,如果没有设置默认是512。 } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // 当发生了空轮询bug如果出现了就会调用下面的方法,作用是重新创建一个selector,替换掉旧的selector, // 还会把旧的selector中的一些信息赋值个新的,内部的实现还是比较复杂的。 selector = selectRebuildSelector(selectCnt); selectCnt = 1; break; } currentTimeNanos = time; } // for end 。。 } catch (CancelledKeyException e) { .. } }EventLoop—ioRatio
@Override protected void run() { for (;;) { try { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: } } catch (IOException e) { rebuildSelector0(); handleLoopException(e); continue; } // 当阻塞解除后,就会继续执行switch语句下面的代码来处理任务或者是发生的事件 cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { // 如果ioRatio的值设置为100 就会先运行所有的io事件,然后在运行所有的普通任务 try { processSelectedKeys(); } finally { runAllTasks(); } } else {// else 会做两件事 // 首先获取当前时间 final long ioStartTime = System.nanoTime(); try { // 1. 处理所有的io事件 processSelectedKeys(); } finally { // 处理完io事件后在获取一次当前时间,在和之前的事件相减,得到处理io事件的时间 final long ioTime = System.nanoTime() - ioStartTime; // 2. 运行普通任务,普通任务能执行的事件就是 ioTime * (100 - ioRatio) / ioRatio 来控制的 // 当普通任务运行的时间超过了这里传递的事件 它就不会从任务队列中拿普通任务执行了 // 会等下一次循环 处理完io事件后再来执行普通任务,这样就实现了优先处理io事件 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } 。。 } }
如果普通任务的耗时比较长,那势必会影响到io事件的执行,毕竟EventLoop中只是一个单线程,netty为了避免因为普通任务的耗时较长影响到io事件,netty会做一个参数的控制——ioRatio 这个参数是控制处理io事件所占用的事件比例,它默认是50%
执行io事件,在哪进行事件判断
就从上面的源码中 ioRatio 参数的判断中 执行所有的io事件的方法processSelectedKeys(); 跟进这个方法,就会进入到下面的方法中
private void processSelectedKeysOptimized() { for (int i = 0; i < selectedKeys.size; ++i) { // 这里首先是拿到所有的SelectionKey final SelectionKey k = selectedKeys.keys[i]; selectedKeys.keys[i] = null; // 然后获取附件,也就是NioServerSocketChannel ,这里为什么要拿到这个对象嘞, // 因为接下来要对SelectionKey进行各种各样的处理,也就是Handler,所以需要通过channel得到pipeline 在得到Handler final Object a = k.attachment(); // 拿到channel了就进行判断是否是NioChannel if (a instanceof AbstractNioChannel) { //然后就会进入到这个方法 processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTasktask = (NioTask ) a; processSelectedKey(k, task); } if (needsToSelectAgain) { selectedKeys.reset(i + 1); selectAgain(); i = -1; } } }
processSelectedKey(k, (AbstractNioChannel) a);方法的源码,就是在这里面进行各类事件的判断,然后进行相应的处理
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { return; } if (eventLoop != this || eventLoop == null) { return; } unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); // 可连接事件 客户端的事件 if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } // 可写事件 if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } // 可读事件和连接事件 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
accept流程
首先会议nio中accept的流程
- selector.select()阻塞
- 遍历SelectionKeys
- 判断事件类型
- 创建SocketChannel
- 注册进Selector中
- 利用SelectionKey监听read事件
其实前面三步已经在上面学习EventLoop的源码中已经实现了,接下来重点关注后面的三步
在processSelectedKey(k, (AbstractNioChannel) a);方法中进行事件判断
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { ... try { int readyOps = k.readyOps(); // 可连接事件 客户端的事件 if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } // 可写事件 if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } // 可读事件和连接事件 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
当服务器启动,客户端启动就会进入到unsafe.read();方法中
public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { // 这里就会创建SocketChannel 并将它设置为非阻塞的 , 这里的readBuf就是创建的NioSocketChannel int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; // 这里就是把刚刚建立的连接当成一个消息 给pipeline中的Handler去处理。 // 目前的handler共有:head--> accept ---> tail,所以这里肯定是accept这个handler来进行处理 // 其实接下来的两步 将SocketChannel注册进select并且监听read事件都是这个handler做的事 pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); 。。 } finally { 。。。 } }
创建SocketChannel 并将它设置为非阻塞的方法
protected int doReadMessages(List
接下来就不一步一步走了,最终它会到accept 这个Handler 中的方法中。最后会进入到ServerBootstrap类中的静态内部类ServerBootstrapAcceptor中的channelRead()方法
public void channelRead(ChannelHandlerContext ctx, Object msg) { // 这里的msg就是上面创建的NioSocketChannel final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); // 这里会对NioSocketChannel 设置一些参数 setChannelOptions(child, childOptions, logger); for (Entry, Object> e: childAttrs) { child.attr((AttributeKey
接着在进入register(child)方法,最终会进入到AbstractChannel类的register()方法
public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } else if (AbstractChannel.this.isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); } else if (!AbstractChannel.this.isCompatible(eventLoop)) { promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); } else { AbstractChannel.this.eventLoop = eventLoop; // 这里会把当前线程和EventLoop线程进行判断,现在的线程虽然是EventLoop线程,但还是会进入到else语句中 // 这里因为目前的线程是NioServerSocketChannel的EventLoop线程,但是线程新建了一个NioSocketChannel,这两个channel肯定不能共用一个线程,所以就会进入到else中新开一个线程来执行register0(promise)方法 if (eventLoop.inEventLoop()) { this.register0(promise); } else { try { // 这里会拿到新的EventLoop,用新的EventLoop中的线程 eventLoop.execute(new Runnable() { public void run() { // 程序会走到这里。 AbstractUnsafe.this.register0(promise); } }); } catch (Throwable var4) { 。。。 } } } }
AbstractUnsafe.this.register0(promise);这行代码最后会调用doRegister()方法,
doRegister()方法就会先拿到jdk原生的SocketChannel,注册进当前EventLoop中的Selector中。并且把当前NioSocketChannel作为附件绑定上去
doRegister()方法结束后 , register0(promise)方法继续运行,调用pipeline.invokeHandlerAddedIfNeeded();方法,其实就是给现在的NioSocketChannel加一个Handler,这些handler就是我们自己在代码中写的 ,这里会把我们写的handler 加到channel中。
register0(promise)方法继续运行, 调用pipeline.fireChannelActive(); 这里的作用就是拿到SelectionKey,然后关注read事件,这最后会在AbstractNioChannel类的doBeginRead()方法中添加关注read事件
read流程
还是在这个地方处理读事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); }
将断电打在这里,当第一次触发断点是连接,直接放行,再一次触发就是read事件了。
public final void read() { final ChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadPending(); return; } final ChannelPipeline pipeline = pipeline(); // 获取Bytebuf的分配器,决定Bytebuf是池化还非池化 final ByteBufAllocator allocator = config.getAllocator(); // 可以动态调整Bytebuf的大小,并强制使用直接内存 final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { // 这里就是分配具体的Bytebuf byteBuf = allocHandle.allocate(allocator); // 客户端发送了数据,服务器端这里调用doReadBytes()方法后救护我那个Bytebuf中填充内容 allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer. byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { readPending = false; } break; } allocHandle.incMessagesRead(1); readPending = false; // 找到当前NioServerSocketChannel上的pipeline,然后触发一个读事件, // 就是将这个Bytebuf依次传给入站Handler 依次去处理。 pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } } }
最后祝大家学有所成,所愿皆所得。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)