Dubbo笔记 ㉘ : 服务自省-消费者

Dubbo笔记 ㉘ : 服务自省-消费者,第1张

Dubbo笔记 ㉘ : 服务自省-消费者

文章目录
  • 一、前言
  • 二、ReferenceAnnotationBeanPostProcessor#doGetInjectedBean
    • 1. newProxyInstance
    • 2. ReferenceBean#get
    • 3. ReferenceConfig#createProxy
      • 3.1 ConfigValidationUtils.loadRegistries(this, false);
      • 3.2 REF_PROTOCOL.refer(interfaceClass, urls.get(0));
      • 3.3 metadataService.publishServiceDefinition(consumerURL)
      • 3.4 PROXY_FACTORY.getProxy(invoker);
  • 三、ServiceDiscoveryRegistryProtocol#refer
    • 1. RegistryService#register
    • 2. RegistryDirectory#buildRouterChain
    • 3. RegistryDirectory#subscribe
      • 3.1 writablemetadataService.subscribeURL(url)
      • 3.2 getServices(url)
      • 3.3 subscribeURLs(url, listener, serviceName)
    • 4. Cluster#join
  • 四、ServiceDiscoveryRegistry#subscribeURLsURL, NotifyListener, String)
    • 1. serviceDiscovery.getInstances(serviceName);
    • 2. subscribeURLs(url, listener, serviceName, serviceInstances);
      • 2.1 prepareServiceRevisionExportedURLs(serviceInstances);
        • 2.1.1 ServiceDiscoveryRegistry#expungeStaleRevisionExportedURLs
        • 2.1.2 ServiceDiscoveryRegistry#initializeRevisionExportedURLs(List)
      • 2.2 cloneExportedURLs(subscribedURL, serviceInstances);
    • 3. registerServiceInstancesChangedListener
  • 五、总结

一、前言

本系列为个人Dubbo学习笔记,内容基于《深度剖析Apache Dubbo 核心技术内幕》, 过程参考官方源码分析文章,仅用于个人笔记记录。本文分析基于Dubbo2.7.5版本,由于个人理解的局限性,若文中不免出现错误,感谢指正。

系列文章地址:Dubbo源码分析:全集整理


本文基于 Dubbo 2.7.5 版本。关于该部分逻辑,如有需要可参考:

  1. Dubbo笔记 ㉕ : Spring 执行流程概述
  2. Dubbo笔记 ㉖ : DubboBootstrap 的服务暴露
  3. Dubbo笔记 ㉗ : 服务自省-提供者
  4. Dubbo笔记 ㉘ : 服务自省-消费者
二、ReferenceAnnotationBeanPostProcessor#doGetInjectedBean

在 Dubbo笔记 ㉕ : Spring 执行流程概述中我们就介绍了 Spring 框架下 Dubbo的启动流程。对于消费者我们提到,在Spring 容器启动过程中,ReferenceAnnotationBeanPostProcessor会扫描 被@Reference 注解修饰的属性,并进行属性填充,而填充的逻辑则在于 ReferenceAnnotationBeanPostProcessor#doGetInjectedBean 中,在这个方法中会创建 Dubbo Service的 代理对象。其实现具体如下:

    @Override
    protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class injectedType,
                                       Injectionmetadata.InjectedElement injectedElement) throws Exception {
          
        // 以 @Service 注解规则生成的 ServiceBean BeanName
        // ServiceBean:com.kingfish.service.ProviderService:1.0.0:spring
        String referencedBeanName = buildReferencedBeanName(attributes, injectedType);
        // 以 @Reference 注解规则生成的 BeanName :如果指定id,则返回id,否则按照规则:@Reference(属性key=value,...) 接口名  生成
        // 如:@Reference(group=spring,version=1.0.0) com.kingfish.service.ProviderService
        String referenceBeanName = getReferenceBeanName(attributes, injectedType);
		// 为当前引用创建一个 ReferenceBean  对象(如果需要)
        ReferenceBean referenceBean = buildReferenceBeanIfAbsent(referenceBeanName, attributes, injectedType);
		// ReferenceBean 注册到容器中
        registerReferenceBean(referencedBeanName, referenceBean, attributes, injectedType);
		// 缓存 ReferenceBean  
        cacheInjectedReferenceBean(referenceBean, injectedElement);
		// 创建Dubbo 服务引用服务的代理对象。
        return getOrCreateProxy(referencedBeanName, referenceBeanName, referenceBean, injectedType);
    }

这里我们注重来看 ReferenceAnnotationBeanPostProcessor#getOrCreateProxy 的实现,如下:

    private Object getOrCreateProxy(String referencedBeanName, String referenceBeanName, ReferenceBean referenceBean, Class serviceInterfaceType) {
    	// 1. 判断Spring 容器中是否已经存在该 referencedBean,直接创建代理类。
        if (existsServiceBean(referencedBeanName)) { // If the local @Service Bean exists, build a proxy of ReferenceBean
            return newProxyInstance(getClassLoader(), new Class[]{serviceInterfaceType},
                    wrapInvocationHandler(referenceBeanName, referenceBean));
        } else { // ReferenceBean should be initialized and get immediately
        	// 2. 否则的话直接获取
            return referenceBean.get();
        }
    }

这里我们看到l了两个分支:

  1. 如果容器中存在该 dubbo service 的 ReferenceBean,则直接创建代理对象。
  2. 如果容器中不存在 该 dubbo service 的 ReferenceBean,则会通过 ReferenceBean#get 来获取代理对象。

下面我们一一来看:

1. newProxyInstance

当我们通过XML 或 @Bean 注入了 ReferenceBean 时可能会出现这种情况,即容器中存在 referencedBean。这里可以看到, 如果容器中已经存在当前接口的 referencedBean。则会直接创建代理类.

   return newProxyInstance(getClassLoader(), new Class[]{serviceInterfaceType},
           wrapInvocationHandler(referenceBeanName, referenceBean));
	// 使用 ReferenceBeanInvocationHandler包装 referenceBean
	// 保存到 ReferenceAnnotationBeanPostProcessor#localReferenceBeanInvocationHandlerCache 缓存中
    private InvocationHandler wrapInvocationHandler(String referenceBeanName, ReferenceBean referenceBean) {
        return localReferenceBeanInvocationHandlerCache.computeIfAbsent(referenceBeanName, name ->
                new ReferenceBeanInvocationHandler(referenceBean));
    }

