dubbo源码学习总结

dubbo源码学习总结,第1张

dubbo源码学习总结 前言

 总结 内容主要包括dubbo 框架的spi 实现,服务暴露实现,服务调用实现,以及个人感觉里面比较好的设计分享。涉及的代码部分较多。主要是自己的主观理解,如有纰漏 可以随时指正。

RPC

rpc 框架在使用时可以让使用者调用远程的接口的时候犹如调本地接口一样,然而一个远程过程调用一定是会使用网络和序列化的,因此简单一点看,dubbo或者说所有的rpc框架提供的核心的能力就是通过动态代理的方式把接口的网络 *** 作和序列化 *** 作代理掉,对于使用者是透明的。dubbo在这个核心能力之上,外加了一些过滤,负载均衡,服务注册发现等功能,同时通过spi机制为这些功能加入了很多动态的扩展,为了和Spring一起使用,也有很多Spring的扩展,使得dubbo越来越庞大。因此理解dubbo,最好是从他核心去看,抽象出他的本质是要干什么,在去看他的扩展的东西。这样会清楚很多,否则面对庞大的框架代码就是盲人摸象。因此在此之前先总结下一个rpc的本质在java中的实现,然后对照的去它的代码中寻找实现点。

基于java实现简单的rpc

代码可以下载:GitHub - hbzhangwenjie/miniRPC: 使用java反射+动态代理模拟 RPC

dubbo或者说一个rpc抽象出核心的能力就是通过动态代理的方式把接口的网络 *** 作和序列化 *** 作代理掉。之所以他很复杂,是因为在这个核心的步骤之上加入了扩展功能。那么只管本质的话 通过一个动态代理和网络 *** 作就可以实现简单的rpc。

框架把这部分的东西做了,对使用者透明, 在这里 简单 展现出来,方便后面查看它的代码时会比较好理 

提供查询当前时间服务的接口
package api;


public interface TimeService {
    public long getCurrentTimeMillis() ;

    public Long getCurrentTimeSec();

}
服务端timeService的具体实现
package provide;

import api.TimeService;


public class TimeServiceImpl implements TimeService {

    @Override
    public long getCurrentTimeMillis()  {
        return System.currentTimeMillis();
    }

    @Override
    public Long getCurrentTimeSec()  {
        return System.currentTimeMillis()/1000;
    }
}
服务端通过暴露服务
package provide;

import static provide.Provider.beanFactory;

import api.TimeService;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;



public class Server extends Thread {

    @Override
    public void run() {
        System.out.println("server start...");
        try {
            // 打开服务端Socket
            ServerSocket serverSocket = new ServerSocket(9000);
            // 接受客户端的请求
            Socket socket = serverSocket.accept();
            while (null != socket) {
                //   ObjectInputStream是java 提供的序列化/反序列化的一种工具
                ObjectInputStream inStream = null;
                try {
                    inStream =
                            new ObjectInputStream(socket.getInputStream());
                } catch (Exception e) {

                }
                if (inStream == null) {
                    continue;
                }
                String invoker = null;
                try {
                    invoker = (String) inStream.readObject();
                } catch (Exception e) {
                }
                if (invoker == null) {
                    continue;
                }
                //这里 没有实现 exchange 层
                System.out.println("server receive...:" + invoker);
                //invoker 在框架中是一个类,这里用string简单代替
                String service = invoker.split(",")[0];
                String method = invoker.split(",")[1];
                //根据客户端传入的请求 找到具体的方法,使用实现类去执行,这里还还少一个类型,
                // api.TimeService,在框架中通过类型和名字找到唯一的bean
                //还有入参 这里简单实现没有写
                TimeService timeService = (TimeService) beanFactory.get(service);
                //通过反射执行服务端实现类的方法
                Class clazz = timeService.getClass();
                Method[] methods = clazz.getMethods();

                for (Method targetMethod : methods) {
                    if (method.equals(targetMethod.getName())) {
                        long currentTimeMillis = (long) targetMethod.invoke(timeService);
                        //序列化
                        ObjectOutputStream outStream =
                                new ObjectOutputStream(socket.getOutputStream());
                        // 返回结果
                        outStream.writeLong(currentTimeMillis);
                        outStream.flush();
                    }
                }
            }
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }
}
客户端发起请求的代理
package consumer.proxy;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;




public class ConsumerProxy implements InvocationHandler {

    private Socket socket;
    private Invoker invoker;

    public ConsumerProxy(Invoker invoker) {
        this.invoker = invoker;
        try {
            socket = new Socket("127.0.0.1", 9000);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        ObjectOutputStream outStream =
                new ObjectOutputStream(socket.getOutputStream());
        //通过反射来构造 invoker,invoker还包括了,args 这个里 简单实现没有用到
        String rpcInvocation = invoker.getInterfaces().getTypeName() + "," + method.getName();
        outStream.writeObject(rpcInvocation);
        outStream.flush();
        ObjectInputStream inStream =
                new ObjectInputStream(socket.getInputStream());
        return inStream.readLong();
    }

    public Object getProxy() {
        return Proxy.newProxyInstance(
                Thread.currentThread().getContextClassLoader(),
                new Class[]{invoker.getInterfaces()},
                this
        );
    }
}

这样一个最简单的rpc调用就写好了,客户端是没有具体的实现的,具体的实现在服务端。客户端通过和服务端一起定义好的协议(invoker)通过一样的序列化方式。完成远程调用

启动服务端
package provide;

import java.util.HashMap;



public class Provider {

    public static final HashMap beanFactory = new HashMap<>();

    public static void main(String[] args) {
        //类比框架加载服务端的实现类
        beanFactory.put("api.TimeService", new TimeServiceImpl());
        //类比rpc框架 启动 监听一个端口
        Server server = new Server();
        server.start();
    }
}

启动客户端
​
package consumer;

import api.TimeService;
import consumer.proxy.ConsumerProxy;
import consumer.proxy.Invoker;


public class Consumer {

    public static void main(String[] args) {
        Invoker invoker = new Invoker(TimeService.class);
        ConsumerProxy consumerProxy = new ConsumerProxy(invoker);
        //这个客户端是类比框架根据接口代理生成的一个实现类,它把网络代理进去 ,使用方无感知
        //dubbo 中是框架启动时帮我们生成的这个代理类,通过扫描 @dubboReference 这2个注解来实现
        TimeService timeService = (TimeService) consumerProxy.getProxy();
        for (int i = 0; i < 10; i++) {
            System.out.println("consumer response:" + timeService.getCurrentTimeMillis());
        }
    }
}

上面是一个简单的例子,要能用还需要在关键步揍加上 服务发现,协议完善,等等,这些加上就是一个rpc框架,但是它的核心思路和整体的流程还是不变的。

dubbo

基于上面对rpc 本质的 总结,在看dubbo框架,它是一个扩展力非常强的框架,使用者可以不用改源码的情况下通过插件化替换一个rpc过程中的关键实现。其中包括,容错,负载均衡,调用前后过滤,消费方线程池(在dubbo里把主调方称为消费者),生产方线程池(在dubbo里把被调方称为生产者),协议(默认是dubbo协议,可以暴露成其他协议 http,grpc等),序列化,服务发现等都可以 基于使用方自己的需求进行替换。之所以这么灵活是因为dubbo实现了一套spi机制。

图例说明:

  • 图中左边淡蓝背景的为服务消费方使用的接口,右边淡绿色背景的为服务提供方使用的接口,位于中轴线上的为双方都用到的接口。
  • 图中从下至上分为十层,各层均为单向依赖,右边的黑色箭头代表层之间的依赖关系,每一层都可以剥离上层被复用,其中,Service 和 Config 层为 API,其它各层均为 SPI。
  • 图中绿色小块的为扩展接口,蓝色小块为实现类,图中只显示用于关联各层的实现类。
  • 图中蓝色虚线为初始化过程,即启动时组装链,红色实线为方法调用过程,即运行时调时链,紫色三角箭头为继承,可以把子类看作父类的同一个节点,线上的文字为调用的方法。

   

exchange 层其实是指明这个rpc协议的交互方式,一般是 request/repsonse   或者subscription/publish  应该说所有的rpc都是前者 包括http, 消息中间件和一些物联网协议(mqtt)和im 是后者。

从图看,最重要的是 服务方怎么把impl 映射到invoker,调用方怎么把service 转成invoker,这都是用到了动态代理,另一个重点是 如何做到插件话,spi跟rpc没有关系,他是实现插件话的一种机制,在关键步骤通过spi把自己的实现替换框架原有的实现。对spi理解, 非常关键。


SPI

SPI ,全称为 Service Provider Interface,它通过在ClassPath路径下的约定的某个文件夹查找java文件,然后自动加载文件里所定义的类。在面向的对象的设计里,我们一般推荐模块之间基于接口编程,模块之间不对实现类进行硬编码。一旦代码里涉及具体的实现类,就违反了可拔插的原则,如果需要替换一种实现,就需要修改代码。为了实现在模块装配的时候不用在程序里动态指明,这就需要一种服务发现机制。java spi就是提供这样的一个机制:为某个接口寻找服务实现的机制。

java 的spi

在jdk6里面引进的一个新的特性ServiceLoader,从官方的文档来说,它主要是用来装载一系列的service provider。而且ServiceLoader可以通过service provider的配置文件来装载指定的service provider。当服务的提供者,提供了服务接口的一种实现之后,我们只需要在jar包的meta-INF/services/目录里同时创建一个以服务接口命名的文件。该文件里就是实现该服务接口的具体实现类。而当外部程序装配这个模块的时候,就能通过该jar包meta-INF/services/里的配置文件找到具体的实现类名,并装载实例化,完成模块的注入。最经常被使用的就是我们的程序加载数据库驱动jdk不关心连接什么数据库,只提供标准的数据库 *** 作,这些 *** 作都是接口,没有具体的实现,而具体的实现由数据库厂商去实现,并提供jar 包给我们的程序使用,如下 加载mysql的驱动时:

 

springboot 的spi

spring boot 的 spi 主要是用在自动装配上,springboot 使用自动装配的方式 消灭了spring xml 的装配过程。例如现在我们使用一些客户端 只需要引入对应的starter 然后按照它的约定配置 ,在配置文件写上对应的自己项目需要的值,就可以使用这个客户端了,这也是 springboot 约定大于配置 这一思想的实现方式。

springboot 使用@Configuration 和@bean 注解 替代了xml配置bean 的方式。那么如果我们不使用starter也是可以自己用这个2个注解写一个配置类来装配一个bean 注册到spring容器。 但是这个装配类怎么提供给其他项目使用,就可以把它通过spi 变成一个starter来实现。

在springboot的自动装配过程中,会加载meta-INF/spring.factories文件,而加载的过程是由SpringFactoriesLoader加载的。从CLASSPATH下的每个Jar包中搜寻所有meta-INF/spring.factories配置文件,然后将解析properties文件,找到指定名称的配置后返回。

@Configuration
@EnableConfigurationProperties(CosClientProperties.class)
public class CosClientAutoConfiguration {

