Dubbo学习记录

Dubbo学习记录,第1张

服务调用的前置学习【二】

服务调用涉及到的东西比较多, 需要一个个的理解透彻, 最终才能串起来;

服务端DubboInvoker的包装

DubboInvoker的生成是在服务导出的过程中创建的;由于Dubbo的SPI机制, DubboInvoker的生成会被多个包装类处理;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // 当前服务没有被导出并且没有卸载,才导出服务
        if (!isExported() && !isUnexported()) {
            // 服务导出(服务注册)
            export();
        }
    }

最终层层调用,在ServiceConfig的doExportUrlsFor1Protocol会创建一个Invoker实例;

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
		//...省略部分代码;
        if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

            // export to remote if the config is not local (export to local only when config is local)
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
  
                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    // 如果有注册中心,则将服务注册到注册中心
                    for (URL registryURL : registryURLs) {

                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));

                        // DelegateProviderMetaDataInvoker也表示服务提供者,包括了Invoker和服务的配置
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
                    if (logger.isInfoEnabled()) {
                        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                    }
                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            }
        }
        this.urls.add(url);
    }

这个Invoker是通过PROXY_FACTORY工厂生成, ProxyFactory是一个接口扩展点, 接口上面指定了扩展点实现类的名称;即使用的JavaasistProxyFactory来生成这个代理实例;
getInvoker方法传了几个重要参数:

  • ref :代表服务实现类;
  • interfaceClass: 代表实现的接口;
  • registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()) : 服务的注册中心URL
ProxyFactory动态代理扩展点
@SPI("javassist")
public interface ProxyFactory {
	//省略部分代码
    @Adaptive({PROXY_KEY})
    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;

}

已知的扩展点实现类有:

  • org.apache.dubbo.rpc.proxy.JdkProxyFactory
    该动态代理扩展实现类使用的JDK动态代理生成代理实例;
  • org.apache.dubbo.rpc.proxy.JavassistProxyFactory
    该动态代理实例使用的是Javassit技术生成代理实例;
第一层: AbstractProxyInvoker

调用ProxyFactory#getInvoker最终返回了一个AbstractProxyInvoker

JavassistProxyFactory
public class JavassistProxyFactory extends AbstractProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {

                // 执行proxy的method方法
                // 执行的proxy实例的方法
                // 如果没有wrapper,则要通过原生的反射技术去获取Method对象,然后执行
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}

  • wrapper.getWrapper方法, 会根据接口类获取一个wrapper对象, 如果已经创建过,则直接从缓存中拿到Wrapper实例,如果没有,则通过Javaassist技术,在makeWrapper方法中,使用ClassGenerator生成一个Java动态代理类;
  • 动态代理类中的代理方法invokeMethod中, 调用实现类的业务方法;
	Wrapper
    public static Wrapper getWrapper(Class<?> c) {
    	//省略部分代码;
        Wrapper ret = WRAPPER_MAP.get(c);
        if (ret == null) {
            ret = makeWrapper(c);
            WRAPPER_MAP.put(c, ret);
        }
        return ret;
    }

invokeMethod涉及的几大参数:

  • proxy : 实现类
  • methodName : 接口方法名;
  • parameterTypes : 参数类型
  • arguments : 参数值;

最终生成动态代理实例返回,该代理实例的类型为AbstractProxyInvoker,这个Invoker是与我们的实现类ref直接挂钩, 也就是最底层的Invoker;

JDK扩展点实现类JdkProxyFactory
  • 需要根据methodName和paramterTypes从获取类的Method元数据, 再通过Method.invoke(proxy,arguments)执行业务方法;这是反射的常见 *** 作;
  • 另外提一点: 执行业务方法的前后过程中,可以增加一些前置增强,后置增强(这就是AOP);
public class JdkProxyFactory extends AbstractProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
    }

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                Method method = proxy.getClass().getMethod(methodName, parameterTypes);
                return method.invoke(proxy, arguments);
            }
        };
    }

}

最终生成动态代理实例返回,该代理实例的类型为AbstractProxyInvoker,这个Invoker是与我们的实现类ref直接挂钩, 也就是最底层的Invoker;