ReferenceAnnotationBeanPostProcessor.ReferenceBeanInvocationHandler 实现如下:

    private static class ReferenceBeanInvocationHandler implements InvocationHandler {

        private final ReferenceBean referenceBean;

        private Object bean;

        private ReferenceBeanInvocationHandler(ReferenceBean referenceBean) {
            this.referenceBean = referenceBean;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            Object result;
            try {
            	// 判断 Bean 如果为 null,则进行初始化
                if (bean == null) { // If the bean is not initialized, invoke init()
                    // issue: https://github.com/apache/dubbo/issues/3429
                    init();
                }
                result = method.invoke(bean, args);
            } catch (InvocationTargetException e) {
                // re-throws the actual Exception.
                throw e.getTargetException();
            }
            return result;
        }

        private void init() {
        	// 调用 ReferenceBean#get()
            this.bean = referenceBean.get();
        }
    }
2. ReferenceBean#get

我们在 Dubbo源码分析:全集整理 中分析过 2.7.0 版本的 Dubbo消费者引用过程,这里基本逻辑类似,所以这并不会分析过于详细。

Dubbo 在第一次创建 Dubbo Service的 代理对象时会执行 ReferenceConfig#init 方法,如下:

    public synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
        }
        // 缓存没有则创建
        if (ref == null) {
        	// 初始化代理对象
            init();
        }
        return ref;
    }

    public synchronized void init() {
        if (initialized) {
            return;
        }
		// bootstrap 初始化
        if (bootstrap == null) {
            bootstrap = DubboBootstrap.getInstance();
            bootstrap.init();
        }

        checkAndUpdateSubConfigs();

        //init serivcemetadata
        // 初始化 servicemetadata信息
        servicemetadata.setVersion(version);
        servicemetadata.setGroup(group);
        servicemetadata.setDefaultGroup(group);
        servicemetadata.setServiceType(getActualInterface());
        servicemetadata.setServiceInterfaceName(interfaceName);
        // TODO, uncomment this line once service key is unified
        servicemetadata.setServiceKey(URL.buildKey(interfaceName, group, version));

        checkStubAndLocal(interfaceClass);
        ConfigValidationUtils.checkMock(interfaceClass, this);

        Map map = new HashMap();

		... 服务参数解析,保存到 map 中
		// 将参数信息保存到 servicemetadata#attachments 中
        servicemetadata.getAttachments().putAll(map);
		// 注册消费者
        ServiceRepository repository = ApplicationModel.getServiceRepository();
        ServiceDescriptor serviceDescriptor = repository.registerService(interfaceClass);
        repository.registerConsumer(
                servicemetadata.getServiceKey(),
                attributes,
                serviceDescriptor,
                this,
                null,
                servicemetadata);
		// 创建提供者代理类
        ref = createProxy(map);

        servicemetadata.setTarget(ref);
        servicemetadata.addAttribute(PROXY_CLASS_REF, ref);
        // 保存提供者代理类的引用关系
        repository.lookupReferredService(servicemetadata.getServiceKey()).setProxyObject(ref);

        initialized = true;

        // dispatch a ReferenceConfigInitializedEvent since 2.7.4
        // 分发 ReferenceConfigInitializedEvent 事件
        dispatch(new ReferenceConfigInitializedEvent(this, invoker));
    }

这里对 ReferenceConfig#init 方法进行了简化,不过我们可以看到核心逻辑在 ReferenceConfig#createProxy 中。下面我们具体来看:

3. ReferenceConfig#createProxy

ReferenceConfig#createProxy 创建了 Dubbo service的代理对象,是 消费者端的核心方法,其实现如下:

    private T createProxy(Map map) {
        if (shouldJvmRefer(map)) {
			... 本地服务调用
        } else {
            urls.clear();
            // // 如果 url不为空,则说明可能会进行点对点调用,即服务直连
            if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
            
				... 对服务直连的处理,将指定的url解析出注册中心地址,保存到urls 中
				 
            } else { // assemble URL from register center's configuration
                // if protocols not injvm checkRegistry
                // 非本地调用
                if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
                    checkRegistry();
                    // 1. 解析 注册中心地址
                    List us = ConfigValidationUtils.loadRegistries(this, false);
                    if (CollectionUtils.isNotEmpty(us)) {
                        for (URL u : us) {
                            URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
                            if (monitorUrl != null) {
                                map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                            }
                            urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                        }
                    }
                    if (urls.isEmpty()) {
                        throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config  to your spring config.");
                    }
                }
            }
			// 单个注册中心或服务提供者(服务直连)
            if (urls.size() == 1) {
            	// 2. 
                invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
            } else {
            	...  多个注册中心或多个服务提供者,或者两者混合 调用,与单一差不多
            }
        }

		... 
        
         // 将 ServiceDefinition 发布到 WritablemetadataService 中。这里WritablemetadataService  默认为 lcoal,被 dubbo.application.parameters.metadata 控制,默认为 local
        String metadata = map.get(metaDATA_KEY);
        WritablemetadataService metadataService = WritablemetadataService.getExtension(metadata == null ? DEFAULT_metaDATA_STORAGE_TYPE : metadata);
        if (metadataService != null) {
            URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
            // 3. 
            metadataService.publishServiceDefinition(consumerURL);
        }
        // create service proxy
        // 创建 Service 代理对象
        // 4. 
        return (T) PROXY_FACTORY.getProxy(invoker);
    }

上面的进行了代码省略,我们主要关注下面四点:

  1. ConfigValidationUtils.loadRegistries(this, false); :解析获取注册中心地址,这里会对服务自省的模式进行判断。
  2. REF_PROTOCOL.refer(interfaceClass, urls.get(0)); :在服务自省模式下,这里会调用 ServiceDiscoveryRegistryProtocol#refer 来获取 Invoker。该方法在第三部分进行了详细分析。
  3. metadataService.publishServiceDefinition(consumerURL) :发布 service definition ,这里默认会将 service definition 发布到本地元数据中心。 service definition 中包括 :接口名、接口方法、方法入参等信息。
  4. PROXY_FACTORY.getProxy(invoker); : 获取最终的代理对象。

下面我们对这四个部分进行详细分析 :

3.1 ConfigValidationUtils.loadRegistries(this, false);

在上面的代码中会通过 ConfigValidationUtils.loadRegistries(this, true) 是获取到当前的注册中心地址,这点逻辑和提供者相同。 ConfigValidationUtils.loadRegistries(this, true) ,内部会执行下面代码:

  url = URLBuilder.from(url)
          .addParameter(REGISTRY_KEY, url.getProtocol())
          // 这里会通过ConfigValidationUtils#extractRegistryType 判断 dubbo.registry.parameters.registry-type 是否为 service 
          // 如果是则将url Protocol 设置为 service-discovery-registry 否则为 registry
          .setProtocol(extractRegistryType(url))
          .build();