    private CosClientProperties cosClientProperties;

    @Autowired
    public CosClientAutoConfiguration(CosClientProperties cosClientProperties) {
        this.cosClientProperties = cosClientProperties;
    }

    @Bean
    public COSClient getCOSClient() {
        //1 初始化用户身份信息(secretId, secretKey)。
        COSCredentials cred = new BasicCOSCredentials(cosClientProperties.getSecretId(),
                cosClientProperties.getSecretKey());
        // 2 设置 bucket 的区域, COS 地域的简称请参照 https://cloud.tencent.com/document/product/436/6224
        Region region = new Region(cosClientProperties.getRegionStr());
        ClientConfig clientConfig = new ClientConfig(region);
        // 3 生成 cos 客户端。
        return new COSClient(cred, clientConfig);
    }

    @Bean
    public CosService getCosService(COSClient cosClient) {
        return new CosServiceImpl(cosClient);
    }
}

import org.springframework.boot.context.properties.ConfigurationProperties;


@ConfigurationProperties(prefix = "cos.client")
public class CosClientProperties {

    private String secretId;
    private String secretKey;
    private String regionStr;

    public String getSecretId() {
        return secretId;
    }

    public void setSecretId(String secretId) {
        this.secretId = secretId;
    }

    public String getSecretKey() {
        return secretKey;
    }

    public void setSecretKey(String secretKey) {
        this.secretKey = secretKey;
    }

    public String getRegionStr() {
        return regionStr;
    }

    public void setRegionStr(String regionStr) {
        this.regionStr = regionStr;
    }
}


 

如上一个cos 客户端的 就生成了,其他程序可以直接引入然后通过application.yml配置自己的cos信息 来注册一个cos客户端到Spring容器

如果不在spring.factories文件中配置,那么 其他项目引入这个jar包的话,还能通过自动装配这个bean吗?

答案是不一定,因为springboot 是通过扫描@Configuration注解,来装配一个bean,也就是只要能扫到这个配置类就可以,而springboot项目启动扫描@Configuration,@Component, @service时如果不配置@ComponentScan 的包路径那么,默认是扫描启动类的当前包路径,因此如果是第三方或者第二方包,他们的包路径不具备通用性的话,是加载不到这个配置类的因此需要通过在spring.factories文件告诉springboot去加载这个类。

springboot 的spi除了在自动装配中使用,还在一些暴露出给用户的扩展点里使用,基本上可以概括为3类postprocessor,

1.BeanPostprocessor : 初始化一个bean时可以扩展这个扩展点来 加一些自己的逻辑 

2.BeanFactoryPostProcessor(BeanDefinitionRegistryPostProcessor) 解析一个bean 时可以扩展这个扩展点来 加一些自己的逻辑 

3.EnvironmentPostProcessor 在项目启动加载application 和系统的配置时可以扩展,来加自己的逻辑,如把配置中心的配置从远程读取写到Spring的环境,项目像使用本地appllication.yml配置的一样使用这些配置。

像3这个扩展点就必须在spring.factories文件中配置才行,因为他发生在扫描装配bean之前。不依赖包路径。 其中1,2 基本是一些集成Spring的框架会用到,比如 @DubboService @DubboReference 这些注解 的实现就依赖 1,2的扩展点。

Dubbo的spi

dubbo的spi 实现的更加灵活 它可以做到在方法级别的spi。Dubbo 并未使用 Java 原生的 SPI 机制,而是对其进行了增强,使其能够更好的满足需求。在 Dubbo 中,SPI 是一个非常重要的模块。基于 SPI,我们可以很容易的对 Dubbo 进行拓展。Dubbo SPI 的相关逻辑被封装在了 ExtensionLoader 类中,通过 ExtensionLoader ,我们可以加载指定的实现类,先讲总结她的功能 后面总结它的实现,dubbo中的扩展点根据功能可以分为如下几类

普通的扩展点: 使用@spi 标示的接口,根据spi(“value”) 选择具体的实现类 和上诉的java /springboot 的spi 一样  通过 ExtensionLoader.getExtensionLoader(type).getExtension(name)来使用

自适应扩展点:根据方法的不同入参使用不同的实现类,依赖@adaptive注解实现 , 通过 ExtensionLoader.getExtensionLoader(RouterFactory.class).getAdaptiveExtension() 来使用

自激活扩展点:根据不同的条件 选择使用不同的实现类 多个 作用在fillter 依赖@Activate 注解实现,通过 ExtensionLoader.getExtensionLoader(RegistryServiceListener.class).getActivateExtension(url, key,group))  使用  ,通过key 从url中取出value,group 区分 消费方和提供方

扩展点自动装配: 实现类的构造函数包含这个扩展点,那么就是一个自动包装的扩展点,功能和aop类似。 (cluster

扩展点自动包装:加载扩展点时,扩展点实现类的成员如果为其它扩展点类型,ExtensionLoader 会自动注入依赖的扩展点。ExtensionLoader 通过扫描扩展点实现类的所有 setter 方法来判定其成员。功能和ioc类似。(spring 自动注入)

@SPI 

@SPI主要标示在接口上,标示这个接口是个扩展点,具体使用哪个实现是通过配置去找到具体实现类。这个注解可以接受一个value,这个值和文件(spi获取具体现实类全路径名的文件)里面的key 对应。dubbo的这个文件格式是key:实现类。例如

filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=org.apache.dubbo.rpc.support.MockProtocol

key也不是一定需要,不写key的话使用类名作为key

dubbo 会从3个路径是读取这个文件,它的代码如下

    private static final String SERVICES_DIRECTORY = "meta-INF/services/";
    private static final String DUBBO_DIRECTORY = "meta-INF/dubbo/";
    private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";

private Map> loadExtensionClasses() {
        cacheDefaultExtensionName();
        Map> extensionClasses = new HashMap<>();
        loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName());
        loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
        loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName());
        loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
        loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName());
        loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
        return extensionClasses;
    }

总结就是/meta-INF/services  /meta-INF/dubbo  /meta-INF/dubbo/internal  meta-INF/dubbo/external/

4个路径  (在trpcjava 这个注解 叫@Extension) 现在的dubbo已经不是直接写上 路径而是通过路径策略下文会出现

@Adaptive 

当这个注解标注在方法上的时候,这个方法的入参一定是带有URL这个在dubbo定义的一个对象的。在扫描到这个扩展点的这个方法的注解的时候,会生成一个代理类,在这个方法里,会获取URL,他是和这个接口名字一样的一个属性,(例如protocol接口的export方法有@adaptive注解它生成的代理类就会从url里面获取一个protocol属性的值)这个属性的值是来获取对应的扩展实现类的。如果异常就获取默认的扩展实现类,它生成这个类的方式就是通过string字符串去拼接这个代理类,然后在用编译器编译为class。它生成的字符串如下如下:例子是 Protocol的扩展Adaptive 

package org.apache.dubbo.rpc;

import org.apache.dubbo.common.extension.ExtensionLoader;

public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {

    public void destroy() {
        throw new UnsupportedOperationException(
                "The method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
    }

    public int getDefaultPort() {
        throw new UnsupportedOperationException(
                "The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
    }

    public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
        if (arg0 == null) {
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invokerargument == null");
        }
        if (arg0.getUrl() == null) {
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        }
        org.apache.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null) {
            throw new IllegalStateException(
                    "Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
        }
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader
                .getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }

    public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
        if (arg1 == null) {
            throw new IllegalArgumentException("url == null");
        }
        org.apache.dubbo.common.URL url = arg1;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null) {
            throw new IllegalStateException(
                    "Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
        }
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader
                .getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }
}
 

Protocol的源码是 上下对照就能清楚知道生成的代理类是怎么来的了spi中的值就是默认值

@SPI("dubbo")
public interface Protocol {

    int getDefaultPort();
    @Adaptive
     Exporter export(Invoker invoker) throws RpcException;
    @Adaptive
     Invoker refer(Class type, URL url) throws RpcException;
    void destroy();

}

@Adaptive标示在类上的时候比较特殊,且只能有一个实现类被它标示在类上。标示在类上,此种情况,表示拓展的代理类由人工编码完成,而不会自动生成,在 Dubbo 中,仅有两个类被 Adaptive 注解了,分别是 AdaptiveCompiler 和 AdaptiveExtensionFactory,,它一般会遍历其他所有的实现类来完成功能,比如AdaptiveExtensionFactory 这样就是和标示在方法上相对应,标示在方法上是根据条件选择一个扩展点。这里是遍历所由扩展点看哪个可以执行。

@Adaptive
public class AdaptiveExtensionFactory implements ExtensionFactory {

    private final List factories;

    public AdaptiveExtensionFactory() {
        ExtensionLoader loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
        List list = new ArrayList();
        for (String name : loader.getSupportedExtensions()) {
            list.add(loader.getExtension(name));
        }
        factories = Collections.unmodifiableList(list);
    }

    @Override
    public  T getExtension(Class type, String name) {
        for (ExtensionFactory factory : factories) {
            T extension = factory.getExtension(type, name);
            if (extension != null) {
                return extension;
            }
        }
        return null;
    }

}