第二层:DelegateProviderMetaDataInvoker

调用完PROXY_FACTORY返回一个AbstractProxyInvoker实例invoker, 将当前ServiceConfig类实例和invoker作为参数, 创建DelegateProviderMetaDataInvoker实例wrapperInvoker;

                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));

                        // DelegateProviderMetaDataInvoker也表示服务提供者,包括了Invoker和服务的配置
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

Protocol扩展点

调用Protocol#export方法导出服务;

DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);Exporter<?> exporter = protocol.export(wrapperInvoker);
@SPI("dubbo")
public interface Protocol {
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
	//省略部分代码
}

Protocol是一个扩展点, 而Protocol的扩展点实现类默认是DubboProtocol,因此会调用DubboProtocol#export方法导出服务, Dubbo的SPI机制在调用DubboProtocol扩展点实现类的过程中,会先调用Protocol的包装扩展点实现类进行处理
Protocol的包装类扩展点:

  • ProtocolFilterWrapper :会包装Invoker, 常见一系列过滤器拦截处理;
  • ProtocolListenerWrapper : 协议监听器处理;
  • QosProtocolWrapper : 这个包装实现类用到的场景比较少;

Protocol#export的调用链路如下

  1. ProtocolFilterWrapper#export
  2. ProtocolListenerWrapper#export
  3. QosProtocolWrapper#export
  4. DubboProtocol#export
第三层:CallbackRegistrationInvoker实例与过滤器链处理

调用ProtocolFilterWrapper#export会创建过滤器链处理, 进行拦截处理;

public class ProtocolFilterWrapper implements Protocol {
    private final Protocol protocol;
	
    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
       //调用buildInvokerChain生成CallbackRegistrationInvoker实例
        return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
    }
    //省略了buildInvokerChain方法;
 }
  • 调用buildInvokerChain生成CallbackRegistrationInvoker实例;该实例包装了第二层的DelegateProviderMetaDataInvoker实例与过滤器链; 参数如下:
  • DelegateProviderMetaDataInvoker实例invoker
  • SERVICE_FILTER_KEY : 常量service.filter
  • CommonConstants.PROVIDER : 常量provider

buildInvokerChain的流程:

  1. 获取Filter的扩展点实现类, 其组名为provider的实现类;
  2. 遍历获取到的Filter;每个Filter都会创建一个Invoker, 每个Invoker#invoke方法中都会调用filter#invoke方法处理;返回一个异步结果;
  3. 最终创建CallbackRegistrationInvoker, 传入最外层Filter对应的Invoker实例, 与filters过滤器实例;
public class ProtocolFilterWrapper implements Protocol {
    private final Protocol protocol;
	
    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
       //调用buildInvokerChain生成CallbackRegistrationInvoker实例
        return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
    }

    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        // 根据url获取filter,根据url中的parameters取key为key的value所对应的filter,但是还会匹配group
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {
                //省略部分代码
                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        Result asyncResult;
                        try {
                            // 得到一个异步结果
                            asyncResult = filter.invoke(next, invocation);
                        } catch (Exception e) {
                        //异常处理
                        }
                        return asyncResult;
                    }
                };
            }
        }

        return new CallbackRegistrationInvoker<>(last, filters);
    }

}
Filter扩展点
  • Filter扩展文件中,包含了以下的扩展点实现类, “#" 与数值是我加上去的,代表的是组名为Provier的Filter; 数值的负数越小,就越先被先用;
  • 服务提供方和服务消费方调用过程拦截,Dubbo 本身的大多功能均基于此扩展点实现,每次远程方法执行,该拦截都会被执行,对性能有影响;
  • 开发中,如果我们想要新增Filter,只需要实现了Filter接口就可以, 调用顺序在内置的Filter之后;