也就是说,如果我们设置了 dubbo.registry.parameters.registry-type = service,在这则会将注册中心协议修改 为 service-discovery-registry ,如下:

// 协议类型为 service-discovery-registry,在后续会对应加载 ServiceDiscoveryRegistryProtocol
service-discovery-registry://localhost:2181/org.apache.dubbo.registry.RegistryService?application=simple-provider&dubbo=2.0.2&logger=slf4j&metadata.type=remote&pid=25236&qos.enable=false®istry=zookeeper®istry-type=service®istry.type=service&release=2.7.5×tamp=1634545621271

否则协议类型会为设置为 registry,如下:

// 协议类型为 registry, 在后续对应加载 RegistryProtocol
registry://localhost:2181/org.apache.dubbo.registry.RegistryService?application=simple-provider&dubbo=2.0.2&logger=slf4j&metadata.type=local&pid=38688&qos.enable=false®istry=zookeeper&release=2.7.5&simplified=false×tamp=1636683500716

3.2 REF_PROTOCOL.refer(interfaceClass, urls.get(0));

这里 REF_PROTOCOL是 Protocol$Adaptive类型,即Protocol 的 SPI 适配器,会根据入参中的 url.getProtocol() 获取的值来选择调用具体的实现类,而我们上面提到,如果我们设置了 dubbo.registry.parameters.registry-type = service,URL的协议类型为 service-discovery-registry,因此这里会调用 ServiceDiscoveryRegistryProtocol#refer 来完成后续引用 *** 作。关于 ServiceDiscoveryRegistryProtocol#refer 的内容,我们在第三部分再详细分析。


3.3 metadataService.publishServiceDefinition(consumerURL)

该部分是将当前服务的 ServiceDefinition 发布到 WritablemetadataService 中,需要注意的是,当前这个 WritablemetadataService 的类型并非是 dubbo.application.metadata-type = local 来控制,而是通过参数中的 metadata 属性,如 dubbo.application.parameters.metadata,默认情况下为 local 。

        String metadata = map.get(metaDATA_KEY);
        WritablemetadataService metadataService = WritablemetadataService.getExtension(metadata == null ? DEFAULT_metaDATA_STORAGE_TYPE : metadata);
        if (metadataService != null) {
            URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
            metadataService.publishServiceDefinition(consumerURL);
        }

需要注意:

如果服务自省模式下使用远程元数据中心 (dubbo.application.metadata-type=remote) 的情况下
并且,这里将元数据设置为了远程( dubbo.application.parameters.metadata = remote )会产生冲突,因为其需要在元数据中心上创建的节点路径相同。

下面以ZK 作为元数据中心说明:

  • dubbo.application.parameters.metadata = remote 时会在元数据中心上创建 ServiceDefinition 的相关节点,如对于ProviderService其节点路径为 /dubbo/metadata/com.kingfish.service.ProviderService/2.0.0/spring , 节点值如下:

    {
        "canonicalName": "com.kingfish.service.ProviderService",
        "codeSource": "file:/E:/HKingFish/dubbo-demo/simple-spring-duboo/simple-api/target/classes/",
        "methods": [
            {
                "name": "sayHelloWorld",
                "parameterTypes": [],
                "returnType": "java.lang.String"
            },
            {
                "name": "sayHello",
                "parameterTypes": [
                    "java.lang.String"
                ],
                "returnType": "void"
            },
            {
                "name": "sayHello",
                "parameterTypes": [],
                "returnType": "void"
            }
        ],
        "types": [
            {
                "type": "int",
                "typeBuilderName": "org.apache.dubbo.metadata.definition.builder.DefaultTypeBuilder"
            },
            {
                "type": "char",
                "typeBuilderName": "org.apache.dubbo.metadata.definition.builder.DefaultTypeBuilder"
            },
            {
                "type": "java.lang.String",
                "typeBuilderName": "org.apache.dubbo.metadata.definition.builder.DefaultTypeBuilder"
            },
            {
                "type": "void",
                "typeBuilderName": "org.apache.dubbo.metadata.definition.builder.DefaultTypeBuilder"
            }
        ]
    }
    
  • dubbo.application.metadata-type=remote 时会在元数据中心上创建 dubbo service的相关节点,如 /dubbo/metadata/com.kingfish.service.ProviderService/2.0.0/spring/provider/dubbo/revision3124311290257149701。节点值为如下 :

    dubbo%3A%2F%2F192.168.110.57%3A20880%2Forg.apache.dubbo.metadata.metadataService%3Fanyhost%3Dtrue%26application%3Dsimple-provider%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26group%3Dsimple-provider%26interface%3Dorg.apache.dubbo.metadata.metadataService%26logger%3Dslf4j%26metadata.type%3Dremote%26methods%3DgetExportedURLs%2CtoURLs%2CserviceName%2CismetadataServiceURL%2Cversion%2CgetSubscribedURLs%2CtoSortedStrings%2CgetServiceDefinition%26pid%3D35544%26release%3D2.7.5%26revision%3D2.7.5%26side%3Dprovider%26timestamp%3D1637305980358%26version%3D1.0.0
    

可以看到,两种模式都需要对 /dubbo/metadata/com.kingfish.service.ProviderService/2.0.0 节点处理,因此会产生冲突。

3.4 PROXY_FACTORY.getProxy(invoker);

这里默认会通过 JavassistProxyFactory#getProxy 来创建代理对象。该内容在 Dubbo笔记 ⑧ : 消费者启动流程 - ReferenceConfig#get 中有过详细介绍,这里不再赘述。

三、ServiceDiscoveryRegistryProtocol#refer