这个类的作用是为扩展点实例化的时候注入对象属性的,它遍历的主要是SpiExtensionFactory和SpringExtensionFactory 来获取它保存的一个实例。SpringExtensionFactory就是拿到Spring管理的那些bean。

总结dubbo中的自适应扩展点就是依赖@Adaptive注解实现,它的含义就是在运行时根据不同的配置选择接口的不同实现类的不同方法。

@Activate

主要是用在有多个扩展点的实现,需要跟据不同条件被激活的场景,主要用在filter上例如暴露服务ProtocolFilterWrapper的代码

        private static  Invoker buildInvokerChain(final Invoker invoker, String key, String group) {
        Invoker last = invoker;
        List filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
//构造一个过滤链,FilterNode 是继承Invoker。FilterNode的invoke(Invocation) 方法是执行filter 的invoke(Invoker invoker, Invocation invocation) 方法
//最后一个是invoker 不是FilterNode ,所以最后一个就是真的invoke(Invocation) 前面都执行了filter
        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                                         //真实的invoker , 下一个, 每一个的filter
                last = new FilterNode(invoker, last, filter);
            }
        }

        return last;
    }

它把filter一层一层的联在一起,这里很难理解 画个图 帮助自己记忆

总结 dubbo中的自动激活扩展点就是依赖这个注解实现。

包装类

还有一类扩展点它是包装类。在扫到的时候会根据构造函数里是否是该接口类型来判断它是否是一个包装类,如果它是包装类,实例化的时候会一层一层的包装,以真正有暴露协议功能的dubboProtocol实例化的时候为列。

它会实例化ProtocolFilterWrapper,然后把dubboProtocol当作构造的参数传入,然后ProtocolListenerWrapper又会把ProtocolFilterWrapper当作构造的参数传入,最后拿到的一个实例是ProtocolListenerWrapper的实例了。

调用这个实例的export的方法的时候是会经过这些个包装类的处理在调用正真能export的dubboProtocol的。

最后的结构就是ProtocolListenerWrapper持有ProtocolFilterWrapper的对象,ProtocolFilterWrapper又持有dubboProtocol 的对象,ProtocolListenerWrapper的export会调用ProtocolFilterWrapper的export,ProtocolFilterWrapper的export又会调用dubboProtocol 的export。完成了一些过滤等功能

它对应的代码在

 private T createExtension(String name) {
        Class clazz = getExtensionClasses().get(name);
        if (clazz == null) {
            throw findException(name);
        }
        try {
            T instance = (T) EXTENSION_INSTANCES.get(clazz);
            if (instance == null) {
                EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
                instance = (T) EXTENSION_INSTANCES.get(clazz);
            }
            injectExtension(instance);
            Set> wrapperClasses = cachedWrapperClasses;
//循环注入包装类
            if (CollectionUtils.isNotEmpty(wrapperClasses)) {
                for (Class wrapperClass : wrapperClasses) {
                    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                }
            }
            return instance;
        } catch (Throwable t) {
            throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
                    type + ") couldn't be instantiated: " + t.getMessage(), t);
        }
    }

扩展实现应用举例

真正通过dubbo暴露一个服务的时候,是serviceBean通过

ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

这个方法去获取一个protocol的扩展点 ,一般url是registry开头的,于是会得到一个registryProtocol,这个实现类会去把服务的信息放到注册中心,然后在用它自己持有的一个protocol对象去export,这个protocol 对象就是 因为registryProtocol 在实例化的时候他有setProtocol方法所以会通过extensionFactory 来获取实例来进行象注入(参见上面多AptiveExtensionFactory说明 ioc)。这个对象又会通过spiExtionFactory获取,得到的就是一个包装对象如上面介绍包装类例子所说的,经过包装类的一些过滤在经过自激活Activite 扩展点的 filter 过滤,最终用dubboProtocol打开一个端口就可以接受请求了。上面的registryProtocol 具体选择注册中心又是通过配置拿的注册中心扩展点。

ExtensionLoader 的关键实现
    private static final ConcurrentMap, ExtensionLoader> EXTENSION_LOADERS = new ConcurrentHashMap<>(64);

在上面看到在获取扩展点时都是先ExtensionLoader.getExtensionLoader(type)。来获取某一个接口的扩展loader。上面的map是它的一个缓存,保证只会加载一次

ExtensionLoader.getExtensionLoader(type)是一个静态方法如下

    public static  ExtensionLoader getExtensionLoader(Class type) {
        if (type == null) {
            throw new IllegalArgumentException("Extension type == null");
        }
        if (!type.isInterface()) {
            throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!");
        }
        if (!withExtensionAnnotation(type)) {
            throw new IllegalArgumentException("Extension type (" + type +
                    ") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!");
        }

        ExtensionLoader loader = (ExtensionLoader) EXTENSION_LOADERS.get(type);
        if (loader == null) {
//如果缓存没有那么新建一个
            EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader(type));
            loader = (ExtensionLoader) EXTENSION_LOADERS.get(type);
        }
        return loader;
    }

ExtensionLoader 有一个私有的构造函数,因此他只有 自己可以创建

 private ExtensionLoader(Class type) {
        this.type = type;
        objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
    }

objectFactory 是一个ExtensionFactory 的实现类,它本身也是扩展点

@SPI
public interface ExtensionFactory {

    
     T getExtension(Class type, String name);

}


getAdaptiveExtension 表示获取一个自适应的扩展点,但是它有一个@Adaptive的扩展类

@Adaptive
public class AdaptiveExtensionFactory implements ExtensionFactory {

    private final List factories;

    public AdaptiveExtensionFactory() {
        ExtensionLoader loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
        List list = new ArrayList();
        for (String name : loader.getSupportedExtensions()) {
            list.add(loader.getExtension(name));
        }
        factories = Collections.unmodifiableList(list);
    }

    @Override
    public  T getExtension(Class type, String name) {
        for (ExtensionFactory factory : factories) {
            T extension = factory.getExtension(type, name);
            if (extension != null) {
                return extension;
            }
        }
        return null;
    }

}

上面提到过, 只能有一个实现类是被标示@Adaptive,标示后,它是一个所有扩展点的代理,而标示在方法上的话就是通过参数选择一个扩展点来执行那个被标示的方法,而表示在类上,就相当于是遍历所有的扩展点,看哪一个可以执行。可以这么理解,标示在方法上 框架会自动生成一个代理类。标示在类上,这个类就是这个代理类,框架就不会自动生成了。

获取一个loader后就可以根据这个loader去获取对应的扩展点了

普通的扩展点

上面提过获取普通扩展点通过ExtensionLoader.getExtensionLoader(type).getExtension(name)。现在看看getExtension

    @SuppressWarnings("unchecked")
    public T getExtension(String name) {
        return getExtension(name, true);
    }

    public T getExtension(String name, boolean wrap) {
        if (StringUtils.isEmpty(name)) {
            throw new IllegalArgumentException("Extension name == null");
        }
        if ("true".equals(name)) {
            return getDefaultExtension();
        }
        final Holder holder = getOrCreateHolder(name);
        Object instance = holder.get();
        if (instance == null) {
            synchronized (holder) {
                instance = holder.get();
                if (instance == null) {
                    instance = createExtension(name, wrap);
                    holder.set(instance);
                }
            }
        }
        return (T) instance;
    } 

holder 只是提供set,get  方法, getOrCreateHolder从缓存的map cachedInstances中获取实例。如果没有则创建

    @SuppressWarnings("unchecked")
    private T createExtension(String name, boolean wrap) {
        Class clazz = getExtensionClasses().get(name);
        if (clazz == null) {
            throw findException(name);
        }
        try {
            T instance = (T) EXTENSION_INSTANCES.get(clazz);
            if (instance == null) {
                EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.getDeclaredConstructor().newInstance());
                instance = (T) EXTENSION_INSTANCES.get(clazz);
            }
            injectExtension(instance);


            if (wrap) {

                List> wrapperClassesList = new ArrayList<>();
                if (cachedWrapperClasses != null) {
                    wrapperClassesList.addAll(cachedWrapperClasses);
                    wrapperClassesList.sort(WrapperComparator.COMPARATOR);
                    Collections.reverse(wrapperClassesList);
                }

                if (CollectionUtils.isNotEmpty(wrapperClassesList)) {
//遍历的构造wapper 把真正的扩展点包在最里面。wapper可以做些aop 的功能 例如 clusterWapper 是调用发起的最上层,在这里可以做统一异常 ,进出口日志,的功能
                    for (Class wrapperClass : wrapperClassesList) {
                        Wrapper wrapper = wrapperClass.getAnnotation(Wrapper.class);
                        if (wrapper == null
                                || (ArrayUtils.contains(wrapper.matches(), name) && !ArrayUtils.contains(wrapper.mismatches(), name))) {
                            instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                        }
                    }
                }
            }

            initExtension(instance);
            return instance;
        } catch (Throwable t) {
            throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
                    type + ") couldn't be instantiated: " + t.getMessage(), t);
        }
    }