echo=org.apache.dubbo.rpc.filter.EchoFilter #-110000
generic=org.apache.dubbo.rpc.filter.GenericFilter # -20000
genericimpl=org.apache.dubbo.rpc.filter.GenericImplFilter
token=org.apache.dubbo.rpc.filter.TokenFilter # 无
accesslog=org.apache.dubbo.rpc.filter.AccessLogFilter #无
activelimit=org.apache.dubbo.rpc.filter.ActiveLimitFilter
classloader=org.apache.dubbo.rpc.filter.ClassLoaderFilter #-30000
context=org.apache.dubbo.rpc.filter.ContextFilter # order = -10000
consumercontext=org.apache.dubbo.rpc.filter.ConsumerContextFilter
exception=org.apache.dubbo.rpc.filter.ExceptionFilter #无
executelimit=org.apache.dubbo.rpc.filter.ExecuteLimitFilter #无
deprecated=org.apache.dubbo.rpc.filter.DeprecatedFilter
compatible=org.apache.dubbo.rpc.filter.CompatibleFilter
timeout=org.apache.dubbo.rpc.filter.TimeoutFilter #无

调用顺序:

  1. EchoFilter#invoke
    提供回声功能, 即客户端判断服务是不是可用的;如果方法名是echo, 则调用结束, 返回结果是参数的第一个值;
@Activate(group = CommonConstants.PROVIDER, order = -110000)
public class EchoFilter implements Filter {
    @Override
    public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
        if (inv.getMethodName().equals($ECHO) && inv.getArguments() != null && inv.getArguments().length == 1) {
            return AsyncRpcResult.newDefaultAsyncResult(inv.getArguments()[0], inv);
        }
        return invoker.invoke(inv);
    }

}
  1. ClassLoaderFilter#invoke
    类加载过滤器: 设置当前线程的类加载器, 为Invoker的代理的接口类interface的类加载器。
@Activate(group = CommonConstants.PROVIDER, order = -30000)
public class ClassLoaderFilter implements Filter {
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        ClassLoader ocl = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(invoker.getInterface().getClassLoader());
        try {
            return invoker.invoke(invocation);
        } finally {
            Thread.currentThread().setContextClassLoader(ocl);
        }
    }
}
  1. GenericFilter#invoke
    Dubbo的泛化服务使用; 暂时没有用到;
  2. ContextFilter#invoke
    服务的RpcContext上下文对象 设置Invoker, 参数信息invocation, 与IP端口号信息;
@Activate(group = PROVIDER, order = -10000)
public class ContextFilter extends ListenableFilter {
    private static final String TAG_KEY = "dubbo.tag";
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        Map<String, String> attachments = invocation.getAttachments();
        if (attachments != null) {
            attachments = new HashMap<>(attachments);
            attachments.remove(PATH_KEY);
            attachments.remove(INTERFACE_KEY);
            attachments.remove(GROUP_KEY);
            attachments.remove(VERSION_KEY);
            attachments.remove(DUBBO_VERSION_KEY);
            attachments.remove(TOKEN_KEY);
            attachments.remove(TIMEOUT_KEY);
            // Remove async property to avoid being passed to the following invoke chain.
            attachments.remove(ASYNC_KEY);
            attachments.remove(TAG_KEY);
            attachments.remove(FORCE_USE_TAG);
        }
        RpcContext context = RpcContext.getContext();

        context.setInvoker(invoker)
                .setInvocation(invocation)
//                .setAttachments(attachments)  // merged from dubbox
                .setLocalAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());
        String remoteApplication = invocation.getAttachment(REMOTE_APPLICATION_KEY);
        if (StringUtils.isNotEmpty(remoteApplication)) {
            context.setRemoteApplicationName(remoteApplication);
        } else {
            context.setRemoteApplicationName(RpcContext.getContext().getAttachment(REMOTE_APPLICATION_KEY));

        }
		
        if (attachments != null) {
            if (RpcContext.getContext().getAttachments() != null) {
                RpcContext.getContext().getAttachments().putAll(attachments);
            } else {
                RpcContext.getContext().setAttachments(attachments);
            }
        }
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(invoker);
        }
        try {
            return invoker.invoke(invocation);
        } finally {
        }
    }
}
  1. TimeoutFilter#invoke
    服务端的超时处理, 只会打印日志, 而不会终止服务的运行;
  • invoke 方法中设置开始时间;
  • TimeoutListener#onResponse中, 会根据当前时间 - 开始时间 得出业务执行时间, 然后在和timeout配置比较, 大于timeout设置的超时时间, 就会打印警告日志;
