Dubbo Parameter Callbacks


Dubbo parameter callback configuration:
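A minimal sketch of such a configuration, modeled on the example in Dubbo's documentation rather than taken from the author's project. The service interface com.tiger.service.CallbackService, the bean id callbackService, and the method name addListener are assumptions chosen for illustration; only com.tiger.service.CallbackListener appears later in this article. The callbacks attribute sets the per-connection limit on callback instances, and dubbo:argument marks which method parameter is the callback.

```xml
<!-- Hypothetical names: callbackService, com.tiger.service.CallbackService, addListener -->
<dubbo:service interface="com.tiger.service.CallbackService" ref="callbackService" connections="1" callbacks="1000">
    <dubbo:method name="addListener">
        <!-- index is zero-based: the second parameter of addListener is the callback -->
        <dubbo:argument index="1" callback="true" />
    </dubbo:method>
</dubbo:service>
```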


        
            
        
    

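During development you may run into the callback instance limit. For each connection, that is, each Channel, the default number of callback instances is 1. If that number is exceeded, an error is thrown.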

Source code analysis of the callback handling class CallbackServiceCodec:

Client side:

private static String exportOrUnexportCallbackService(Channel channel, URL url, Class clazz, Object inst, Boolean export) throws IOException {
    // get the identity hash code of the callback instance
    int instid = System.identityHashCode(inst);

    Map<String, String> params = new HashMap<>(3);
    // no need to new client again
    params.put(IS_SERVER_KEY, Boolean.FALSE.toString());
    // mark it's a callback, for troubleshooting
    params.put(IS_CALLBACK_SERVICE, Boolean.TRUE.toString());
    String group = (url == null ? null : url.getParameter(GROUP_KEY));
    if (group != null && group.length() > 0) {
        params.put(GROUP_KEY, group);
    }
    // add method, for verifying against method, automatic fallback (see dubbo protocol)
    params.put(METHODS_KEY, StringUtils.join(Wrapper.getWrapper(clazz).getDeclaredMethodNames(), ","));

    Map<String, String> tmpMap = new HashMap<>(url.getParameters());
    tmpMap.putAll(params);
    tmpMap.remove(VERSION_KEY);// doesn't need to distinguish version for callback
    tmpMap.put(INTERFACE_KEY, clazz.getName());
    URL exportUrl = new URL(DubboProtocol.NAME, channel.getLocalAddress().getAddress().getHostAddress(), channel.getLocalAddress().getPort(), clazz.getName() + "." + instid, tmpMap);

    // no need to generate multiple exporters for different channel in the same JVM, cache key cannot collide.
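    // attribute key under which this callback instance's exporter is cached on the Channel
    String cacheKey = getClientSideCallbackServiceCacheKey(instid);
    // attribute key of the counter holding the number of callback instances on this Channel
    String countKey = getClientSideCountKey(clazz.getName());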
    if (export) {
        // one channel can have multiple callback instances, no need to re-export for different instance.
        // only export if this instance has not been exported on the channel yet
        if (!channel.hasAttribute(cacheKey)) {
            // check whether the instance count exceeds the limit
            if (!isInstancesOverLimit(channel, url, clazz.getName(), instid, false)) {
                Invoker<?> invoker = PROXY_FACTORY.getInvoker(inst, clazz, exportUrl);
                // should destroy resource?
                Exporter<?> exporter = PROTOCOL.export(invoker);
                // this is used for tracing if instid has published service or not.
                channel.setAttribute(cacheKey, exporter);
                logger.info("Export a callback service :" + exportUrl + ", on " + channel + ", url is: " + url);
                increaseInstanceCount(channel, countKey);
            }
        }
    } else {
        if (channel.hasAttribute(cacheKey)) {
            Exporter<?> exporter = (Exporter<?>) channel.getAttribute(cacheKey);
            exporter.unexport();
            channel.removeAttribute(cacheKey);
            decreaseInstanceCount(channel, countKey);
        }
    }
    return String.valueOf(instid);
}

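First, an instid is derived from the callback object via System.identityHashCode(). Distinct callback objects normally get distinct instids, although identity hash codes are not strictly guaranteed to be unique. After the first call, the Channel holds the following attributes: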

callback.service.instid.com.tiger.service.CallbackListener.COUNT=1 

callback.service.instid.1639044302={DubboExporter@9643} "interface com.tiger.service.CallbackListener

On the second call:

cacheKey=callback.service.instid.1862630585
countKey=callback.service.instid.com.tiger.service.CallbackListener.COUNT

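Because this cacheKey is not yet present on the Channel, isInstancesOverLimit() is called. It reads the value stored under callback.service.instid.com.tiger.service.CallbackListener.COUNT and compares it with limit. If the value is greater than or equal to limit, an exception is thrown; otherwise the export proceeds and increaseInstanceCount() adds 1 to the value stored under callback.service.instid.com.tiger.service.CallbackListener.COUNT. If the callback object is the same instance on every call, isInstancesOverLimit() is never entered.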
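The bookkeeping described above can be sketched in plain Java without Dubbo on the classpath. This is a simplified illustration, not the library's implementation: a HashMap stands in for the Channel's attributes, the class and method names (CallbackLimitSketch, export) are made up for the example, and the exception message is not the one Dubbo produces.

```java
import java.util.HashMap;
import java.util.Map;

final class CallbackLimitSketch {
    static final String PREFIX = "callback.service.instid.";

    /**
     * Mimics the export branch on the client side.
     * Returns true if a new instance was registered, false if this
     * instance was already registered on the (simulated) channel.
     */
    static boolean export(Map<String, Object> channelAttrs, String interfaceName, Object inst, int limit) {
        int instid = System.identityHashCode(inst);
        String cacheKey = PREFIX + instid;
        String countKey = PREFIX + interfaceName + ".COUNT";

        if (channelAttrs.containsKey(cacheKey)) {
            return false; // same instance again: the limit check is skipped
        }
        int count = (Integer) channelAttrs.getOrDefault(countKey, 0);
        if (count >= limit) {
            throw new IllegalStateException(interfaceName + " callback instances on this channel: "
                    + count + ", limit: " + limit);
        }
        channelAttrs.put(cacheKey, inst);      // stands in for caching the Exporter
        channelAttrs.put(countKey, count + 1); // stands in for increaseInstanceCount()
        return true;
    }

    public static void main(String[] args) {
        Map<String, Object> channelAttrs = new HashMap<>();
        String iface = "com.tiger.service.CallbackListener";
        Object first = new Object();
        Object second = new Object();

        System.out.println(export(channelAttrs, iface, first, 1)); // newly registered
        System.out.println(export(channelAttrs, iface, first, 1)); // already registered
        try {
            export(channelAttrs, iface, second, 1); // a second instance with limit 1
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With a limit of 1, reusing the first object is harmless, while passing a second object on the same channel is rejected.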

Server-side source code:

private static Object referOrdestroyCallbackService(Channel channel, URL url, Class clazz, Invocation inv, int instid, boolean isRefer) {
    Object proxy = null;
    String invokerCacheKey = getServerSideCallbackInvokerCacheKey(channel, clazz.getName(), instid);
    String proxyCacheKey = getServerSideCallbackServiceCacheKey(channel, clazz.getName(), instid);
    proxy = channel.getAttribute(proxyCacheKey);
    String countkey = getServerSideCountKey(channel, clazz.getName());
    if (isRefer) {
        if (proxy == null) {
            URL referurl = URL.valueOf("callback://" + url.getAddress() + "/" + clazz.getName() + "?" + Constants.INTERFACE_KEY + "=" + clazz.getName());
            referurl = referurl.addParametersIfAbsent(url.getParameters()).removeParameter(Constants.METHODS_KEY);
            // check whether the number of instances on this Channel exceeds the limit
            if (!isInstancesOverLimit(channel, referurl, clazz.getName(), instid, true)) {
                @SuppressWarnings("rawtypes")
                Invoker invoker = new ChannelWrappedInvoker(clazz, channel, referurl, String.valueOf(instid));
                proxy = proxyFactory.getProxy(invoker);
                channel.setAttribute(proxyCacheKey, proxy);
                channel.setAttribute(invokerCacheKey, invoker);
                increaseInstanceCount(channel, countkey);

                //convert error fail fast .
                //ignore concurrent problem. 
                Set<Invoker<?>> callbackInvokers = (Set<Invoker<?>>) channel.getAttribute(Constants.CHANNEL_CALLBACK_KEY);
                if (callbackInvokers == null) {
                    callbackInvokers = new ConcurrentHashSet<Invoker<?>>(1);
                    callbackInvokers.add(invoker);
                    channel.setAttribute(Constants.CHANNEL_CALLBACK_KEY, callbackInvokers);
                }
                logger.info("method " + inv.getMethodName() + " include a callback service :" + invoker.getUrl() + ", a proxy :" + invoker + " has been created.");
            }
        }
    } else {
        if (proxy != null) {
            Invoker<?> invoker = (Invoker<?>) channel.getAttribute(invokerCacheKey);
            try {
                Set<Invoker<?>> callbackInvokers = (Set<Invoker<?>>) channel.getAttribute(Constants.CHANNEL_CALLBACK_KEY);
                if (callbackInvokers != null) {
                    callbackInvokers.remove(invoker);
                }
                invoker.destroy();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
            // cancel refer, directly remove from the map
            channel.removeAttribute(proxyCacheKey);
            channel.removeAttribute(invokerCacheKey);
            decreaseInstanceCount(channel, countkey);
        }
    }
    return proxy;
}

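The number of callback instances bound to each connection, that is, each Channel, is configurable and defaults to 1. Because both the client and the server run the limit check, the number of instances a Channel can actually hold is determined by the smaller of the two settings.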
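As a small worked example of "the smaller setting wins", the following sketch computes the effective per-Channel limit from the two sides. The class and method names are hypothetical, and null stands for "not configured", in which case the default of 1 stated above applies.

```java
final class EffectiveCallbackLimit {
    /** Default number of callback instances per channel, as stated in the text. */
    static final int DEFAULT_CALLBACKS = 1;

    /** Each side enforces its own limit, so the stricter one decides. */
    static int effectiveLimit(Integer consumerCallbacks, Integer providerCallbacks) {
        int consumer = consumerCallbacks != null ? consumerCallbacks : DEFAULT_CALLBACKS;
        int provider = providerCallbacks != null ? providerCallbacks : DEFAULT_CALLBACKS;
        return Math.min(consumer, provider);
    }

    public static void main(String[] args) {
        // Consumer allows 1000, provider left at the default
        System.out.println(effectiveLimit(1000, null));
        // Both configured
        System.out.println(effectiveLimit(1000, 10));
    }
}
```

Raising the limit on only one side therefore has no effect; both sides need the higher value.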

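Dubbo currently provides no way to cancel a parameter callback. In practice, when the number of callback instances is limited, a single callback instance can be shared across calls.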
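One way to share a callback instance is to pass the same object on every remote call and let it fan out to local subscribers. The sketch below is an illustration under assumptions: the nested CallbackListener interface and its changed(String) method are stand-ins for com.tiger.service.CallbackListener, whose methods are not shown in this article, and SharedListener is a made-up name.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

final class SharedCallbackSketch {
    /** Hypothetical stand-in for com.tiger.service.CallbackListener. */
    interface CallbackListener {
        void changed(String msg);
    }

    /**
     * The only object ever handed to the remote service. Because it is always
     * the same instance, its identity hash code (and so its cacheKey) never
     * changes, and the per-channel instance count stays at 1.
     */
    static final class SharedListener implements CallbackListener {
        static final SharedListener INSTANCE = new SharedListener();

        private final List<CallbackListener> subscribers = new CopyOnWriteArrayList<>();

        private SharedListener() {
        }

        void subscribe(CallbackListener subscriber) {
            subscribers.add(subscriber);
        }

        @Override
        public void changed(String msg) {
            // Forward the remote notification to every local subscriber
            for (CallbackListener subscriber : subscribers) {
                subscriber.changed(msg);
            }
        }
    }

    public static void main(String[] args) {
        SharedListener shared = SharedListener.INSTANCE;
        shared.subscribe(msg -> System.out.println("handler A got " + msg));
        shared.subscribe(msg -> System.out.println("handler B got " + msg));
        // In a real consumer, `shared` would be the callback argument of the remote call
        shared.changed("update");
    }
}
```

Local code registers with the shared listener instead of creating a new callback object per call, so the instance limit is never reached.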

Sharing is welcome; when reposting, please credit the source: 内存溢出

Original URL: http://outofmemory.cn/zaji/5655954.html