wrap 默认是true ,表示创建的时候 是否 构建wrapper,新加的功能 以前看还没有的。waaper 就是 dubbo 框架的aop 实现。injectExtension 方法就是dubbo 的ioc实现,通过set 方法 为 实例 注入依赖。

    private T injectExtension(T instance) {

        if (objectFactory == null) {
            return instance;
        }

        try {
            for (Method method : instance.getClass().getMethods()) {
                if (!isSetter(method)) {
                    continue;
                }
                
                if (method.getAnnotation(DisableInject.class) != null) {
                    continue;
                }
                Class pt = method.getParameterTypes()[0];
                if (ReflectUtils.isPrimitives(pt)) {
                    continue;
                }

                try {
                    String property = getSetterProperty(method);
                    Object object = objectFactory.getExtension(pt, property);
                    if (object != null) {
                        method.invoke(instance, object);
                    }
                } catch (Exception e) {
                    logger.error("Failed to inject via method " + method.getName()
                            + " of interface " + type.getName() + ": " + e.getMessage(), e);
                }

            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return instance;
    }

这里就是利用了上面提到的ExtensionFactory 去找符合要求的实列进行注入。普通的扩展点就获取完了。这里有个点还没有 那就是创建实例的Class 是哪里来的。

加载扩展点

可以看到在createExtension 的第一行有一个getExtensionClasses ,可以想到我们在通过名称获取拓展类之前,首先需要根据配置文件解析出拓展项名称到拓展类的映射关系表(Map<名称, 拓展类>),之后再根据拓展项名称从映射关系表中取出相应的拓展类即可。

    private Map> getExtensionClasses() {
        Map> classes = cachedClasses.get();
        if (classes == null) {
            synchronized (cachedClasses) {
                classes = cachedClasses.get();
                if (classes == null) {
                    classes = loadExtensionClasses();
                    cachedClasses.set(classes);
                }
            }
        }
        return classes;
    }

这里也是先检查缓存,若缓存未命中,则通过 synchronized 加锁。加锁后再次检查缓存,并判空。此时如果 classes 仍为 null,则通过 loadExtensionClasses 加载拓展类。下面分析 loadExtensionClasses 方法的逻辑。

    private Map> loadExtensionClasses() {
        cacheDefaultExtensionName();

        Map> extensionClasses = new HashMap<>();

        for (LoadingStrategy strategy : strategies) {
            loadDirectory(extensionClasses, strategy.directory(), type.getName(), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
            loadDirectory(extensionClasses, strategy.directory(), type.getName().replace("org.apache", "com.alibaba"), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
        }

        return extensionClasses;
    }

loadExtensionClasses 方法总共做了两件事情,一是对 SPI 注解进行解析,同时把它的默认名字存起来,之前说过 @SPI 注解的值就是默认的名字。和二是调用 loadDirectory 方法加载指定文件夹配置文件。

    private void cacheDefaultExtensionName() {
        final SPI defaultAnnotation = type.getAnnotation(SPI.class);
        if (defaultAnnotation == null) {
            return;
        }

        String value = defaultAnnotation.value();
        if ((value = value.trim()).length() > 0) {
            String[] names = NAME_SEPARATOR.split(value);
            if (names.length > 1) {
                throw new IllegalStateException("More than 1 default extension name on extension " + type.getName()
                        + ": " + Arrays.toString(names));
            }
            if (names.length == 1) {
                cachedDefaultName = names[0];
            }
        }
    }

LoadingStrategy是加载策略其实就是告诉要加载哪些文件夹例如其中一个策略,有四个实现类 对应了 最开始说的四个路径

public class DubboLoadingStrategy implements LoadingStrategy {

    @Override
    public String directory() {
        return "meta-INF/dubbo/";
    }

    @Override
    public boolean overridden() {
        return true;
    }

    @Override
    public int getPriority() {
        return NORMAL_PRIORITY;
    }


}

loadDirectory

  private void loadDirectory(Map> extensionClasses, String dir, String type,
                               boolean extensionLoaderClassLoaderFirst, boolean overridden, String... excludedPackages) {
        String fileName = dir + type;
        try {
            Enumeration urls = null;
            ClassLoader classLoader = findClassLoader();

            // try to load from ExtensionLoader's ClassLoader first
            if (extensionLoaderClassLoaderFirst) {
                ClassLoader extensionLoaderClassLoader = ExtensionLoader.class.getClassLoader();
                if (ClassLoader.getSystemClassLoader() != extensionLoaderClassLoader) {
                    urls = extensionLoaderClassLoader.getResources(fileName);
                }
            }

            if (urls == null || !urls.hasMoreElements()) {
                if (classLoader != null) {
                    urls = classLoader.getResources(fileName);
                } else {
                    urls = ClassLoader.getSystemResources(fileName);
                }
            }

            if (urls != null) {
                while (urls.hasMoreElements()) {
                    java.net.URL resourceURL = urls.nextElement();
                    loadResource(extensionClasses, classLoader, resourceURL, overridden, excludedPackages);
                }
            }
        } catch (Throwable t) {
            logger.error("Exception occurred when loading extension class (interface: " +
                    type + ", description file: " + fileName + ").", t);
        }
    }

loadDirectory 方法先通过 classLoader 获取所有资源链接,然后再通过 loadResource 方法加载资源。继续跟下去,看一下 loadResource 方法的实现

 private void loadResource(Map> extensionClasses, ClassLoader classLoader,
                              java.net.URL resourceURL, boolean overridden, String... excludedPackages) {
        try {
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), StandardCharsets.UTF_8))) {
                String line;
                String clazz = null;
                while ((line = reader.readLine()) != null) {
//定位 # 字符
                    final int ci = line.indexOf('#');
                    if (ci >= 0) {
//截取#之前的字符串,之后的是注释
                        line = line.substring(0, ci);
                    }
                    line = line.trim();
                    if (line.length() > 0) {
                        try {
                            String name = null;
//以等号为界 截取值
                            int i = line.indexOf('=');
                            if (i > 0) {
                                name = line.substring(0, i).trim();
                                clazz = line.substring(i + 1).trim();
                            } else {
                                clazz = line;
                            }
                            if (StringUtils.isNotEmpty(clazz) && !isExcluded(clazz, excludedPackages)) {
//通过loadclass 加载类
                                loadClass(extensionClasses, resourceURL, Class.forName(clazz, true, classLoader), name, overridden);
                            }
                        } catch (Throwable t) {
                            IllegalStateException e = new IllegalStateException("Failed to load extension class (interface: " + type + ", class line: " + line + ") in " + resourceURL + ", cause: " + t.getMessage(), t);
                            exceptions.put(line, e);
                        }
                    }
                }
            }
        } catch (Throwable t) {
            logger.error("Exception occurred when loading extension class (interface: " +
                    type + ", class file: " + resourceURL + ") in " + resourceURL, t);
        }
    }

上面是解析扩展文件例如:providerThreadPool=com.providerthreadpool.ProviderThreadPool  扩展使用自己的线程池

loadClass 方法 *** 作了不同的缓存,比如 cachedAdaptiveClass、cachedWrapperClasses 和 cachedNames ExtensionClass,在获取扩展点的时候 如上面说了的获取普通的扩展带你。就是直接读取这些缓存那到class,然后实例化,除此之外,该方法没有其他什么逻辑了。

到此,关于缓存类加载的过程就分析完了。

    private void loadClass(Map> extensionClasses, java.net.URL resourceURL, Class clazz, String name,
                           boolean overridden) throws NoSuchMethodException {
//判断是否是 扩展点的实现类
        if (!type.isAssignableFrom(clazz)) {
            throw new IllegalStateException("Error occurred when loading extension class (interface: " +
                    type + ", class line: " + clazz.getName() + "), class "
                    + clazz.getName() + " is not subtype of interface.");
        }
//检测目标类上是否有Adaptive 注解
        if (clazz.isAnnotationPresent(Adaptive.class)) {
//缓存adaptive ,上面提过 一个扩展点只有一个 多个类上有这个注解会抱异常,如果没有类上有,框架自动生成一个。
            cacheAdaptiveClass(clazz, overridden);
        } else if (isWrapperClass(clazz)) {
//换成wapper
            cacheWrapperClass(clazz);
        } else {
//进入普通点的处理,如果咩有默认不带参数的构造方法 抱异常,所以每一个普通的扩展点必须有无参的构造函数
            clazz.getConstructor();
            if (StringUtils.isEmpty(name)) {
            // 如果 name 为空,则尝试从 Extension 注解中获取 name,或使用小写的类名作为 name
                name = findAnnotationName(clazz);
                if (name.length() == 0) {
                    throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
                }
            }
        // 切分 name
            String[] names = NAME_SEPARATOR.split(name);
            if (ArrayUtils.isNotEmpty(names)) {
//判断是否是activate 有则缓存
                cacheActivateClass(clazz, names[0]);
                for (String n : names) {
                  // 存储名称到 Class 的映射关系  class 到名称的映射

                    cacheName(clazz, n);
                    saveInExtensionClass(extensionClasses, clazz, n, overridden);
                }
            }
        }
    }

    private void cacheActivateClass(Class clazz, String name) {
        Activate activate = clazz.getAnnotation(Activate.class);
        if (activate != null) {
            cachedActivates.put(name, activate);
        } else {
            // support com.alibaba.dubbo.common.extension.Activate
            com.alibaba.dubbo.common.extension.Activate oldActivate = clazz.getAnnotation(com.alibaba.dubbo.common.extension.Activate.class);
            if (oldActivate != null) {
                cachedActivates.put(name, oldActivate);
            }
        }
    }

这里分别缓缓道来不同的map,在获取不同的扩展点的时候 从不同的map里拿。

获取自适应扩展点

上面提到可以通过 ExtensionLoader.getExtensionLoader(RouterFactory.class).getAdaptiveExtension() 来获取自适应的扩展点 看getAdaptiveExtension 实现

   public T getAdaptiveExtension() {
        Object instance = cachedAdaptiveInstance.get();
        if (instance == null) {
            if (createAdaptiveInstanceError != null) {
                throw new IllegalStateException("Failed to create adaptive instance: " +
                        createAdaptiveInstanceError.toString(),
                        createAdaptiveInstanceError);
            }

            synchronized (cachedAdaptiveInstance) {
                instance = cachedAdaptiveInstance.get();
                if (instance == null) {
                    try {
                        instance = createAdaptiveExtension();
                        cachedAdaptiveInstance.set(instance);
                    } catch (Throwable t) {
                        createAdaptiveInstanceError = t;
                        throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
                    }
                }
            }
        }

        return (T) instance;
    }

首先从缓存中取,没有则创建,在加载 扩展点的时候提到来 如果有类被@Adaptive 标示,会被缓存,而且只有一个,原因上面也讲了,如果没有那么 框架来创建,就是在这里创建的

创建的过程就是根据接口 用string 来写一个实现类 然后编译为class 

    private Class createAdaptiveExtensionClass() {
        String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
        ClassLoader classLoader = findClassLoader();
        org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
        return compiler.compile(code, classLoader);
    }