@Activate(group = CommonConstants.PROVIDER)
public class TimeoutFilter extends ListenableFilter {

    private static final Logger logger = LoggerFactory.getLogger(TimeoutFilter.class);

    private static final String TIMEOUT_FILTER_START_TIME = "timeout_filter_start_time";

    public TimeoutFilter() {
        super.listener = new TimeoutListener();
    }

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        invocation.setAttachment(TIMEOUT_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));
        return invoker.invoke(invocation);
    }

    static class TimeoutListener implements Listener {

        @Override
        public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
            String startAttach = invocation.getAttachment(TIMEOUT_FILTER_START_TIME);
            if (startAttach != null) {
                long elapsed = System.currentTimeMillis() - Long.valueOf(startAttach);
                if (invoker.getUrl() != null && elapsed > invoker.getUrl().getMethodParameter(invocation.getMethodName(), "timeout", Integer.MAX_VALUE)) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("invoke time out. method: " + invocation.getMethodName() + " arguments: " + Arrays.toString(invocation.getArguments()) + " , url is " + invoker.getUrl() + ", invoke elapsed " + elapsed + " ms.");
                    }
                }
            }
        }

    }
}
  1. ExceptionFilter#invoke
服务端Invoker的调用顺序

通过以上,可以得出Invoker的调用顺序。

  1. CallbackRegistrationInvoker#invoke
    1.1 EchoFilter#invoke
    1.2 ClassLoaderFilter#invoke
    1.3 GenericFilter#invoke
    1.4 ContextFilter#invoke
    1.5 TimeoutFilter#invoke
    1.6 ExceptionFilter#invoke
  2. DelegateProviderMetaDataInvoker#invoke
  3. JavassitProxyFactory#AbstractProxyInvoker#invoke
启动服务器

服务导出时,

  • 服务信息转换为URL放入注册中心
    这个步骤调用了父类Protocol父类AbstractProxyProtocol的export方法执行的;
  • 启动Netty服务器;
    这个步骤调用了DubboProtocol#export方法执行的;
    会把invoker包装为一个Exporter实例, 放入缓存中;服务请求时,从缓存中拿到exporter去执行;
	DubboProtocol#export
    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        // export service.
        String key = serviceKey(url);
        // 构造一个Exporter
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);
        // 开启NettyServer
        openServer(url);
        return exporter;
    }
    //判断容器中是否存在Netty服务器, 存在就重新导出,不存在即调用createServer创建NettyServer服务器;
    private void openServer(URL url) {
        String key = url.getAddress(); 
        boolean isServer = url.getParameter(IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                synchronized (this) {
                    server = serverMap.get(key);
                    if (server == null) {
                        serverMap.put(key, createServer(url));
                    }
                }
            } else {
            }
        }
    }
    //调用 Exchangers.bind(url, requestHandler)创建服务器;
    private ExchangeServer createServer(URL url) {
        url = ...;
        //省略部分代码;
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
        }
        return server;
    }

Exchangers#bind方法:

    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        // codec表示协议编码方式
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        // 通过url得到HeaderExchanger, 利用HeaderExchanger进行bind,将得到一个HeaderExchangeServer
        return getExchanger(url).bind(url, handler);
    }
    public static Exchanger getExchanger(URL url) {
    	//type的值为header
        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
        return getExchanger(type);
    }
    type的值为header, 
    public static Exchanger getExchanger(String type) {
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    }

最终会通过SPI机制, 拿到扩展为header的扩展点实现类;

Exchanger扩展点

用于创建请求交换的HeaderExchangerServer服务器;

@SPI(HeaderExchanger.NAME)
public interface Exchanger {

    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;

    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;

}

Exchanger的扩展点实现类只有一个:

  • HeaderExchanger
header=org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger
HeaderExchanger

用来创建ExchangeServer /ExchangeClient 实例;

public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

}