下面我们来看一下 ServiceDiscoveryRegistryProtocol#refer 的具体实现,其实现在父类 RegistryProtocol 中,如下:

	// RegistryProtocol#refer  
    public  Invoker refer(Class type, URL url) throws RpcException {
    	// //取 registry 参数值,并将其设置为协议头
        url = getRegistryUrl(url);
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        // / 将 url 查询字符串转为 Map
        Map qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
        String group = qs.get(GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
            	// 如果是多分组的情况下,通过 SPI 加载 MergeableCluster 实例,并调用 doRefer 继续执行服务引用逻辑。
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        // 调用 doRefer 继续执行服务引用逻辑
        return doRefer(cluster, registry, type, url);
    }
   

这里我们看 RegistryProtocol#doRefer 实现如下:

	// RegistryProtocol#doRefer
    private  Invoker doRefer(Cluster cluster, Registry registry, Class type, URL url) {
        RegistryDirectory directory = new RegistryDirectory(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map parameters = new HashMap(directory.getUrl().getParameters());
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
            directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
            // 1. 注册,这里虽然调用了registry,但消费者并不会真正注册
            registry.register(directory.getRegisteredConsumerUrl());
        }
        // 2. 构建路由链
        directory.buildRouterChain(subscribeUrl);
        // 3. 订阅节点信息,监听服务的变化
        directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
                PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
		// 4. 获取 Invoker
        Invoker invoker = cluster.join(directory);
        return invoker;
    }

RegistryProtocol#refer 中短短几行代码,却囊括了 Dubbo 集群容错的逻辑。下面我们按照注释的顺序进行分析。


1. RegistryService#register

这里的实现是 ServiceDiscoveryRegistry#register,其逻辑同提供者,由于当前是消费者,所以并不会进行注册。如下:

    @Override
    public final void register(URL url) {
    	// 消费者不会注册,这里会直接return
        if (!shouldRegister(url)) { // Should Not Register
            return;
        }
        super.register(url);
    }
    
2. RegistryDirectory#buildRouterChain

保存路由链。关于 Dubbo 的 Router 相关内容,如有需要详参:Dubbo笔记 ⑮ :Dubbo集群组件 之 Router

    public void buildRouterChain(URL url) {
    	// 创建一个 RouterChain 并赋值给 RegistryDirectory#routerChain
        this.setRouterChain(RouterChain.buildChain(url));
    }

    public void setRouterChain(RouterChain routerChain) {
        this.routerChain = routerChain;
    }
3. RegistryDirectory#subscribe

RegistryDirectory#subscribe 是对节点的订阅,也是我们需要分析的核心逻辑。其实现如下:

	// 这里的入参如: consumer://192.168.110.57/com.kingfish.service.ProviderService?application=simple-consumer&category=providers,configurators,routers&check=false&dubbo=2.0.2&group=spring&init=false&interface=com.kingfish.service.ProviderService&logger=slf4j&metadata=remote&metadata.type=local&methods=sayHello,sayHelloWorld&pid=32616&qos.enable=false&release=2.7.5&revision=2.0.0&side=consumer&sticky=false×tamp=1637308080597&version=2.0.0
    public void subscribe(URL url) {
        setConsumerUrl(url);
        // 添加当前类为监听器
        CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
        // service 配置监听器
        serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
        // 这里调用 ServiceDiscoveryRegistry 重写了 该方法
        registry.subscribe(url, this);
    }

其中 ServiceDiscoveryRegistry#subscribe 实现如下:

    @Override
    public final void subscribe(URL url, NotifyListener listener) {
    	// 非消费者直接 return。这里是消费者才可以进行订阅
        if (!shouldSubscribe(url)) { // Should Not Subscribe
            return;
        }
        // 这里经过流转后还是会调用 ServiceDiscoveryRegistry#doSubscribe
        super.subscribe(url, listener);
    }

    @Override
    public void doSubscribe(URL url, NotifyListener listener) {
    	// 交由 ServiceDiscoveryRegistry#subscribeURLs 处理
        subscribeURLs(url, listener);
    }

ServiceDiscoveryRegistry#subscribeURLs 在 ServiceDiscoveryRegistry中有多个重载方法,该处调用的代码如下:

	// ServiceDiscoveryRegistry#doSubscribe 会调用该方法
    protected void subscribeURLs(URL url, NotifyListener listener) {
		// 1. 订阅当前 URL
        writablemetadataService.subscribeURL(url);
		// 2. 获取服务名
        Set serviceNames = getServices(url);
        if (CollectionUtils.isEmpty(serviceNames)) {
            throw new IllegalStateException("Should has at least one way to know which services this interface belongs to, subscription url: " + url);
        }
		// 3. 遍历服务进行订阅
        serviceNames.forEach(serviceName -> subscribeURLs(url, listener, serviceName));

    }

上面我们可以看到其逻辑大致如下:

  1. 订阅当前 URL : 实际上是将当前 URL 保存到本地元数据中心中。
  2. 获取提供当前需要的接口服务的 应用名
  3. 对提供服务的应用进行遍历订阅,获取应用提供的服务,并进行订阅。

下面我们按照注释顺序来一步一步分析:

3.1 writablemetadataService.subscribeURL(url)

这里的 writablemetadataService 默认为 InMemoryWritablemetadataService,受 dubbo.registry.parameters.dubbo.metadata.storage-type 参数控制。

下面我们来看InMemoryWritablemetadataService 和 RemoteWritablemetadataService 的实现

  1. InMemoryWritablemetadataService#subscribeURL

    	// subscribedServiceURLs 是 InMemoryWritablemetadataService 的 map属性,
    	// 这里直接将 url 添加到 subscribedServiceURLs 中, key 为 url.getServiceKey()
        @Override
        public boolean subscribeURL(URL url) {
        	// 将入参中的 url 添加到 subscribedServiceURLs
            return addURL(subscribedServiceURLs, url);
        }
        
        boolean addURL(Map> serviceURLs, URL url) {
            return executeMutually(() -> {
                SortedSet urls = serviceURLs.computeIfAbsent(url.getServiceKey(), this::newSortedURLs);
                // make sure the parameters of tmpUrl is variable
                return urls.add(url);
            });
        }
    
  2. RemoteWritablemetadataService#subscribeURL

        @Override
        public boolean subscribeURL(URL url) {
        	// 恒为 true
            return true;
        }
    

3.2 getServices(url)

参数中的 url 中保存了要调用的 Dubbo Service 信息,此时需要通过ServiceDiscoveryRegistry#getServices 来获取提供该服务的应用名。我们来看其实现过程:

    protected Set getServices(URL subscribedURL) {
        Set subscribedServices = new linkedHashSet<>();
		// 1. 获取指定的应用名,如果指定了则会只调用指定的应用
        String serviceNames = subscribedURL.getParameter(PROVIDED_BY);
        if (StringUtils.isNotEmpty(serviceNames)) {
            subscribedServices = parseServices(serviceNames);
        }
		// 如果没有指定
        if (isEmpty(subscribedServices)) {
        	// 2. 根据映射查找服务
            subscribedServices = findMappedServices(subscribedURL);
            if (isEmpty(subscribedServices)) {
            	// 3.如果还为空,则 使用注册表 url 中指定的服务。
                subscribedServices = getSubscribedServices();
            }
        }
        return subscribedServices;
    }

    public Set getSubscribedServices() {
        return subscribedServices;
    }

这里可以看到,ServiceName 的查找有优先级:

  1. 首先查找消费者通过 provided-by 参数指定的应用。
  2. 其次查找映射的应用。我们在提供者的文章中提到,提供者启动后,dubbo会在配置中心创建映射节点(dubbo/config/mapping 节点下的映射关系),这里便是通过其映射节点找到对应服务。
  3. 最后使用注册表 url 中指定的服务。

下面我们来看一下 ServiceDiscoveryRegistry#findMappedServices 的实现:

    protected Set findMappedServices(URL subscribedURL) {
    	// 获取 服务接口、分组、版本、协议
        String serviceInterface = subscribedURL.getServiceInterface();
        String group = subscribedURL.getParameter(GROUP_KEY);
        String version = subscribedURL.getParameter(VERSION_KEY);
        String protocol = subscribedURL.getParameter(PROTOCOL_KEY, DUBBO_PROTOCOL);
        // 进行查找
        return serviceNameMapping.get(serviceInterface, group, version, protocol);
    }

其中 ServiceNameMapping#get 的实现类为 DynamicConfigurationServiceNameMapping,其实现如下:

    @Override
    public Set get(String serviceInterface, String group, String version, String protocol) {
		// 获取动态配置中心
        DynamicConfiguration dynamicConfiguration = DynamicConfiguration.getDynamicConfiguration();

        Set serviceNames = new linkedHashSet<>();
        execute(() -> {
        	// 查找指定的配置节点。这里buildGroup 构建的结果为 mapping/com.kingfish.service.ProviderService
            Set keys = dynamicConfiguration.getConfigKeys(buildGroup(serviceInterface, group, version, protocol));
            serviceNames.addAll(keys);
        });
        return Collections.unmodifiableSet(serviceNames);
    }

综上这里会查找 /dubbo/config/mapping 下面的节点,根据接口名获取对应的应用名(该节点是提供者在启动时监听 ServiceConfigExportedEvent 事件注册的,如有需要详参 Dubbo笔记 ㉖ : DubboBootstrap 的服务暴露)。如下图:

3.3 subscribeURLs(url, listener, serviceName)

经过上面两步,现在已经知道了可以提供当前服务的应用名称,这里会对所有的应用名称进行遍历订阅,以获取应用的信息。如下:

        serviceNames.forEach(serviceName -> subscribeURLs(url, listener, serviceName));

ServiceDiscoveryRegistry#subscribeURLs 实现如下,关于该方法的分析再第四部分会进行介绍。

	// ServiceDiscoveryRegistry#subscribeURLs(org.apache.dubbo.common.URL, org.apache.dubbo.registry.NotifyListener, java.lang.String)
    protected void subscribeURLs(URL url, NotifyListener listener, String serviceName) {
		// 1. 获取 ServiceInstance 实例
        List serviceInstances = serviceDiscovery.getInstances(serviceName);
		// 2. 订阅 URL 
        subscribeURLs(url, listener, serviceName, serviceInstances);

        // register ServiceInstancesChangedListener
        // 3. 注册 ServiceInstance 监听器
        registerServiceInstancesChangedListener(url, new ServiceInstancesChangedListener(serviceName) {

            @Override
            public void onEvent(ServiceInstancesChangedEvent event) {
                subscribeURLs(url, listener, event.getServiceName(), new ArrayList<>(event.getServiceInstances()));
            }
        });
    }
4. Cluster#join

关于 Dubbo 集群容错的内容,如有需要,详参: Dubbo笔记 ⑮ :Dubbo集群组件 之 Router


四、ServiceDiscoveryRegistry#subscribeURLsURL, NotifyListener, String)

上面我们提到消费者通过配置中心的映射节点找到了提供服务的应用。这里便对应用进行订阅获取信息,其实现如下:

    protected void subscribeURLs(URL url, NotifyListener listener, String serviceName) {
		// 1. 获取 ServiceInstance 实例
        List serviceInstances = serviceDiscovery.getInstances(serviceName);
		// 2. 订阅 URL 
        subscribeURLs(url, listener, serviceName, serviceInstances);

        // register ServiceInstancesChangedListener
        // 3. 注册 ServiceInstance 监听器
        registerServiceInstancesChangedListener(url, new ServiceInstancesChangedListener(serviceName) {

            @Override
            public void onEvent(ServiceInstancesChangedEvent event) {
                subscribeURLs(url, listener, event.getServiceName(), new ArrayList<>(event.getServiceInstances()));
            }
        });
    }

这里按照注释顺序为:

  1. 根据 serviceName 从元数据中心获取到应用的元数据信息并包装成 ServiceInstance
  2. 根据 ServiceInstance 的信息,获取元数据中心中发布的 接口服务信息,并进行订阅
  3. 注册 ServiceInstance 的监听器,当 ServiceInstance 发生变化时可以感知并同步。

下面我们来具体看:

1. serviceDiscovery.getInstances(serviceName);

serviceDiscovery.getInstances(serviceName); 的作用是根据 serviceName 从 元数据中心上获取应用的元数据信息。这里的调用链如下

 EventPublishingServiceDiscovery#getInstances ->  ZookeeperServiceDiscovery#getInstances

我们这里直接来看 ZookeeperServiceDiscovery#getInstances :

	// ZookeeperServiceDiscovery#getInstances
    @Override
    public List getInstances(String serviceName) throws NullPointerException {
    	// 1. s.queryForInstances(serviceName) 调用的是 ServiceDiscoveryImpl#queryForInstances(java.lang.String)
    	// 2. build(s.queryForInstances(serviceName)) 将第一步查出来的结果封装成 ServiceInstance
        return doInServiceDiscovery(s -> build(s.queryForInstances(serviceName)));
    }

其中 s.queryForInstances(serviceName) 调用的是 ServiceDiscoveryImpl#queryForInstances(java.lang.String),其实现如下:

	// 入参为应用名,如 simple-provider
    @Override
    public Collection> queryForInstances(String name) throws Exception
    {
        return queryForInstances(name, null);
    }

    List> queryForInstances(String name, Watcher watcher) throws Exception
    {
        ImmutableList.Builder> builder = ImmutableList.builder();
        // 路径为 /services/simple-provider
        String path = pathForName(name);
        List instanceIds;

        if ( watcher != null )
        {
            instanceIds = getChildrenWatched(path, watcher, true);
        }
        else
        {
            try
            {
            	// 获取 /services/simple-provider 的子节点,这里为 192.168.110.57:9999 (该应用的ip:端口)
                instanceIds = client.getChildren().forPath(path);
            }
            catch ( KeeperException.NoNodeException e )
            {
                instanceIds = Lists.newArrayList();
            }
        }

        for ( String id : instanceIds )
        {
        	// 按照 /services/simple-provider/192.168.110.57:9999 路径查询并获取节点信息
            ServiceInstance instance = queryForInstance(name, id);
            if ( instance != null )
            {
                builder.add(instance);
            }
        }
        return builder.build();
    }

可以看到 : 这一步的目的就是获取提供当前接口服务的所有应用的元数据信息。在上一步获取到了提供当前接口服务的应用名称后,这一步去获取 service/{应用名} 节点下所有的应用,并获取其元数据信息。


注:
如下图(存在一台机器 192.168.100.57:9999 提供了 simple-provider 的应用服务) :

其中 service/simple-provider/192.168.100.57:9999 节点的值如下:

{
    "name": "simple-provider",
    "id": "192.168.110.57:9999",
    "address": "192.168.110.57",
    "port": 9999,
    "sslPort": null,
    "payload": {
        "@class": "org.apache.dubbo.registry.zookeeper.ZookeeperInstance",
        "id": null,
        "name": "simple-provider",
        "metadata": {
            "dubbo.metadata-service.url-params": "{"dubbo":{"version":"1.0.0","dubbo":"2.0.2","release":"2.7.5","port":"20880"}}",
            "dubbo.subscribed-services.revision": "N/A",
            "dubbo.endpoints": "[{"port":9999,"protocol":"dubbo"}]",
            "dubbo.metadata.storage-type": "remote",
            "dubbo.exported-services.revision": "3124311290257149701"
        }
    },
    "registrationTimeUTC": 1637310981486,
    "serviceType": "DYNAMIC",
    "uriSpec": null
}
2. subscribeURLs(url, listener, serviceName, serviceInstances);

这里调用的方法是:

ServiceDiscoveryRegistry#subscribeURLs(org.apache.dubbo.common.URL, org.apache.dubbo.registry.NotifyListener, java.lang.String, java.util.Collection) 

其实现如下:

    protected void subscribeURLs(URL subscribedURL, NotifyListener listener, String serviceName,
                                 Collection serviceInstances) {
		// 没有提供服务的应用直接返回
        if (isEmpty(serviceInstances)) {
            return;
        }

        List subscribedURLs = new linkedList<>();

         // 1. 从 metadataService 中 获取应用中的所有暴露的URL,并保存到 subscribedURLs中
        subscribedURLs.addAll(getExportedURLs(subscribedURL, serviceInstances));
		// 如果为空,尝试合成
        if (subscribedURLs.isEmpty()) { // If empty, try to synthesize
            subscribedURLs.addAll(synthesizeSubscribedURLs(subscribedURL, serviceInstances));
        }
		// 2. 通知监听器刷新,这里的监听器是 RegistryDirectory
        listener.notify(subscribedURLs);
    }

这里我们关注两点:

  1. getExportedURLs(subscribedURL, serviceInstances) : 获取应用实例暴露的服务接口的URL 信息。
  2. listener.notify(subscribedURLs) :该方法会对 subscribedURLs 分类并解析,以更新本地的 Invoker 列表。如有需要详参: Dubbo笔记 ⑨ : 消费者启动流程 - RegistryProtocol#refer

这里我们关注 getExportedURLs(subscribedURL, serviceInstances) 实现如下:

    private List getExportedURLs(URL subscribedURL, Collection instances) {

        // local service instances could be mutable
        // 筛选出合适的 ServiceInstance :  启用 && 健康 && 是 dubbo 服务实例
        List serviceInstances = instances.stream()
                .filter(ServiceInstance::isEnabled)
                .filter(ServiceInstance::isHealthy)
                .filter(ServiceInstancemetadataUtils::isDubboServiceInstance)
                .collect(Collectors.toList());

        int size = serviceInstances.size();

        if (size == 0) {
            return emptyList();
        }

        // Prepare revision exported URLs
        // 	1. 准备修订导出的 URL
        prepareServiceRevisionExportedURLs(serviceInstances);

        // Clone the subscribed URLs from the template URLs
        // 2. 从模板 URL 克隆订阅的 URL
        List subscribedURLs = cloneExportedURLs(subscribedURL, serviceInstances);

        // clear local service instances
        // 清除本地 服务实例
        serviceInstances.clear();

        return subscribedURLs;
    }

我们按照注释继续往下看:

2.1 prepareServiceRevisionExportedURLs(serviceInstances);

ServiceDiscoveryRegistry#prepareServiceRevisionExportedURLs 其实现如下:

    private void prepareServiceRevisionExportedURLs(List serviceInstances) {
        executeExclusively(() -> {
            // 1. expunge stale
            // 清除过期 revision
            expungeStaleRevisionExportedURLs(serviceInstances);
            // 2. Initialize
            // 初始化当前 serviceInstances 导出的 URL
            initializeRevisionExportedURLs(serviceInstances);
        });
    }

当应用的配置不同时,其 revision 也不相同。同样的,如果应用有配置修改,其revision 也会变更,原先的revision 就已经过期。这一步的目的就是清除过期的应用revision,初始化当前 serviceInstances 的 url。


关于 Dubbo 服务修订版本,以下内容部分于引用 Apache dubbo 服务自省架构设计 :

当业务出现变化时,Dubbo Service 的 Dubbo 服务也会随之升级。通常,Provider 先行升级,Consumer 随后跟进。服务的升级可以包括

  1. Dubbo 服务 interface 升级 :由于 Dubbo 基于 Java 接口来暴露服务,同时 Java 接口通常在 Dubbo 微服务中又是唯一的。如果 interface 的全类名调整的话,那么,相当于 com.acme.Interface1 做下线处理,Consumer 将无法消费到该 Dubbo 服务,这种情况不予考虑。如果是 Provider 新增服务接口的话,那么 com.acme.Interface1 则并没有变化,也无需考虑。所以,有且仅有一种情况考虑,即“Dubbo interface 方法声明升级”,包括:增加服务方法、删除服务方法、修改方法签名

  2. Dubbo 服务 group、version 和 protocol 升级 :假设 P1 在升级过程中,新的服务实例部署仅存在调整 group 后的 Dubbo 服务,如 dubbo:com.acme.Interface1:v1:test ,那么这种升级就是不兼容升级,在新老交替过程中,Consumer 仅能消费到老版本的 Dubbo 服务。当新版本完全部署完成后,Consumer 将无法正常服务调用。如果,新版本中 P1 同时部署了 dubbo:com.acme.Interface1:v1:default

    和 dubbo:com.acme.Interface1:v1:test 的话,相当于 group 并无变化。同理,version 和 protocol 变化,相当于 Dubbo 服务 ID 变化,这类情况无需处理。

  3. Dubbo 服务元数据升级 : 这是一种比较特殊的升级方法,即 Provider 所有服务实例 Dubbo 服务 ID 相同,然而 Dubbo 服务的参数在不同版本服务实例存在差异,假设 Dubbo Service P1 部署 5 台服务,其中 3 台服务实例设置 timeout 为 1000 ms,其余 2 台 timeout 为 3000 ms。换言之,P1 拥有两个版本(状态)的 metadataService 。

综上所述,无论是 Dubbo interface 方法声明升级,还是 Dubbo 服务元数据升级,均可认为是 Dubbo 服务升级的因子,这些因子所计算出来的数值称之为“Dubbo 服务修订版本”,服务自省架构将其命名为“revision”。架构设设计上,当 Dubbo Service 增加或删除服务方法、修改方法签名以及调整 Dubbo 服务元数据,revision 也会随之变化,revision 数据将存放在其 Dubbo 服务实例的 metadata 中。当 Consumer 订阅 Provider Dubbo 服务元信息时,metadataService 远程调用的次数取决于服务实例列表中出现 revision 的个数,整体执行流程如下图所示:

2.1.1 ServiceDiscoveryRegistry#expungeStaleRevisionExportedURLs

ServiceDiscoveryRegistry#expungeStaleRevisionExportedURLs 用于清除过期的版本信息。其实现如下:

	// key 为 应用名,value 为 revision:提供的接口服务列表
    private final Map>> serviceRevisionExportedURLsCache = new linkedHashMap<>();

    private void expungeStaleRevisionExportedURLs(List serviceInstances) {

        String serviceName = serviceInstances.get(0).getServiceName();
        // revisionExportedURLsMap is mutable
        // 获取缓存的版本导出 Map
        Map> revisionExportedURLsMap = getRevisionExportedURLsMap(serviceName);
	
        if (revisionExportedURLsMap.isEmpty()) { // if empty, return immediately
            return;
        }
		// 获取缓存的 Revisions
        Set existedRevisions = revisionExportedURLsMap.keySet(); // read-only
        // 获取当前的 Revisions 集合
        Set currentRevisions = serviceInstances.stream()
                .map(ServiceInstancemetadataUtils::getExportedServicesRevision)
                .collect(Collectors.toSet());
        // staleRevisions = existedRevisions(copy) - currentRevisions
        Set staleRevisions = new HashSet<>(existedRevisions);
        // 移除当前的Revisions 集合
        staleRevisions.removeAll(currentRevisions);
        // remove exported URLs if staled
        // 移除非当前的 Revisions  集合。
        staleRevisions.forEach(revisionExportedURLsMap::remove);
    }
2.1.2 ServiceDiscoveryRegistry#initializeRevisionExportedURLs(List)

ServiceDiscoveryRegistry#initializeRevisionExportedURLs(List) 在这里会查找所有serviceInstances 暴露的服务 URL 并缓存到 ServiceDiscoveryRegistry#serviceRevisionExportedURLsCache中。
其中 ServiceDiscoveryRegistry#serviceRevisionExportedURLsCache 结构如下:

	// key 为 serviceName, 
	// value 为 revision :当前serviceName 和 当前 revision 下对应的暴露的服务 URL
    private final Map>> serviceRevisionExportedURLsCache = new linkedHashMap<>();

ServiceDiscoveryRegistry#initializeRevisionExportedURLs(List) 实现如下:

    private void initializeRevisionExportedURLs(List serviceInstances) {
        // initialize the revision exported URLs that the selected service instance exported
        // 初始化所选服务实例导出的修订导出 URL
        initializeSelectedRevisionExportedURLs(serviceInstances);
        // initialize the revision exported URLs that other service instances exported
        // 初始化其他服务实例导出的修订导出 URL
        serviceInstances.forEach(this::initializeRevisionExportedURLs);
    }

  1. ServiceDiscoveryRegistry#initializeSelectedRevisionExportedURLs

        private void initializeSelectedRevisionExportedURLs(List serviceInstances) {
            // Try to initialize revision exported URLs until success
            // 
            for (int i = 0; i < serviceInstances.size(); i++) {
                // select a instance of {@link ServiceInstance}
                // 筛选一个 ServiceInstance
                ServiceInstance selectedInstance = selectServiceInstance(serviceInstances);
                // 进行初始化
                List revisionExportedURLs = initializeRevisionExportedURLs(selectedInstance);
                if (isNotEmpty(revisionExportedURLs)) {    // If the result is valid
                    break;
                }
            }
        }
    	// 选择 ServiceInstance 
        private ServiceInstance selectServiceInstance(List serviceInstances) {
            int size = serviceInstances.size();
            if (size == 0) {
                return null;
            } else if (size == 1) {
                return serviceInstances.get(0);
            }
            // 默认实现 RandomServiceInstanceSelector,随机选择
            ServiceInstanceSelector selector = getExtensionLoader(ServiceInstanceSelector.class).getAdaptiveExtension();
            return selector.select(getUrl(), serviceInstances);
        }
    
  2. ServiceDiscoveryRegistry#initializeRevisionExportedURLs(ServiceInstance)

        private List initializeRevisionExportedURLs(ServiceInstance serviceInstance) {
    
            if (serviceInstance == null) {
                return emptyList();
            }
    
            String serviceName = serviceInstance.getServiceName();
            // 从 serviceInstance metadata 中获取 dubbo.exported-services.revision 的值
            String revision = getExportedServicesRevision(serviceInstance);
    		// 获取当前 serviceName 的缓存 map
            Map> revisionExportedURLsMap = getRevisionExportedURLsMap(serviceName);
    		// 尝试从缓存中获取同版本的 URL
            List revisionExportedURLs = revisionExportedURLsMap.get(revision);
    
            boolean firstGet = false;
    
            if (revisionExportedURLs == null) { // The hit is missing in cache
    			// 不为空则说明 具有不同修订的当前服务实例
                if (!revisionExportedURLsMap.isEmpty()) { // The case is that current ServiceInstance with the different revision
    				... 日志打印
                } else { // Else, it's the first time to get the exported URLs
                // 否则说明当前版本是第一次导出
                    firstGet = true;
                }
    			// 获取 serviceInstance中暴露的 URl
                revisionExportedURLs = getExportedURLs(serviceInstance);
    			// 不为空则使用 revision保存当前暴露的 URL
                if (revisionExportedURLs != null) { // just allow the valid result into exportedURLsMap
                    revisionExportedURLsMap.put(revision, revisionExportedURLs);
                    }
                }
            } else { // Else, The cache is hit
            }
    
            return revisionExportedURLs;
        }
    