编译的类是 动态生成的string, 通过debug 可以获取这个code 就是上面 讲Adaptive 注解功能时 提到的 Protocol$Adaptive 类 ,一般讲动态代理 可以通过jdk的InvocationHandler接口 或者 *** 作字节码的工具,这里是第三种方式根据接口拼接一个类然后编译为class,前面2种是为一个class 生成一个代理后面是为一个接口所以 不能用前面的方式。

获取自激活的扩展点

    public List getActivateExtension(URL url, String[] values, String group) {
        List activateExtensions = new ArrayList<>();
//values 是 dubbo.provider.filter配置项的值 , 隔开
        // solve the bug of using @SPI's wrapper method to report a null pointer exception.
        TreeMap activateExtensionsMap = new TreeMap<>(ActivateComparator.COMPARATOR);
        List names = values == null ? new ArrayList<>(0) : asList(values);
//如果配置没有有 -default 那么 加载缺省的扩展类
        if (!names.contains(REMOVE_VALUE_PREFIX + DEFAULT_KEY)) {

            getExtensionClasses();
            for (Map.Entry entry : cachedActivates.entrySet()) {
//获取所有被@Activate 注解包含的扩展类 比较group和key 符合则加入activateExtensions
                String name = entry.getKey();
                Object activate = entry.getValue();

                String[] activateGroup, activatevalue;

                if (activate instanceof Activate) {
                    activateGroup = ((Activate) activate).group();
                    activatevalue = ((Activate) activate).value();
                } else if (activate instanceof com.alibaba.dubbo.common.extension.Activate) {
                    activateGroup = ((com.alibaba.dubbo.common.extension.Activate) activate).group();
                    activatevalue = ((com.alibaba.dubbo.common.extension.Activate) activate).value();
                } else {
                    continue;
                }
                if (isMatchGroup(group, activateGroup)
                        && !names.contains(name)
                        && !names.contains(REMOVE_VALUE_PREFIX + name)
                        && isActive(activatevalue, url)) {
                    activateExtensionsMap.put(getExtensionClass(name), getExtension(name));
                }
            }
            if(!activateExtensionsMap.isEmpty()){
                activateExtensions.addAll(activateExtensionsMap.values());
            }
        }

        List loadedExtensions = new ArrayList<>();
        for (int i = 0; i < names.size(); i++) {
            String name = names.get(i);
//如果不是-开头,同时后面也没有 排出自己那么 加载 ,如果是default 把它已经加载的放到前面
            if (!name.startsWith(REMOVE_VALUE_PREFIX)
                    && !names.contains(REMOVE_VALUE_PREFIX + name)) {
                if (DEFAULT_KEY.equals(name)) {
                    if (!loadedExtensions.isEmpty()) {
                        activateExtensions.addAll(0, loadedExtensions);
                        loadedExtensions.clear();
                    }
                } else {
                    loadedExtensions.add(getExtension(name));
                }
            }
        }
        if (!loadedExtensions.isEmpty()) {
            activateExtensions.addAll(loadedExtensions);
        }
        return activateExtensions;
    }
服务暴露

这节主要是总结 为什么我们使用@dubboServie 可以暴露一个服务出去。  

服务导出的入口方法是 DubboBootstrapApplicationListener的onContextRefreshedEvent。它是一个事件响应方法,该方法会在收到 Spring 上下文刷新事件。

    private void onContextRefreshedEvent(ContextRefreshedEvent event) {
        dubboBootstrap.start();
    }

start 方法会去export 所有的service 

   
    public DubboBootstrap start() {
        if (started.compareAndSet(false, true)) {
            ready.set(false);
            initialize();
            if (logger.isInfoEnabled()) {
                logger.info(NAME + " is starting...");
            }
            // 1. export Dubbo Services
            exportServices();

            // Not only provider register
            if (!isonlyRegisterProvider() || hasExportedServices()) {
                // 2. export metadataService
                exportmetadataService();
                //3. Register the local ServiceInstance if required
                registerServiceInstance();
            }

       .....
    }

exportServices(); 就是找到所有的serviceBean 执行export 方法。serviceBean 又是什么了,每一个被@dubboService标示的实现类,都会对应生成一个serviceBean。怎么生成了,这个通过上面提到的Spring的扩展点 BeanFactoryPostProcessor 来实现,扫描到@dubboService的时候就会生成一个ServiceBean,这个serviceBean 的ref 对象会持有 被 这个标记的bean。 通过这个ref 会生成一个代理对象,先简单过下,dubbo没有配置spring.factories,dubbo是这么把扩展点 加到Spring的,这里就是除了 上面的总结Springboot spi 时 漏掉的另一个 方式:通过importBeanDefinitionRegistrar动态注册bean 。在启动项目时我们都加了一个注解@EnableDubbo  在这个注解上可以用@import 导入importBeanDefinitionRegistrar 的实现类,来动态注入bean,会注入2种类型的后置处理器bean ,一个是根据 配置文件的配置值 去生成 dubbo 内部定义的 对象,同时放入Spring 容器,还有一类是扫描注解,生成对象和代理对象,就是上面serviceBean 的生成方式。ServiceAnnotationBeanPostProcessor 来处理@dubboService注解referenceAnnotationBeanPostProcessor处理 @dubboReferen注解

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@documented
@EnableDubboConfig
@DubboComponentScan
public @interface EnableDubbo {
    @AliasFor(
        annotation = DubboComponentScan.class,
        attribute = "basePackages"
    )
    String[] scanbasePackages() default {};

    @AliasFor(
        annotation = DubboComponentScan.class,
        attribute = "basePackageClasses"
    )
    Class[] scanbasePackageClasses() default {};

    @AliasFor(
        annotation = EnableDubboConfig.class,
        attribute = "multiple"
    )
    boolean multipleConfig() default true;
}

@EnableDubboConfig 对应说的第一类后置处理器bean,@DubboComponentScan对应处理第二类

通过@EnableDubboConfig注解引入DubboConfigConfigurationRegistrar,通过注入DubboConfigConfiguration 来处理配置类。
通过@DubboComponentScan注解引入DubboComponentScanRegistrar,扩展的registerBeanDefinitions方法中registerServiceAnnotationBeanPostProcessor 处理@dubboService ,registerCommonBeans中的referenceAnnotationBeanPostProcessor处@dubboReference

上面都是 dubbo 框架 集成到Springboot 的过程,现在回到export的过程

    public synchronized void export() {
        if (bootstrap == null) {
            bootstrap = DubboBootstrap.getInstance();
            bootstrap.initialize();
        }
//检查分别是配置检查,以及 URL 装配。在导出服务之前,Dubbo 需要检查用户的配置是否合理,或者为用户补充缺省配置。配置检查完成后,接下来需要根据这些配置组装 URL
        checkAndUpdateSubConfigs();

        initServicemetadata(provider);
        servicemetadata.setServiceType(getInterfaceClass());
        servicemetadata.setTarget(getRef());
        servicemetadata.generateServiceKey();

//检测要不要暴露服务,有时候我们只是想本地启动服务进行一些调试工作,我们并不希望把本地启动的服务暴露出去给别人调用。可以在dubbo.provider.export=false 配置
        if (!shouldExport()) {
            return;
        }
//检测是不是要延迟 一点时间在暴露 服务
        if (shouldDelay()) {
            DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }

        exported();
    }

doExport

  protected synchronized void doExport() {
        if (unexported) {
            throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
        }
        if (exported) {
            return;
        }
        exported = true;

        if (StringUtils.isEmpty(path)) {
            path = interfaceName;
        }
        doExportUrls();
        bootstrap.setReady(true);
    }

    private void doExportUrls() {
//dubbo 3.0 面向云原生和简化注册中心数据的的优化 。叫做服务自省。没有用过。只注册实例,不会把实列的每个接口都注册,由消费方 自己去提供方获取: 有哪些接口可以用,这里就是服务方存储的自己有哪些服务被提供出去,相当于注册到自己,服务自省。
        ServiceRepository repository = ApplicationModel.getServiceRepository();
        ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
        repository.registerProvider(
                getUniqueServiceName(),
                ref,
                serviceDescriptor,
                this,
                servicemetadata
        );
//加载注册中心地址
        List registryURLs = ConfigValidationUtils.loadRegistries(this, true);
// 遍历所有的协议 ,并在每个协议下 暴露服务,一个接口 可以暴露多种协议出去
        for (ProtocolConfig protocolConfig : protocols) {
            String pathKey = URL.buildKey(getContextPath(protocolConfig)
                    .map(p -> p + "/" + path)
                    .orElse(path), group, version);
            // In case user specified path, register service one more time to map it to path.
            repository.registerService(pathKey, interfaceClass);
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

loadRegistries 方法主要包含如下的逻辑:

  1. 检测是否存在注册中心配置类,不存在则抛出异常
  2. 构建参数映射集合,也就是 map
  3. 构建注册中心链接列表
  4. 遍历链接列表,并根据条件决定是否将其添加到 registryList 中

doExportUrlsFor1Protocol 里面会根据配置组装url,忽略 主要看export 
 

 String scope = url.getParameter(SCOPE_KEY);
        // don't export when none is configured
    // 如果 scope = none,则什么都不做

        if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
        // scope != remote,导出到本地

            // export to local if the config is not remote (export to remote only when config is remote)
            if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
        // scope != local,导出到远程

            // export to remote if the config is not local (export to local only when config is local)
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    for (URL registryURL : registryURLs) {
                        //if protocol is only injvm ,not register
                        if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                            continue;
                        }
                        url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                        URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            if (url.getParameter(REGISTER_KEY, true)) {
                                logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " +
                                        registryURL);
                            } else {
                                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                            }
                        }

                        // For providers, this is used to enable custom proxy to generate invoker
                        String proxy = url.getParameter(PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                        }

                        Invoker invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass,
                                registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                        DelegateProvidermetaDataInvoker wrapperInvoker = new DelegateProvidermetaDataInvoker(invoker, this);

                        Exporter exporter = PROTOCOL.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
                    if (logger.isInfoEnabled()) {
                        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                    }
                    Invoker invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProvidermetaDataInvoker wrapperInvoker = new DelegateProvidermetaDataInvoker(invoker, this);

                    Exporter exporter = PROTOCOL.export(wrapperInvoker);
                    exporters.add(exporter);
                }

                metadataUtils.publishServiceDefinition(url);
            }
        }

