在前面 Reactor 三种模型这篇文章中,已经对 Reactor 的三种模式作了简单的描述,那么在Netty 中是如何使用这个三种模式的呢?且Netty的内部对这三种模式是如何支持的呢?接下来就开始本篇的内容吧!
如何在Netty中使用Reactor三种模式Reactor 模式
Netty 使用示例
Reactor单线程模式
EventLoopGroup eventGroup = new NioEventLoopGroup(1);
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstra.group(eventGroup);
Reactor多线程模式
EventLoopGroup eventGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstra.group(eventGroup);
Reactor主从多线程模式
EventLoopGroup boosGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstra.group(boosGroup, workGroup );
- Reactor 单线程模式需要显式的构建一个线程池书为 1 的 NioEventLoopGroup,然后传递给 ServerBootstrap。
- Reactor 多线程模式使用的 NioEventLoopGroup 默认构造器构建,这里不需要显式的指定线程的数量,因为在 NioEventLoopGroup构造器里面最后会根据默认的CPU内核数量来计算线程数。
- Reactor 主从多线程模式需要显式的声明两个 group,分别为:bossGroup和workGroup。它们分别负责接纳和分配工作,也就是接收连接并把创建的连接绑定到 work group 中的一个 worker 中,这个worker 本身用来处理连接上所发生的事件。
这里需要注意的是Netty 中的这三种Reactor 模式,与前面所说的文章中阐述的三种Reactor 模式是有一定区别的,前面所介绍的Reactor 模式中的Reactor多线程模式和Reactor主从多线程模式的线程池不是用于处理I/O事件的,只是仅仅用于处理业务的。而在Netty 中使用的这三种Reactor模式,它们的线程池是可以处理I/O事件的。
Netty 内部是如何支持 Reactor 模式的因为Reactor主从多线程模式是项目中的首选,所以这里就以Reactor主从多线程模式为例来进行。从前面所举例的Reactor主从多线程模式的代码实现上看,在Netty中所做的就是将两种通道分别注册到独立的Selector中。这两种独立的Selector都是NioEventLoop,每个NioEventLoop中都包含着一个Selector,而每一个NioEventLoop又可以当作线程。所以Netty中对Reactor模式的支持,是将两种通道分别绑定到两个独立的多线程组中,那么就来以代码来进行详细阐述。
1、创建主从Selector对于支持Reactor主从多线程模式的第一步,就是创建主从Selector。但是在Netty里面,却是创建两种类型的NioEventLoopGroup。对于每个Group而言,里面又会创建NioEventLoop。代码如下:
private Selector selector; private Selector unwrappedSelector; NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) { super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } if (strategy == null) { throw new NullPointerException("selectStrategy"); } provider = selectorProvider; // 创建 Selector final SelectorTuple selectorTuple = openSelector(); selector = selectorTuple.selector; unwrappedSelector = selectorTuple.unwrappedSelector; selectStrategy = strategy; } private SelectorTuple openSelector() { final Selector unwrappedSelector; try { unwrappedSelector = provider.openSelector(); } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); } if (DISABLE_KEY_SET_OPTIMIZATION) { return new SelectorTuple(unwrappedSelector); } Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction
在上面的代码中,selectorProvider是由NioEventLoopGroup 中传入的,默认是使用SelectorProvider.provider()方法调用的返回值。有了 SelectorProvider 之后,就可以调用openSelector()来创建一个Selector来提供注册功能。这里所创建的SelectorProvider会作为 Selector的unwrappedSelector, 由此可见Selector 和NioEventLoop之前的关系是一一对应的。考虑到每个NioEventLoopGroup 对应多个NioEventLoop,所以最终创建的不是两个Selector,而是两组Selector,因为如果仅仅只是创建两个Selector,可能会出现性能瓶颈的问题。
这我们来回顾一下SelectorProvider.provider()方法的具体实现,代码如下:
public static SelectorProvider provider() { synchronized (lock) { if (provider != null) return provider; return AccessController.doPrivileged( new PrivilegedAction() { public SelectorProvider run() { // 从配置中读取的 if (loadProviderFromProperty()) return provider; // SPI 方式 if (loadProviderAsService()) return provider; // 默认方式 provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; } }); } }
从上面代码中可以看出,除非显式的指定使用哪种Provider,否则会使用默认的当时创建Provider,在不同的平台上(在不同的Windows系统和Linux 系统版本来说,安装的JDK版本不同),sun.nio.ch.DefaultSelectorProvider.create()实现方式也是不一样的,例如MacOS平台的实现方式,代码如下:
public class DefaultSelectorProvider { private DefaultSelectorProvider() { } public static SelectorProvider create() { return new KQueueSelectorProvider(); } }
本小节代码的执行步骤是:
- io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup()
- io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int)
- io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor)
- io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider)
- io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider, io.netty.channel.SelectStrategyFactory)
- io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object...)
- io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, java.lang.Object...)
- io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)
- io.netty.util.concurrent.MultithreadEventExecutorGroup#newChild
- io.netty.channel.nio.NioEventLoop
- io.netty.util.concurrent.MultithreadEventExecutorGroup#newChild
- io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)
- io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, java.lang.Object...)
- io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object...)
- io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider, io.netty.channel.SelectStrategyFactory)
- io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider)
- io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor)
- io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int)
在有了Selector 之后,就可以注册Channel(这个思路就是NIO编程的思路),但是需要先创建通道。在通过调用io.netty.bootstrap.AbstractBootstrap#bind(int)方法来启动程序的时候,便会在其中调用io.netty.bootstrap.AbstractBootstrap#initAndRegister方法,代码如下:
final ChannelFuture initAndRegister() { Channel channel = null; try { // 开始创建 Channel channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } // 开始 register Channel ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
上面代码中首先是调用channelFactory.newChannel()方法来创建Channel,也就是ServerSocketChannel。具体是通过ReflectiveChannelFactory来实现的。在ReflectiveChannelFactory的实现中,主要使用了反射、泛型等技术,创建SocketChannel类型则是由io.netty.bootstrap.AbstractBootstrap#channel方法来指定的,比如指定NioServerSocketChannel,具体代码实现如下:
//泛型+反射+工厂实现IO模式切换 public class ReflectiveChannelFactory3、注册 ServerSocketChannel给主 Selectorimplements ChannelFactory { private final Constructor extends T> constructor; public ReflectiveChannelFactory(Class extends T> clazz) { ObjectUtil.checkNotNull(clazz, "clazz"); try { //获取无参构造器 this.constructor = clazz.getConstructor(); } catch (NoSuchMethodException e) { throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) + " does not have a public non-arg constructor", e); } } @Override //泛型T代表不同的Channel public T newChannel() { try { //反射创建channel return constructor.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t); } } @Override public String toString() { return StringUtil.simpleClassName(ReflectiveChannelFactory.class) + '(' + StringUtil.simpleClassName(constructor.getDeclaringClass()) + ".class)"; } }
有了Selector(NioEventLoop成员)和ServerSocketChannel之后,便需要将它们进行绑定,也就是把ServerSocketChannel绑定到bossGroup中的NioEventLoop上,这功能主要是由config().group().register(channel)这行代码实现的,而这里的io.netty.bootstrap.AbstractBootstrapConfig#group方法,就是boosGroup具体的注册过程,io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)方法代码如下:
@Override public ChannelFuture register(Channel channel) { return next().register(channel); }
上面的代码其实就是从boos group中进行选择(通过调用 next()方法)一个NioEventLoop进行注册,最终注册具体实现如下:
io.netty.channel.nio.AbstractNioChannel#doRegister
@Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { logger.info("initial register: " + 0); selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
上面代码javaChannel().register(eventLoop().unwrappedSelector(), 0, this)中的eventLoop().unwrappedSelector()方法就是把ServerSocketChannel注册到NioEventLoop中。
本小节代码的执行逻辑如下:
- io.netty.channel.EventLoopGroup#register(io.netty.channel.Channel)
- io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)
- io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)
- io.netty.channel.AbstractChannel.AbstractUnsafe#register
- io.netty.channel.AbstractChannel.AbstractUnsafe#register0
- io.netty.channel.AbstractChannel#doRegister
- io.netty.channel.nio.AbstractNioChannel#doRegister
- io.netty.channel.AbstractChannel#doRegister
- io.netty.channel.AbstractChannel.AbstractUnsafe#register0
- io.netty.channel.AbstractChannel.AbstractUnsafe#register
- io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)
- io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)
在完成Selector的构建和ServerSocketChannel的创建和注册后,就可以接收连接了。在服务器接收到连接之后,创建连接的过程就是创建SocketChannel代码如下:
io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException { try { return AccessController.doPrivileged(new PrivilegedExceptionAction() { @Override public SocketChannel run() throws IOException { return serverSocketChannel.accept(); } }); } catch (PrivilegedActionException e) { throw (IOException) e.getCause(); } }
本小节执行顺序:
- io.netty.bootstrap.AbstractBootstrap#doBind
- io.netty.channel.DefaultChannelPromise#addListener
- io.netty.util.concurrent.DefaultPromise#addListener
- io.netty.util.concurrent.DefaultPromise#notifyListeners
- io.netty.util.concurrent.DefaultPromise#safeExecute
- io.netty.util.concurrent.SingleThreadEventExecutor#execute
- io.netty.util.concurrent.SingleThreadEventExecutor#startThread
- io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread
- io.netty.util.concurrent.SingleThreadEventExecutor#run
- io.netty.channel.nio.NioEventLoop#processSelectedKeys
- io.netty.channel.nio.NioEventLoop#processSelectedKeysPlain
- io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.NioTask
) - io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
- io.netty.channel.nio.AbstractNioMessageChannel#doReadMessages
- io.netty.util.internal.SocketUtils#accept
- io.netty.channel.nio.AbstractNioMessageChannel#doReadMessages
- io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
- io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.NioTask
- io.netty.channel.nio.NioEventLoop#processSelectedKeysPlain
- io.netty.channel.nio.NioEventLoop#processSelectedKeys
- io.netty.util.concurrent.SingleThreadEventExecutor#run
- io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread
- io.netty.util.concurrent.SingleThreadEventExecutor#startThread
- io.netty.util.concurrent.SingleThreadEventExecutor#execute
- io.netty.util.concurrent.DefaultPromise#safeExecute
- io.netty.util.concurrent.DefaultPromise#notifyListeners
- io.netty.util.concurrent.DefaultPromise#addListener
- io.netty.channel.DefaultChannelPromise#addListener
在有了代表连接实体的SocketChannel之后,就可以为Selector注册SocketChannel,主要就是调用io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead方法中的这个方法是负责对创建后的连接后的连接完成注册(调用io.netty.channel.EventLoopGroup#register(io.netty.channel.Channel)方法),代码如下:
public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); for (Entry, Object> e: childAttrs) { child.attr((AttributeKey) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
到这里就已经展示完了Netty支持Reactor主从多线程模式的流程,Netty支持跨平台,且Netty中是可以通过判断是否只有一个NioEventLoopGroup来决定是否使用Reactor主从多线程模式,同时通过判断NioEventLoopGroup与多少个元素,来控制是否使用多线程模式。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)