前面几篇分析了 Netty 的启动过程,这篇看一下服务端是如何接收请求的,以及 Netty 是如何解决 NIO 空轮转(空轮询)问题的。
正文io.netty.channel.nio.NioEventLoop @Override protected void run() { int selectCnt = 0; for (;;) { try { int strategy; try { // 获取策略值 strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: // 没有需要执行的任务 case SelectStrategy.SELECT: long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); if (curDeadlineNanos == -1L) { curDeadlineNanos = NONE; // nothing on the calendar } nextWakeupNanos.set(curDeadlineNanos); try { if (!hasTasks()) { // 获取已就绪的通道 strategy = select(curDeadlineNanos); } } finally { nextWakeupNanos.lazySet(AWAKE); } default: } } catch (IOException e) { // 异常处理,忽略 rebuildSelector0(); selectCnt = 0; handleLoopException(e); continue; } // 每次执行,数量+1 selectCnt++; cancelledKeys = 0; needsToSelectAgain = false; // 执行任务和获取已就绪通道的IO事件 两个事件的比重 final int ioRatio = this.ioRatio; boolean ranTasks; // 处理IO事件的占比为100 if (ioRatio == 100) { try { // 先处理IO事件 if (strategy > 0) { processSelectedKeys(); } } finally { // 最后再处理任务 ranTasks = runAllTasks(); } } // 有已就绪的通道 else if (strategy > 0) { final long ioStartTime = System.nanoTime(); try { // 先处理IO事件 processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; // 最后再处理任务 ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } // 没有已就绪的通道,直接处理任务 else { ranTasks = runAllTasks(0); } // 运行过任务后,ranTasks会设置为true if (ranTasks || strategy > 0) { // 正常返回,selectCnt重置为0 selectCnt = 0; } // 【重点分析】判断selectCnt是否超过阈值,否则重建selector,解决NIO空轮转的问题 else if (unexpectedSelectorWakeup(selectCnt)) { selectCnt = 0; } } catch (CancelledKeyException e) { } catch (Throwable t) { handleLoopException(t); } try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } }
io.netty.channel.nio.NioEventLoop private boolean unexpectedSelectorWakeup(int selectCnt) { // 异常情况处理。忽略 if (Thread.interrupted()) { return true; } // SELECTOR_AUTO_REBUILD_THRESHOLD 默认是512次,超过这个数字,开始重建selector if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // 【重点分析】 rebuildSelector(); return true; } return false; } public void rebuildSelector() { if (!inEventLoop()) { execute(new Runnable() { @Override public void run() { rebuildSelector0(); } }); return; } rebuildSelector0(); } private void rebuildSelector0() { final Selector oldSelector = selector; final SelectorTuple newSelectorTuple; if (oldSelector == null) { return; } try { newSelectorTuple = openSelector(); } catch (Exception e) { logger.warn("Failed to create a new Selector.", e); return; } // Register all channels to the new Selector. int nChannels = 0; for (SelectionKey key: oldSelector.keys()) { Object a = key.attachment(); try { if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) { continue; } int interestOps = key.interestOps(); key.cancel(); SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a); if (a instanceof AbstractNioChannel) { // Update SelectionKey ((AbstractNioChannel) a).selectionKey = newKey; } nChannels ++; } catch (Exception e) { logger.warn("Failed to re-register a Channel to the new Selector.", e); if (a instanceof AbstractNioChannel) { AbstractNioChannel ch = (AbstractNioChannel) a; ch.unsafe().close(ch.unsafe().voidPromise()); } else { @SuppressWarnings("unchecked") NioTasktask = (NioTask ) a; invokeChannelUnregistered(task, key, e); } } } selector = newSelectorTuple.selector; unwrappedSelector = newSelectorTuple.unwrappedSelector; try { // time to close the old selector as everything else is registered to the new one oldSelector.close(); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("Failed to close the old Selector.", t); } } if 
(logger.isInfoEnabled()) { logger.info("Migrated " + nChannels + " channel(s) to the new Selector."); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)