这里  生成代理对象的地方是最关键的,这里就是把 serviceimpl 转化成为invoker,后面接收一个请求 都是先通过元信息 类型 入参等 找到invoker

    Invoker invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass,
                                registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));

PROXY_FACTORY 也是一个扩展点

    private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

我们主要看他用jdk 实现的版本

    @Override
    public  Invoker getInvoker(T proxy, Class type, URL url) {
        return new AbstractProxyInvoker(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                Method method = proxy.getClass().getMethod(methodName, parameterTypes);
                return method.invoke(proxy, arguments);
            }
        };
    }

这里主要是 返回一个AbstractProxyInvoker 的匿名实现类 实现 的方法就是doInvoke ,doInvoke里面干的事 就是通过反射调用 具体提供服务的serviceImpl 的 方法 。AbstractProxyInvoker 抽象类实现了 invoker接口 它 实现了invoke 方法

    public Result invoke(Invocation invocation) throws RpcException {
        try {
            Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
            CompletableFuture future = wrapWithFuture(value);
            CompletableFuture appResponseFuture = future.handle((obj, t) -> {
                AppResponse result = new AppResponse(invocation);
                if (t != null) {
                    if (t instanceof CompletionException) {
                        result.setException(t.getCause());
                    } else {
                        result.setException(t);
                    }
                } else {
                    result.setValue(obj);
                }
                return result;
            });
            return new AsyncRpcResult(appResponseFuture, invocation);
        } catch (InvocationTargetException e) {
            if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
                logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
            }
            return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
        } catch (Throwable e) {
            throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    } 

他的invoke 方法就是调用实现类的doInvoke 方法。然后把结果包装到AsyncRpcResult ,因为这个结果 就是service实现类的 返回结果。

梳理一下。就是,服务端 接收到一个请求 ,根据请求的元信息 找到invoker,invoker经过一些filter最后 掉到这个代理生成的 invoker,通过doinvoke 去执行具体的实现方法,然后把结果放到AsyncRpcResult 返回。

回到export过程,代理类生成后就是 网络 *** 作了,打开一个网络 端口来接收请求

                        Exporter exporter = PROTOCOL.export(wrapperInvoker);

PROTOCOL 是一个扩展点,上面举例子说明扩展点的时候 说到了默认是dubboProtocol 这个来暴露dubbo协议

   @Override
    public  Exporter export(Invoker invoker) throws RpcException {
        URL url = invoker.getUrl();
    // 获取服务标识,理解成服务坐标。由服务组名,服务名,服务版本号以及端口组成。比如:
    // demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
        // export service.
        String key = serviceKey(url);
    // 创建 DubboExporter

        DubboExporter exporter = new DubboExporter(invoker, key, exporterMap);
    // 将  键值对放入缓存中

   

        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }

            }
        }
 // 启动服务器
        openServer(url);
   // 优化序列化
        optimizeSerialization(url);

        return exporter;
    }

openServer是重点

  private void openServer(URL url) {
        // find server.
    // 获取 host:port,并将其作为服务器实例的 key,用于标识当前的服务器实例

        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(IS_SERVER_KEY, true);
        if (isServer) {
        // 访问缓存

            ProtocolServer server = serverMap.get(key);
            if (server == null) {
 // 创建服务器实例
                synchronized (this) {
                    server = serverMap.get(key);
                    if (server == null) {
                        serverMap.put(key, createServer(url));
                    }
                }
            } else {
            // 服务器已创建,则根据 url 中的配置重置服务器
                // server supports reset, use together with override
                server.reset(url);
            }
        }
    }

如上,在同一台机器上(单网卡),同一个端口上仅允许启动一个服务器实例。若某个端口上已有服务器实例,此时则调用 reset 方法重置服务器的一些配置。接下来分析服务器实例的创建过程。如下:

   private ProtocolServer createServer(URL url) {
        url = URLBuilder.from(url)
                // send readonly event when server closes, it's enabled by default
                .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
                // enable heartbeat by default
                .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
                .addParameter(CODEC_KEY, DubboCodec.NAME)
                .build();
	// 获取 server 参数,默认为 netty

        String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
	// 通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        }

        ExchangeServer server;
        try {
        // 创建 ExchangeServer

            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
	// 获取 client 参数,可指定 netty,mina

        str = url.getParameter(CLIENT_KEY);
        if (str != null && str.length() > 0) {
        // 获取所有的 Transporter 实现类名称集合,比如 supportedTypes = [netty, mina]

            Set supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
 // 检测当前 Dubbo 所支持的 Transporter 实现类名称列表中,
        // 是否包含 client 所表示的 Transporter,若不包含,则抛出异常
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }

        return new DubboProtocolServer(server);
    }

    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).bind(url, handler);
    }
 public static Exchanger getExchanger(URL url) {
        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
        return getExchanger(type);
    }

    public static Exchanger getExchanger(String type) {
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    }

Exchanger也是一个扩展点。默认是HeaderExchanger

@SPI(HeaderExchanger.NAME)
public interface Exchanger {

    
    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;

    
    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;

}

public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

}

HeaderExchangeServer接收一个RemotingServer 他是原程socket的抽象。通过Transporter 绑定一个端口来创建 并且放进自己的处理handler,熟悉netty 的就会比较眼熟。

Transporter也是一个扩展点 默认是netty

@SPI("netty")
public interface Transporter {

    
    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException;

    
    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;

}

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    @Override
    public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
        return new NettyServer(url, handler);
    }

    @Override
    public Client connect(URL url, ChannelHandler handler) throws RemotingException {
        return new NettyClient(url, handler);
    }

}

通过doOpen打开一个端口,这里就是使用netty 监听端口的标准代码了 ,通过handler放进去 协议的编解码器,然后和 业务处理handler

    @Override
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);

        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();  
        // https://issues.jboss.org/browse/NETTY-365
        // https://issues.jboss.org/browse/NETTY-379
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("backlog", getUrl().getPositiveParameter(BACKLOG_KEY, Constants.DEFAULT_BACKLOG));
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());
    }

nettyHandler 就是继承自netty 的接口SimpleChannelHandler

这个handler 会处理连接, 关闭,连接活跃, 收到消息等事件 这些都是 netty 的 内容了

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            handler.received(channel, e.getMessage());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }

服务暴露酒完了 现在接下来就是服务的处理过程

如果用dubobo 协议则会被 dubboprotocol的received处理 然后调用reply

       @Override
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);

            } else {
                super.received(channel, message);
            }
        }

     @Override
        public CompletableFuture reply(ExchangeChannel channel, Object message) throws RemotingException {

            if (!(message instanceof Invocation)) {
                throw new RemotingException(channel, "Unsupported request: "
                        + (message == null ? null : (message.getClass().getName() + ": " + message))
                        + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            }

            Invocation inv = (Invocation) message;
            Invoker invoker = getInvoker(channel, inv);
            // need to consider backward-compatibility if it's a callback
            if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                String methodsStr = invoker.getUrl().getParameters().get("methods");
                boolean hasMethod = false;
                if (methodsStr == null || !methodsStr.contains(",")) {
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
                    String[] methods = methodsStr.split(",");
                    for (String method : methods) {
                        if (inv.getMethodName().equals(method)) {
                            hasMethod = true;
                            break;
                        }
                    }
                }
                if (!hasMethod) {
                    logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                            + " not found in callback service interface ,invoke will be ignored."
                            + " please update the api interface. url is:"
                            + invoker.getUrl()) + " ,invocation is :" + inv);
                    return null;
                }
            }
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            Result result = invoker.invoke(inv);
            return result.thenApply(Function.identity());
        } 

这里就是上面一直说的根据元信息找到invoker,然后执行invoker的invoke方法, 这个方法上面说了,经过代理类 最后执行 serviceImpl 的对应方法。其实和最开始 的java简单实现一个rpc 的原理是一样。

服务引用