2.2 cloneExportedURLs(subscribedURL, serviceInstances);

上一步已经获取到了 ServiceInstance 所暴露的 url。这一步则是 ServiceDiscoveryRegistry#cloneExportedURLs 会从 ServiceInstance 所包括的 URL 中筛选出满足当前服务接口的 URL,并进行克隆后返回。具体实现如下:

    private List cloneExportedURLs(URL subscribedURL, Collection serviceInstances) {
		// 为空直接返回
        if (isEmpty(serviceInstances)) {
            return emptyList();
        }

        List clonedExportedURLs = new linkedList<>();

        serviceInstances.forEach(serviceInstance -> {
			// 获取host 
            String host = serviceInstance.getHost();
			// getTemplateExportedURLs(subscribedURL, serviceInstance) 会 从 serviceInstance 暴露的 URl 中筛选出匹配 subscribedURL 的服务URL
            getTemplateExportedURLs(subscribedURL, serviceInstance)
                    .stream()
                    .map(templateURL -> templateURL.removeParameter(TIMESTAMP_KEY))
                    .map(templateURL -> templateURL.removeParameter(PID_KEY))
                    .map(templateURL -> {
                    	// 进行克隆
                        String protocol = templateURL.getProtocol();
                        int port = getProtocolPort(serviceInstance, protocol);
                        if (Objects.equals(templateURL.getHost(), host)
                                && Objects.equals(templateURL.getPort(), port)) { // use templateURL if equals
                            return templateURL;
                        }

                        URLBuilder clonedURLBuilder = from(templateURL) // remove the parameters from the template URL
                                .setHost(host)  // reset the host
                                .setPort(port); // reset the port

                        return clonedURLBuilder.build();
                    })
                    .forEach(clonedExportedURLs::add);
        });
        return clonedExportedURLs;
    }
    
