通过之前对provider启动过程的学习,我们知道,提供者默认是以Netty来启动对应协议端口来提供服务的。
Netty的标准启动模式下有两个线程组:boss和work线程组。
在接收到具体的请求后,如果服务提供者对该请求处理时间比较短,那么直接在work线程上处理即可;
如果服务提供者对该请求处理时间比较长,那么如果还在work线程上处理,则会阻塞其他请求的处理,降低了整个provider的处理能力。
如何解决这种问题呢?
可以通过自定义一个线程池,将work线程接收到的请求,交由该线程池来处理,这样就避免了请求都阻塞在work线程上。
基于这种解决方案,Dubbo提供了多种线程模式。我们一起来看下。
1.创建线程模型入口先来分析下线程模型创建的入口。
还是从NettyServer(Netty4下的)的创建开始
public class NettyServer extends AbstractServer implements RemotingServer { public NettyServer(URL url, ChannelHandler handler) throws RemotingException { // 具体创建在ChannelHandlers.wrap()中 super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url)); } }1.1 ChannelHandlers.wrap()
public class ChannelHandlers { private static ChannelHandlers INSTANCE = new ChannelHandlers(); public static ChannelHandler wrap(ChannelHandler handler, URL url) { return ChannelHandlers.getInstance().wrapInternal(handler, url); } protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { // 本质上最终还是调用到这 return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url))); } }
从NettyServer的构造方法中可以看到,传入的Handler通过不断的复合,最终以MultiMessageHandler --> HeartbeatHandler --> (Dispatcher生成的Handler) 这种顺序来处理请求。
所以,线程模式的创建是在NettyServer创建时就已经完成的。
2.任务执行线程模型线程模式本身通过SPI的方式来获取Dispatcher接口的实现类。具体有以下实现
下面具体来分析下每种Dispatcher的不同之处。
2.1 AllDispatcher(默认)当前类型Dispatcher,则会把接收到的所有请求(请求、响应、连接心跳等事件)都交由自定义的线程池来处理。
public class AllDispatcher implements Dispatcher { public static final String NAME = "all"; @Override public ChannelHandler dispatch(ChannelHandler handler, URL url) { return new AllChannelHandler(handler, url); } } public class AllChannelHandler extends WrappedChannelHandler { public AllChannelHandler(ChannelHandler handler, URL url) { super(handler, url); } @Override public void connected(Channel channel) throws RemotingException { ExecutorService executor = getExecutorService(); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t); } } @Override public void disconnected(Channel channel) throws RemotingException { ExecutorService executor = getExecutorService(); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED)); } catch (Throwable t) { throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t); } } @Override public void received(Channel channel, Object message) throws RemotingException { ExecutorService executor = getPreferredExecutorService(message); try { // 在接收到请求时,直接交由executor线程池处理 executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { if(message instanceof Request && t instanceof RejectedExecutionException){ sendFeedback(channel, (Request) message, t); return; } throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } @Override public void caught(Channel channel, Throwable exception) throws RemotingException { ExecutorService executor = getExecutorService(); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception)); } catch (Throwable t) { throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t); } } }
通过源码可知:针对以上覆写的几种请求类型,全部交由自定义线程池来执行。
2.2 DirectDispatcher所有的请求都直接在work线程上执行,不交由自定义线程池来处理。
public class DirectDispatcher implements Dispatcher { public static final String NAME = "direct"; @Override public ChannelHandler dispatch(ChannelHandler handler, URL url) { return new DirectChannelHandler(handler, url); } } public class DirectChannelHandler extends WrappedChannelHandler { public DirectChannelHandler(ChannelHandler handler, URL url) { super(handler, url); } @Override public void received(Channel channel, Object message) throws RemotingException { ExecutorService executor = getPreferredExecutorService(message); if (executor instanceof ThreadlessExecutor) { try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } // 直接在当前线程执行handler *** 作 } else { handler.received(channel, message); } } }2.3 MessageonlyDispatcher
只有请求响应消息派发到自定义线程池,其他的连接、心跳等直接在work线程上执行
public class MessageonlyDispatcher implements Dispatcher { public static final String NAME = "message"; @Override public ChannelHandler dispatch(ChannelHandler handler, URL url) { return new MessageonlyChannelHandler(handler, url); } } public class MessageonlyChannelHandler extends WrappedChannelHandler { @Override public void received(Channel channel, Object message) throws RemotingException { ExecutorService executor = getPreferredExecutorService(message); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { if(message instanceof Request && t instanceof RejectedExecutionException){ sendFeedback(channel, (Request) message, t); return; } throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } }
只重写了received方法,其他的connect等方法直接在当前work线程上执行Handler相关方法
2.4 ExecutionDispatcher只有请求类型消息交由自定义线程池执行,其他类型消息都在work线程上执行
public class ExecutionDispatcher implements Dispatcher { public static final String NAME = "execution"; @Override public ChannelHandler dispatch(ChannelHandler handler, URL url) { return new ExecutionChannelHandler(handler, url); } } public class ExecutionChannelHandler extends WrappedChannelHandler { public ExecutionChannelHandler(ChannelHandler handler, URL url) { super(handler, url); } @Override public void received(Channel channel, Object message) throws RemotingException { ExecutorService executor = getPreferredExecutorService(message); // 这里会过滤消息类型,只有request请求才会交由自定义线程池执行 if (message instanceof Request) { try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { if (t instanceof RejectedExecutionException) { sendFeedback(channel, (Request) message, t); } throw new ExecutionException(message, channel, getClass() + " error when process received event.", t); } } else if (executor instanceof ThreadlessExecutor) { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } else { handler.received(channel, message); } } }2.5 ConnectionOrderedDispatcher
连接、断开事件放入队列中,在work线程上逐个执行,而其他类型的消息(请求、响应)则交由自定义业务线程池来执行
public class ConnectionOrderedDispatcher implements Dispatcher { public static final String NAME = "connection"; @Override public ChannelHandler dispatch(ChannelHandler handler, URL url) { return new ConnectionOrderedChannelHandler(handler, url); } } public class ConnectionOrderedChannelHandler extends WrappedChannelHandler { protected final ThreadPoolExecutor connectionExecutor; private final int queuewarninglimit; public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) { super(handler, url); String threadName = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME); // 定义一个单线程的线程池,后续为执行连接、断开事件做准备 connectionExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new linkedBlockingQueue总结:(url.getPositiveParameter(CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)), new NamedThreadFactory(threadName, true), new AbortPolicyWithReport(threadName, url) ); // FIXME There's no place to release connectionExecutor! queuewarninglimit = url.getParameter(CONNECT_QUEUE_WARNING_SIZE, DEFAULT_CONNECT_QUEUE_WARNING_SIZE); } @Override public void connected(Channel channel) throws RemotingException { try { // 连接事件被单线程池执行 checkQueueLength(); connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t); } } @Override public void disconnected(Channel channel) throws RemotingException { try { // 连接事件被单线程池执行 checkQueueLength(); connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED)); } catch (Throwable t) { throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t); } } @Override public void received(Channel channel, Object message) throws RemotingException { ExecutorService executor = getPreferredExecutorService(message); try { // 请求响应信息则交由自定义线程池执行 executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { if (message instanceof Request && t instanceof RejectedExecutionException) { sendFeedback(channel, (Request) message, t); return; } throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } @Override public void caught(Channel channel, Throwable exception) throws RemotingException { ExecutorService executor = getExecutorService(); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception)); } catch (Throwable t) { throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t); } } private void checkQueueLength() { if (connectionExecutor.getQueue().size() > queuewarninglimit) { logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queuewarninglimit)); } } }
Dubbo的分层实在太细了,每层之间也是强解耦的。用户可以随时自定义相关Dispatcher来选择不同的执行请求方案。
下一篇我们再来分析下Dubbo线程池的模型。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)