调用方一般使用@DubboReference 注解使用,那么他会被扫描注入到Spring时 生成一个ReferenceBean,上面也总结了 是通过ReferenceAnnotationBeanPostProcessor 这个后置处理器,当要创建一个bean时 发现这个bean依赖了一个dubbo 的远程接口那么就 生成一个代理类,

   private Object getOrCreateProxy(String referencedBeanName, ReferenceBean referenceBean, boolean localServiceBean, Class serviceInterfaceType) {
        if (localServiceBean) {
            return Proxy.newProxyInstance(this.getClassLoader(), new Class[]{serviceInterfaceType}, this.newReferencedBeanInvocationHandler(referencedBeanName));
        } else {
//如果开启检测,检测是注册中心是否有这个服务
            this.exportServiceBeanIfNecessary(referencedBeanName);
            return referenceBean.get();
        }
    }

    private T createProxy(Map map) {
        URL u;
        if (this.shouldJvmRefer(map)) {
            URL url = (new URL("injvm", "127.0.0.1", 0, this.interfaceClass.getName())).addParameters(map);
            this.invoker = REF_PROTOCOL.refer(this.interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + this.interfaceClass.getName());
            }
        } else {
            this.urls.clear();
            URL monitorUrl;
            if (this.url != null && this.url.length() > 0) {
                String[] us = CommonConstants.SEMIcolon_SPLIT_PATTERN.split(this.url);
                if (us != null && us.length > 0) {
                    String[] var11 = us;
                    int var14 = us.length;

                    for(int var17 = 0; var17 < var14; ++var17) {
                        String u = var11[var17];
                        URL url = URL.valueOf(u);
                        if (StringUtils.isEmpty(url.getPath())) {
                            url = url.setPath(this.interfaceName);
                        }

                        if (UrlUtils.isRegistry(url)) {
                            this.urls.add(url.addParameterAndEncoded("refer", StringUtils.toQueryString(map)));
                        } else {
                            this.urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else if (!"injvm".equalsIgnoreCase(this.getProtocol())) {
                this.checkRegistry();
                List us = ConfigValidationUtils.loadRegistries(this, false);
                if (CollectionUtils.isNotEmpty(us)) {
                    for(Iterator var3 = us.iterator(); var3.hasNext(); this.urls.add(u.addParameterAndEncoded("refer", StringUtils.toQueryString(map)))) {
                        u = (URL)var3.next();
                        monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
                        if (monitorUrl != null) {
                            map.put("monitor", URL.encode(monitorUrl.toFullString()));
                        }
                    }
                }

                if (this.urls.isEmpty()) {
                    throw new IllegalStateException("No such any registry to reference " + this.interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config  to your spring config.");
                }
            }

            if (this.urls.size() == 1) {
                this.invoker = REF_PROTOCOL.refer(this.interfaceClass, (URL)this.urls.get(0));
            } else {
                List> invokers = new ArrayList();
                URL registryURL = null;
                Iterator var16 = this.urls.iterator();

                while(var16.hasNext()) {
                    monitorUrl = (URL)var16.next();
                    invokers.add(REF_PROTOCOL.refer(this.interfaceClass, monitorUrl));
                    if (UrlUtils.isRegistry(monitorUrl)) {
                        registryURL = monitorUrl;
                    }
                }

                if (registryURL != null) {
                    u = registryURL.addParameterIfAbsent("cluster", "zone-aware");
                    this.invoker = CLUSTER.join(new StaticDirectory(u, invokers));
                } else {
                    this.invoker = CLUSTER.join(new StaticDirectory(invokers));
                }
            }
        }

        if (this.shouldCheck() && !this.invoker.isAvailable()) {
            this.invoker.destroy();
            throw new IllegalStateException("Failed to check the status of the service " + this.interfaceName + ". No provider available for the service " + (this.group == null ? "" : this.group + "/") + this.interfaceName + (this.version == null ? "" : ":" + this.version) + " from the url " + this.invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
        } else {
            if (logger.isInfoEnabled()) {
                logger.info("Refer dubbo service " + this.interfaceClass.getName() + " from url " + this.invoker.getUrl());
            }

            String metadata = (String)map.get("metadata-type");
            WritablemetadataService metadataService = WritablemetadataService.getExtension(metadata == null ? "local" : metadata);
            if (metadataService != null) {
                u = new URL("consumer", (String)map.remove("register.ip"), 0, (String)map.get("interface"), map);
                metadataService.publishServiceDefinition(u);
            }

            return PROXY_FACTORY.getProxy(this.invoker, ProtocolUtils.isGeneric(this.generic));
        }
    }

这里主要的工作是 生成 客户端的invoker,最后包装的是AsyncToSyncInvoker,同时调用的同步异步也在这里 统一处理

关键的还是需要生成一个代理类 因为 客户端 没有实现类,是因为这里帮我们生成的。

            return PROXY_FACTORY.getProxy(this.invoker, ProtocolUtils.isGeneric(this.generic));

   @Override
    @SuppressWarnings("unchecked")
    public  T getProxy(Invoker invoker, Class[] interfaces) {
        return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
    }

这里和java 简单实现一个rpc 的地方是一样了

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        String methodName = method.getName();
        Class[] parameterTypes = method.getParameterTypes();
        if (parameterTypes.length == 0) {
            if ("toString".equals(methodName)) {
                return invoker.toString();
            } else if ("$destroy".equals(methodName)) {
                invoker.destroy();
                return null;
            } else if ("hashCode".equals(methodName)) {
                return invoker.hashCode();
            }
        } else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
            return invoker.equals(args[0]);
        }
        RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), protocolServiceKey, args);
        String serviceKey = invoker.getUrl().getServiceKey();
        rpcInvocation.setTargetServiceUniqueName(serviceKey);

        // invoker.getUrl() returns consumer url.
        RpcContext.setRpcContext(invoker.getUrl());

        if (consumerModel != null) {
            rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
            rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
        }

        return invoker.invoke(rpcInvocation).recreate();
    }

生成了一个和service 一样类型的代理类,service的除了toString,hashCode 方法  其他方法都 都去组装一个invocation然后用 invoker去执行。

再来看看 consumer 端的invoker 经历了什么 最开始的代理invoker是AsyncToSyncInvoker,这里就是说的全链路异步的一个点,

 @Override
    public Result invoke(Invocation invocation) throws RpcException {
调用 代理的invoker 
        Result asyncResult = invoker.invoke(invocation);

        try {
//如果是同步的方法 在asyncResult 上阻塞
            if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
                
                asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            throw new RpcException("Interrupted unexpectedly while waiting for remote result to return!  method: " +
                    invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (ExecutionException e) {
            Throwable t = e.getCause();
            if (t instanceof TimeoutException) {
                throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " +
                        invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } else if (t instanceof RemotingException) {
                throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " +
                        invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } else {
                throw new RpcException(RpcException.UNKNOWN_EXCEPTION, "Fail to invoke remote method: " +
                        invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        } catch (Throwable e) {
            throw new RpcException(e.getMessage(), e);
        }
        return asyncResult;
    }

然后经过 cluster 和 filter (在上面的扩展点都总结了,一个是wapper类型,一个是activite类型)最终 是dubboInvoker 来处理 把请求发出去

    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isoneway = RpcUtils.isoneway(getUrl(), invocation);
            int timeout = calculateTimeout(invocation, methodName);
            invocation.put(TIMEOUT_KEY, timeout);
// 不需要等待 返回结果
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
//需要返回结果
                ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                CompletableFuture appResponseFuture =
                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
                // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
                FutureContext.getContext().setCompatibleFuture(appResponseFuture);
                AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
                result.setExecutor(executor);
                return result;
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

至此 服务引用也基本完了。来一张官方图片

泛化调用

app或者前端 都是通过http协议和后端交互,而后端都用 dubbo协议的时候 就需要 一个网关 进行协议的路由和转化 ,但是 网关不可能 依赖所有的api jar 包。也不能 后端接口新增/修改一个 网关就发布一次。

它的实现就是用一个 内置的接口genericService 调用 然后 框架不提供 传入元信息 的方法,不直接从api 拿 由 代码传入。到了服务端后。服务端通过一个filter 来获取真正的参数然后调用 invoker

@Activate(group = CommonConstants.PROVIDER, order = -20000)
public class GenericFilter implements Filter, Filter.Listener {
    private static final Logger logger = LoggerFactory.getLogger(GenericFilter.class);

    @Override
    public Result invoke(Invoker invoker, Invocation inv) throws RpcException {
        if ((inv.getMethodName().equals($INVOKE) || inv.getMethodName().equals($INVOKE_ASYNC))
                && inv.getArguments() != null
                && inv.getArguments().length == 3
                && !GenericService.class.isAssignableFrom(invoker.getInterface())) {
            String name = ((String) inv.getArguments()[0]).trim();
            String[] types = (String[]) inv.getArguments()[1];
            Object[] args = (Object[]) inv.getArguments()[2];
            try {
                Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);
                Class[] params = method.getParameterTypes();
                if (args == null) {
                    args = new Object[params.length];
                }

                if (types == null) {
                    types = new String[params.length];
                }

                if (args.length != types.length) {
                    throw new RpcException("GenericFilter#invoke args.length != types.length, please check your "
                            + "params");
                }
                String generic = inv.getAttachment(GENERIC_KEY);

                if (StringUtils.isBlank(generic)) {
                    generic = RpcContext.getContext().getAttachment(GENERIC_KEY);
                }

                if (StringUtils.isEmpty(generic)
                        || ProtocolUtils.isDefaultGenericSerialization(generic)
                        || ProtocolUtils.isGenericReturnRawResult(generic)) {
                    try {
                        args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
                    } catch (IllegalArgumentException e) {
                        throw new RpcException(e);
                    }
                } else if (ProtocolUtils.isJavaGenericSerialization(generic)) {
                    Configuration configuration = ApplicationModel.getEnvironment().getConfiguration();
                    if (!configuration.getBoolean(CommonConstants.ENABLE_NATIVE_JAVA_GENERIC_SERIALIZE, false)) {
                        String notice = "Trigger the safety barrier! " +
                                "Native Java Serializer is not allowed by default." +
                                "This means currently maybe being attacking by others. " +
                                "If you are sure this is a mistake, " +
                                "please set `" + CommonConstants.ENABLE_NATIVE_JAVA_GENERIC_SERIALIZE + "` enable in configuration! " +
                                "Before doing so, please make sure you have configure JEP290 to prevent serialization attack.";
                        logger.error(notice);
                        throw new RpcException(new IllegalStateException(notice));
                    }

                    for (int i = 0; i < args.length; i++) {
                        if (byte[].class == args[i].getClass()) {
                            try (UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream((byte[]) args[i])) {
                                args[i] = ExtensionLoader.getExtensionLoader(Serialization.class)
                                        .getExtension(GENERIC_SERIALIZATION_NATIVE_JAVA)
                                        .deserialize(null, is).readObject();
                            } catch (Exception e) {
                                throw new RpcException("Deserialize argument [" + (i + 1) + "] failed.", e);
                            }
                        } else {
                            throw new RpcException(
                                    "Generic serialization [" +
                                            GENERIC_SERIALIZATION_NATIVE_JAVA +
                                            "] only support message type " +
                                            byte[].class +
                                            " and your message type is " +
                                            args[i].getClass());
                        }
                    }
                } else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
                    for (int i = 0; i < args.length; i++) {
                        if (args[i] instanceof JavaBeanDescriptor) {
                            args[i] = JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) args[i]);
                        } else {
                            throw new RpcException(
                                    "Generic serialization [" +
                                            GENERIC_SERIALIZATION_BEAN +
                                            "] only support message type " +
                                            JavaBeanDescriptor.class.getName() +
                                            " and your message type is " +
                                            args[i].getClass().getName());
                        }
                    }
                } else if (ProtocolUtils.isProtobufGenericSerialization(generic)) {
                    // as proto3 only accept one protobuf parameter
                    if (args.length == 1 && args[0] instanceof String) {
                        try (UnsafeByteArrayInputStream is =
                                     new UnsafeByteArrayInputStream(((String) args[0]).getBytes())) {
                            args[0] = ExtensionLoader.getExtensionLoader(Serialization.class)
                                    .getExtension(GENERIC_SERIALIZATION_PROTOBUF)
                                    .deserialize(null, is).readObject(method.getParameterTypes()[0]);
                        } catch (Exception e) {
                            throw new RpcException("Deserialize argument failed.", e);
                        }
                    } else {
                        throw new RpcException(
                                "Generic serialization [" +
                                        GENERIC_SERIALIZATION_PROTOBUF +
                                        "] only support one " + String.class.getName() +
                                        " argument and your message size is " +
                                        args.length + " and type is" +
                                        args[0].getClass().getName());
                    }
                }

                RpcInvocation rpcInvocation =
                        new RpcInvocation(method, invoker.getInterface().getName(), invoker.getUrl().getProtocolServiceKey(), args,
                                inv.getObjectAttachments(), inv.getAttributes());
                rpcInvocation.setInvoker(inv.getInvoker());
                rpcInvocation.setTargetServiceUniqueName(inv.getTargetServiceUniqueName());

                return invoker.invoke(rpcInvocation);
            } catch (NoSuchMethodException | ClassNotFoundException e) {
                throw new RpcException(e.getMessage(), e);
            }
        }
        return invoker.invoke(inv);
    }

    @Override
    public void onResponse(Result appResponse, Invoker invoker, Invocation inv) {
        if ((inv.getMethodName().equals($INVOKE) || inv.getMethodName().equals($INVOKE_ASYNC))
                && inv.getArguments() != null
                && inv.getArguments().length == 3
                && !GenericService.class.isAssignableFrom(invoker.getInterface())) {

            String generic = inv.getAttachment(GENERIC_KEY);
            if (StringUtils.isBlank(generic)) {
                generic = RpcContext.getContext().getAttachment(GENERIC_KEY);
            }

            if (appResponse.hasException()) {
                Throwable appException = appResponse.getException();
                if (appException instanceof GenericException) {
                    GenericException tmp = (GenericException) appException;
                    appException = new com.alibaba.dubbo.rpc.service.GenericException(tmp.getExceptionClass(), tmp.getExceptionMessage());
                }
                if (!(appException instanceof com.alibaba.dubbo.rpc.service.GenericException)) {
                    appException = new com.alibaba.dubbo.rpc.service.GenericException(appException);
                }
                appResponse.setException(appException);
            }
            if (ProtocolUtils.isJavaGenericSerialization(generic)) {
                try {
                    UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(512);
                    ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(GENERIC_SERIALIZATION_NATIVE_JAVA)
                            .serialize(null, os).writeObject(appResponse.getValue());
                    appResponse.setValue(os.toByteArray());
                } catch (IOException e) {
                    throw new RpcException(
                            "Generic serialization [" +
                                    GENERIC_SERIALIZATION_NATIVE_JAVA +
                                    "] serialize result failed.", e);
                }
            } else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
                appResponse.setValue(JavaBeanSerializeUtil.serialize(appResponse.getValue(), JavaBeanAccessor.METHOD));
            } else if (ProtocolUtils.isProtobufGenericSerialization(generic)) {
                try {
                    UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(512);
                    ExtensionLoader.getExtensionLoader(Serialization.class)
                            .getExtension(GENERIC_SERIALIZATION_PROTOBUF)
                            .serialize(null, os).writeObject(appResponse.getValue());
                    appResponse.setValue(os.toString());
                } catch (IOException e) {
                    throw new RpcException("Generic serialization [" +
                            GENERIC_SERIALIZATION_PROTOBUF +
                            "] serialize result failed.", e);
                }
            } else if (ProtocolUtils.isGenericReturnRawResult(generic)) {
                return;
            } else {
                appResponse.setValue(PojoUtils.generalize(appResponse.getValue()));
            }
        }
    }

    @Override
    public void onError(Throwable t, Invoker invoker, Invocation invocation) {

    }
}