3. registerServiceInstancesChangedListener

registerServiceInstancesChangedListener 会订阅 ServiceInstance 节点信息,当 ServiceInstance 实例有变化时可以感知并更新本地的缓存信息。

       registerServiceInstancesChangedListener(url, new ServiceInstancesChangedListener(serviceName) {

           @Override
           public void onEvent(ServiceInstancesChangedEvent event) {
               subscribeURLs(url, listener, event.getServiceName(), new ArrayList<>(event.getServiceInstances()));
           }
       });

其中 ServiceDiscoveryRegistry#registerServiceInstancesChangedListener 的实现如下:

	// url 为消费者消费的 URl
    private void registerServiceInstancesChangedListener(URL url, ServiceInstancesChangedListener listener) {
    	// 创建监听 id 。如 : simple-provider:consumer://192.168.110.57/com.kingfish.service.ProviderService?group=spring&version=2.0.0
        String listenerId = createListenerId(url, listener);
        if (registeredListeners.add(listenerId)) {
        	// 监听服务实例,
        	// 如zk,这里调用 EventPublishingServiceDiscovery -> ZookeeperServiceDiscovery 监听 /services/{应用名} 节点, 如/services/simple-provider 
            serviceDiscovery.addServiceInstancesChangedListener(listener);
        }
    }
    
    private String createListenerId(URL url, ServiceInstancesChangedListener listener) {
        return listener.getServiceName() + ":" + url.toString(VERSION_KEY, GROUP_KEY, PROTOCOL_KEY);
    }