在创建HeaderExchangerServer时, 调用了Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))方法;又涉及到了Transporter扩展点;

Transporter扩展点

获取一个Server实例返回;

    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
    //省略部分代码
        // 调用NettyTransporter去绑定,Transporter表示网络传输层
        return getTransporter().bind(url, handler);
    }
  	public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }

会通过SPI机制,拿到Transporter的扩展点实现类实例;再调用bind方法;

Transporter接口上使用@SPI注解, 表示默认使用哪个扩展点实现类;

@SPI("netty")
public interface Transporter {

    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    Server bind(URL url, ChannelHandler handler) throws RemotingException;

    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;

}

Transporter扩展点实现类:
在dubbo-remoting-netty模块中只有一个NettyTransporter

netty3=org.apache.dubbo.remoting.transport.netty.NettyTransporter

在dubbo-remoting-grizzly模块中,只有一个GrizzlyTransporter

grizzly=org.apache.dubbo.remoting.transport.grizzly.GrizzlyTransporter

而在NettyTransporter中,bind方法里面,会创建NettyServer返回;

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }

}

启动完服务器后, 最终获取的Server类型HeaderExchangerServer, 其内部包装了一个NettyServer;

HeaderExchangerServer与NettyServer


NettyClient和NettyServer之间发送和接收数据;
那么为什么还要在加个HeaderExchangerServer和HeaderExchangerClient呢?

  • HeaderExchangerClient发送数据的时候, 会构造一个Requset请求对象发送给服务端;
  • HeaderExchangerServer会构造一个Response响应数据实例发送给客户端;
    个人看法就是 : 在数据交换层之上封装了一个请求/响应层协议;类比TCP传输层协议, HTTP是应用层协议一样;

EchoFilter#invoke方法, 该类是回声功能, 判断服务是否可用; 如果可用会调用AsyncRpcResult.newDefaultAsyncResult(inv.getArguments()[0], inv)返回;

@Activate(group = CommonConstants.PROVIDER, order = -110000)
public class EchoFilter implements Filter {

    @Override
    public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
        if (inv.getMethodName().equals($ECHO) && inv.getArguments() != null && inv.getArguments().length == 1) {
            return AsyncRpcResult.newDefaultAsyncResult(inv.getArguments()[0], inv);
        }
        return invoker.invoke(inv);
    }

}

AsyncRpcResult#newDefaultAsyncResult

  • 会创建一个AppResponse实例,设置返回值, 调用 asyncRpcResult.complete(appResponse)表示处理结束,返回;
  • 这个AppResponse实例代表响应数据;
    public static AsyncRpcResult newDefaultAsyncResult(Invocation invocation) {
        return newDefaultAsyncResult(null, null, invocation);
    }

    public static AsyncRpcResult newDefaultAsyncResult(Object value, Invocation invocation) {
        return newDefaultAsyncResult(value, null, invocation);
    }

    public static AsyncRpcResult newDefaultAsyncResult(Throwable t, Invocation invocation) {
        return newDefaultAsyncResult(null, t, invocation);
    }

    public static AsyncRpcResult newDefaultAsyncResult(Object value, Throwable t, Invocation invocation) {
        AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
        AppResponse appResponse = new AppResponse();
        if (t != null) {
            appResponse.setException(t);
        } else {
            appResponse.setValue(value);
        }
        asyncRpcResult.complete(appResponse);
        return asyncRpcResult;
    }
}

再看看Request请求数据;
HeaderExchangerChannel#send方法:如果是Request就直接发送数据了。如果不是,也会构建一个Request实例,发送请求数据;

    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!");
        }
        if (message instanceof Request
                || message instanceof Response
                || message instanceof String) {
            channel.send(message, sent);
        } else {
            Request request = new Request();
            request.setVersion(Version.getProtocolVersion());
            request.setTwoWay(false);
            request.setData(message);
            channel.send(request, sent);
        }
    }

最终再由NettyChannel通道去传输数据, 这就是个人看法了。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/800402.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-06
下一篇 2022-05-06

发表评论

登录后才能评论

评论列表(0条)

保存