import org.apache.dubbo.rpc.service.GenericService; 
... 
 
// 引用远程服务 
// 该实例很重量,里面封装了所有与注册中心及服务提供方连接,请缓存
ReferenceConfig reference = new ReferenceConfig(); 
// 弱类型接口名
reference.setInterface("com.xxx.XxxService");  
reference.setVersion("1.0.0");
// 声明为泛化接口 
reference.setGeneric(true);  

// 用org.apache.dubbo.rpc.service.GenericService可以替代所有接口引用  
GenericService genericService = reference.get(); 
 
// 基本类型以及Date,List,Map等不需要转换,直接调用 
Object result = genericService.$invoke("sayHello", new String[] {"java.lang.String"}, new Object[] {"world"}); 
 
// 用Map表示POJO参数,如果返回值为POJO也将自动转成Map 
Map person = new HashMap(); 
person.put("name", "xxx"); 
person.put("password", "yyy"); 
// 如果返回POJO将自动转成Map 
Object result = genericService.$invoke("findPerson", new String[]
{"com.xxx.Person"}, new Object[]{person}); 

dubbo中好的设计学习总结

无线程线程池

在dubbo 的线程模型是这样的,netty 有一个 boss 线程处理连接, worker 线程处理 请求,然后还有一个业务线程池,业务线程池有2个,一个是provder 用来处理接收到的请求。它有 5种 模式可以选择

  • all 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。 也就是worker 拿到请求 全部丢给provider线程池
  • direct 所有消息都不派发到线程池,全部在 IO 线程上直接执行。 也就是worker 拿到请求 然后处理 不用provider线程池
  • message 只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行。 
  • execution 只有请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
  • connection 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。

默认是all 模式

对应就是通过Dispatcher 扩展点 

public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

另一个线程池是consumer 线程池, 它用来处理 业务发送请求后,收到了结果,把这个结果 塞给CompletableFuture 。

也就是 dubbo收到 消息时 会根据是 request还是response 选择不同的处理request的话走dubboProtocol 的refer 处理(过程上面有) ,response直接找到对应的Future  然后塞result进去。因此getExecutorService() 根据请求是request还是response是会返回不同的池子。request走 进入provder池 ,是response时  走consumer的池子 在 consumer线程负责把结果塞回Future去。

HeaderExchangeHandler中
  @Override
    public void received(Channel channel, Object message) throws RemotingException {
        final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        if (message instanceof Request) {
            // handle request.
            Request request = (Request) message;
            if (request.isEvent()) {
                handlerEvent(channel, request);
            } else {
                if (request.isTwoWay()) {
                    handleRequest(exchangeChannel, request);
                } else {
                    handler.received(exchangeChannel, request.getData());
                }
            }
        } else if (message instanceof Response) {
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            if (isClientSide(channel)) {
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            } else {
                String echo = handler.telnet(channel, (String) message);
                if (echo != null && echo.length() > 0) {
                    channel.send(echo);
                }
            }
        } else {
            handler.received(exchangeChannel, message);
        }
    }
WrappedChannelHandler
    public ExecutorService getPreferredExecutorService(Object msg) {
        if (msg instanceof Response) {
            Response response = (Response) msg;
            DefaultFuture responseFuture = DefaultFuture.getFuture(response.getId());
            // a typical scenario is the response returned after timeout, the timeout response may has completed the future
            if (responseFuture == null) {
                return getSharedExecutorService();
            } else {
                ExecutorService executor = responseFuture.getExecutor();
                if (executor == null || executor.isShutdown()) {
                    executor = getSharedExecutorService();
                }
                return executor;
            }
        } else {
            return getSharedExecutorService();
        }
    }

那么这里dubbo就有一个优化,他弄出了一个 无线程的线程池 的东西。目的是为了在同步请求中节省 线程的开支。

它的意思是,业务线程在CompletableFuture上等待,结果返回后 要一个consumer线程去把结果塞回CompletableFuture,让业务线程结束等待,那么是否可以让业务线程自己去塞这个结果了因为它本身阻塞在那里就是个浪费。所以 它创建了一个

ThreadlessExecutor ,让业务线程在这个池子的队列上阻塞,而不是在CompletableFuture 上阻塞,当有任务进入的时候业务线程被唤醒 执行塞 结果的动作,然后拿结果。

AsyncRpcResult
    public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (executor != null && executor instanceof ThreadlessExecutor) {
            ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
            threadlessExecutor.waitAndDrain();
        }
        return responseFuture.get(timeout, unit);
    }

大多数的 调用都是 同步的这里就节省了因为全链路异步而增加的线程开销。

线创建最大线程数在放入任务队列的线程池

我们都知道java的带缓冲的线程池,是

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

他是当任务进来,但是coreSize 大小的线程正在忙时 把任务放进 队列,当队列满是 在创建 线程 知道 max。但是如果我们想 先创建知道max,在放进队列 怎么办了。dubbo 就通过了简单的扩展 实现了这个功能

通过用自定义的队列,在放进任务时 判断 当前线程数是否大于max 不大于则 返回fase 让线程池 继续创建线程来处理。

public class EagerThreadPoolExecutor extends ThreadPoolExecutor {

    
    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

    public EagerThreadPoolExecutor(int corePoolSize,
                                   int maximumPoolSize,
                                   long keepAliveTime,
                                   TimeUnit unit, TaskQueue workQueue,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    
    public int getSubmittedTaskCount() {
        return submittedTaskCount.get();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submittedTaskCount.decrementAndGet();
    }

    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        // do not increment in method beforeExecute!
        submittedTaskCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // retry to offer the task into queue.
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
                if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.", rx);
                }
            } catch (InterruptedException x) {
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } catch (Throwable t) {
            // decrease any way
            submittedTaskCount.decrementAndGet();
            throw t;
        }
    }
}

    @Override
    public boolean offer(Runnable runnable) {
        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }

        int currentPoolThreadSize = executor.getPoolSize();
        // have free worker. put task into queue to let the worker deal with task.
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            return super.offer(runnable);
        }

        // return false to let executor create new worker.
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }

        // currentPoolThreadSize >= max
        return super.offer(runnable);
    }

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5575101.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)