这里可以看到, 消费者通过 EventPublishingServiceDiscovery#addServiceInstancesChangedListener -> ZookeeperServiceDiscovery#addServiceInstancesChangedListener 监听注册中心上的应用节点(/service/{应用名称}),当节点发生变化时调用 ServiceDiscoveryRegistry#subscribeURLs(org.apache.dubbo.common.URL, org.apache.dubbo.registry.NotifyListener, java.lang.String, java.util.Collection) 来更新本地 URL 列表。

五、总结

注册中心、元数据中心、配置中心的三者的配合逻辑,如下:

  1. 首先获取 配置中心 上的映射节点,来获取提供该接口服务的应用名称。
  2. 根据获取到的应用名称,获取注册中心上的应用元数据。
  3. 根据应用元数据,确定应用的元数据中心地址,并从元数据中心获取接口服务的URL信息。

其中关于配置中心,在 Apache-Dubbo-服务自省架构设计 中提到过本地化映射配置,但【该特性并未在最新 Dubbo 2.7.6 全面发布,部分特性已在 Dubbo Spring Cloud 中发布】。


整个消费者逻辑的总结如下:

  1. 消费者服务启用后,要引用 Dubbo 接口服务 A。则程序会通过 ReferenceBean#get 来获取A的代理对象 AProxy,该代理对象会建立与提供者的服务的网络连接,所有的调用都会通过该代理对象发送给提供者并接收结果。
  2. ReferenceBean#get 通过 ReferenceConfig#createProxy 来创建代理对象。
  3. 在 ReferenceConfig#createProxy 中首先会获取 注册中心地址。此时由于我们设置了 dubbo.registry.parameters.registry-type = service,所以注册中心的协议类型为 service-discovery-registry 。所以会通过 ServiceDiscoveryRegistryProtocol#refer 来进行下一步处理。
  4. ServiceDiscoveryRegistryProtocol#refer 中首先注册自身、构建路由链。随后通过 ServiceDiscoveryRegistryProtocol#subscribe 来订阅服务。
  5. 在 ServiceDiscoveryRegistryProtocol#subscribe 中会通过 ServiceDiscoveryRegistry#getServices 从配置中心节点获取到服务接口和应用的映射关系,这一步结束后便获取到提供当前接口服务的应用名集合。
  6. 随后通过 ServiceDiscoveryRegistry#subscribeURLs 来进行订阅。
  7. ServiceDiscoveryRegistry#subscribeURLs 中首先会通过应用名从注册中心上获取应用信息构建出来应用实例对象。
  8. 之后会清除自身过期的缓存接口服务信息,并通过应用实例来获取到元数据中心地址,从元数据中心获取到当前应用提供的接口服务URL并克隆到本地。至此,消费者获取到了应用实例提供的所有接口服务URL。
  9. 之后通过 ServiceDiscoveryRegistry#registerServiceInstancesChangedListener 对应用实例进行订阅,当应用配置改变时,消费者可以感知并更新本地缓存配置。

以上:内容部分参考
Apache dubbo 服务自省架构设计
如有侵扰,联系删除。 内容仅用于自我记录学习使用。如有错误,欢迎指正

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5606759.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存