紧接上文,在上文中,我们提到了Dispatcher的各种实现方案。通过AllDispatcher(默认)的设定,provider针对接收到的请求会转交由线程池来执行。
那么针对线程池而言,Dubbo是怎样创建的呢?有没有提供不同的创建方案呢?
1.创建线程池入口同样,我们先来从源码中了解下创建线程池的代码入口。
以AllDispatcher为例,在处理connected()请求时,执行代码如下
public class AllChannelHandler extends WrappedChannelHandler { @Override public void connected(Channel channel) throws RemotingException { // 在这里获取线程池,直接调用到父类的,具体见1.1 ExecutorService executor = getExecutorService(); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t); } } }1.1 WrappedChannelHandler.getExecutorService() 获取线程池
public class WrappedChannelHandler implements ChannelHandlerDelegate { @Deprecated public ExecutorService getExecutorService() { return getSharedExecutorService(); } public ExecutorService getSharedExecutorService() { // 先通过SPI的方式获取ExecutorRepository,默认为DefaultExecutorRepository ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension(); // 针对url,线程池会做一个缓存 ExecutorService executor = executorRepository.getExecutor(url); if (executor == null) { // 第一次则默认会创建线程池,我们直接看创建线程池过程,具体见1.2 executor = executorRepository.createExecutorIfAbsent(url); } return executor; } }1.2 DefaultExecutorRepository.createExecutorIfAbsent() 创建线程池对象
public class DefaultExecutorRepository implements ExecutorRepository { public synchronized ExecutorService createExecutorIfAbsent(URL url) { String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY; if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) { componentKey = CONSUMER_SIDE; } // 为consumer side(消费侧)创建线程池Map Mapexecutors = data.computeIfAbsent(componentKey, k -> new ConcurrentHashMap<>()); // 获取url的port信息,本质上就是本provider所对应的端口信息,在对应端口上创建线程池 Integer portKey = url.getPort(); ExecutorService executor = executors.computeIfAbsent(portKey, k -> createExecutor(url)); // If executor has been shut down, create a new one if (executor.isShutdown() || executor.isTerminated()) { executors.remove(portKey); executor = createExecutor(url); executors.put(portKey, executor); } return executor; } private ExecutorService createExecutor(URL url) { // 在这里创建线程池,还是通过SPI的方式来创建,可以根据URL的参数不同来创建不同的线程池类型。 return (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url); } }
通过线程池创建的过程我们了解到一个最重要的信息:provider根据port来创建线程池。
那么,如果当前provider以两种协议暴露出去(肯定是以不同的端口暴露的),那么针对这两个端口,会创建两个线程池来响应请求。
了解完,线程池创建的入口,下面就来看下ThreadPool的不同类型。
2.ThreadPool线程池模型2.1 FixedThreadPool
这个类似于Executors中设置的固定线程数的线程池。通过源码来看下
public class FixedThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { // 默认线程池名为Dubbo String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME); // 默认线程数为200 int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS); // queue默认容量为0 int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES); return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue() : (queues < 0 ? new linkedBlockingQueue () : new linkedBlockingQueue (queues)), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } }
FixedThreadPool作为Dubbo线程池的默认策略,默认创建200个固定线程,且queue队列默认容量为0,执行的拒绝策略为AbortPolicyWithReport。
通过以上信息我们可以看到,对Dubbo provider而言,每个端口处理请求的线程池线程数量默认为200,当200个线程都处于繁忙状态时,后续请求则直接抛出。
2.2 LimitedThreadPool线程池中线程数量动态增加直到最大值,线程空闲不回收
public class LimitedThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME); // core默认为0 int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS); // 线程数量最大为200 int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS); // 队列默认容量为0 int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES); return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue2.3 EagerThreadPool() : (queues < 0 ? new linkedBlockingQueue () : new linkedBlockingQueue (queues)), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } }
当线程所有的核心线程都处于忙碌状态时,则创建新的线程来执行任务
public class EagerThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME); // core默认为0 int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS); // 最大线程数默认为Integer.MAX_VALUE int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE); // 队列容量默认为0 int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES); // 线程60秒未执行任务则回收 int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE); // init queue and executor TaskQueuetaskQueue = new TaskQueue (queues <= 0 ? 1 : queues); EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, taskQueue, new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); taskQueue.setExecutor(executor); return executor; } }
与我们在Executors中创建的线程池的区别就是:针对EagerThreadPool而言,当core数量的线程都处于繁忙状态,则新请求进入后,直接创建新的线程来处理,一直达到最大线程数后,还有新请求进入则放入taskQueue中。
2.4 CachedThreadPoolpublic class CachedThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME); // core默认为0 int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS); // 最大线程数默认为Integer.MAX_VALUE int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE); // 队列容量默认为0 int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES); int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE); return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue() : (queues < 0 ? new linkedBlockingQueue () : new linkedBlockingQueue (queues)), new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } }
感觉与EagerThreadPool区别不大,大家了解即可
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)