服务调用涉及到的东西比较多, 需要一个个的理解透彻, 最终才能串起来;
服务端DubboInvoker的包装DubboInvoker的生成是在服务导出的过程中创建的;由于Dubbo的SPI机制, DubboInvoker的生成会被多个包装类处理;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
// 当前服务没有被导出并且没有卸载,才导出服务
if (!isExported() && !isUnexported()) {
// 服务导出(服务注册)
export();
}
}
最终层层调用,在ServiceConfig的doExportUrlsFor1Protocol会创建一个Invoker实例;
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//...省略部分代码;
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// export to remote if the config is not local (export to local only when config is local)
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (CollectionUtils.isNotEmpty(registryURLs)) {
// 如果有注册中心,则将服务注册到注册中心
for (URL registryURL : registryURLs) {
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
// DelegateProviderMetaDataInvoker也表示服务提供者,包括了Invoker和服务的配置
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
这个Invoker是通过PROXY_FACTORY工厂生成, ProxyFactory是一个接口扩展点, 接口上面指定了扩展点实现类的名称;即使用的JavaasistProxyFactory来生成这个代理实例;
getInvoker方法传了几个重要参数:
- ref :代表服务实现类;
- interfaceClass: 代表实现的接口;
- registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()) : 服务的注册中心URL
@SPI("javassist")
public interface ProxyFactory {
//省略部分代码
@Adaptive({PROXY_KEY})
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}
已知的扩展点实现类有:
- org.apache.dubbo.rpc.proxy.JdkProxyFactory
该动态代理扩展实现类使用的JDK动态代理生成代理实例; - org.apache.dubbo.rpc.proxy.JavassistProxyFactory
该动态代理实例使用的是Javassit技术生成代理实例;
调用ProxyFactory#getInvoker最终返回了一个AbstractProxyInvoker
JavassistProxyFactorypublic class JavassistProxyFactory extends AbstractProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
// 执行proxy的method方法
// 执行的proxy实例的方法
// 如果没有wrapper,则要通过原生的反射技术去获取Method对象,然后执行
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
- wrapper.getWrapper方法, 会根据接口类获取一个wrapper对象, 如果已经创建过,则直接从缓存中拿到Wrapper实例,如果没有,则通过Javaassist技术,在makeWrapper方法中,使用ClassGenerator生成一个Java动态代理类;
- 动态代理类中的代理方法invokeMethod中, 调用实现类的业务方法;
Wrapper
public static Wrapper getWrapper(Class<?> c) {
//省略部分代码;
Wrapper ret = WRAPPER_MAP.get(c);
if (ret == null) {
ret = makeWrapper(c);
WRAPPER_MAP.put(c, ret);
}
return ret;
}
invokeMethod涉及的几大参数:
- proxy : 实现类
- methodName : 接口方法名;
- parameterTypes : 参数类型
- arguments : 参数值;
最终生成动态代理实例返回,该代理实例的类型为AbstractProxyInvoker,这个Invoker是与我们的实现类ref直接挂钩, 也就是最底层的Invoker;
JDK扩展点实现类JdkProxyFactory- 需要根据methodName和paramterTypes从获取类的Method元数据, 再通过Method.invoke(proxy,arguments)执行业务方法;这是反射的常见 *** 作;
- 另外提一点: 执行业务方法的前后过程中,可以增加一些前置增强,后置增强(这就是AOP);
public class JdkProxyFactory extends AbstractProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
}
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
Method method = proxy.getClass().getMethod(methodName, parameterTypes);
return method.invoke(proxy, arguments);
}
};
}
}
最终生成动态代理实例返回,该代理实例的类型为AbstractProxyInvoker,这个Invoker是与我们的实现类ref直接挂钩, 也就是最底层的Invoker;
第二层:DelegateProviderMetaDataInvoker调用完PROXY_FACTORY返回一个AbstractProxyInvoker实例invoker, 将当前ServiceConfig类实例和invoker作为参数, 创建DelegateProviderMetaDataInvoker实例wrapperInvoker;
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
// DelegateProviderMetaDataInvoker也表示服务提供者,包括了Invoker和服务的配置
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Protocol扩展点
调用Protocol#export方法导出服务;
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
出
Exporter<?> exporter = protocol.export(wrapperInvoker);
@SPI("dubbo")
public interface Protocol {
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
//省略部分代码
}
Protocol是一个扩展点, 而Protocol的扩展点实现类默认是DubboProtocol,因此会调用DubboProtocol#export方法导出服务, Dubbo的SPI机制在调用DubboProtocol扩展点实现类的过程中,会先调用Protocol的包装扩展点实现类进行处理
Protocol的包装类扩展点:
- ProtocolFilterWrapper :会包装Invoker, 常见一系列过滤器拦截处理;
- ProtocolListenerWrapper : 协议监听器处理;
- QosProtocolWrapper : 这个包装实现类用到的场景比较少;
Protocol#export的调用链路如下
- ProtocolFilterWrapper#export
- ProtocolListenerWrapper#export
- QosProtocolWrapper#export
- DubboProtocol#export
调用ProtocolFilterWrapper#export会创建过滤器链处理, 进行拦截处理;
public class ProtocolFilterWrapper implements Protocol {
private final Protocol protocol;
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
//调用buildInvokerChain生成CallbackRegistrationInvoker实例
return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
//省略了buildInvokerChain方法;
}
- 调用buildInvokerChain生成CallbackRegistrationInvoker实例;该实例包装了第二层的DelegateProviderMetaDataInvoker实例与过滤器链; 参数如下:
- DelegateProviderMetaDataInvoker实例invoker
- SERVICE_FILTER_KEY : 常量service.filter
- CommonConstants.PROVIDER : 常量provider
buildInvokerChain的流程:
- 获取Filter的扩展点实现类, 其组名为provider的实现类;
- 遍历获取到的Filter;每个Filter都会创建一个Invoker, 每个Invoker#invoke方法中都会调用filter#invoke方法处理;返回一个异步结果;
- 最终创建CallbackRegistrationInvoker, 传入最外层Filter对应的Invoker实例, 与filters过滤器实例;
public class ProtocolFilterWrapper implements Protocol {
private final Protocol protocol;
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
//调用buildInvokerChain生成CallbackRegistrationInvoker实例
return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
// 根据url获取filter,根据url中的parameters取key为key的value所对应的filter,但是还会匹配group
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
//省略部分代码
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
// 得到一个异步结果
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) {
//异常处理
}
return asyncResult;
}
};
}
}
return new CallbackRegistrationInvoker<>(last, filters);
}
}
Filter扩展点
- Filter扩展文件中,包含了以下的扩展点实现类, “#" 与数值是我加上去的,代表的是组名为Provier的Filter; 数值的负数越小,就越先被先用;
- 服务提供方和服务消费方调用过程拦截,Dubbo 本身的大多功能均基于此扩展点实现,每次远程方法执行,该拦截都会被执行,对性能有影响;
- 开发中,如果我们想要新增Filter,只需要实现了Filter接口就可以, 调用顺序在内置的Filter之后;
echo=org.apache.dubbo.rpc.filter.EchoFilter #-110000
generic=org.apache.dubbo.rpc.filter.GenericFilter # -20000
genericimpl=org.apache.dubbo.rpc.filter.GenericImplFilter
token=org.apache.dubbo.rpc.filter.TokenFilter # 无
accesslog=org.apache.dubbo.rpc.filter.AccessLogFilter #无
activelimit=org.apache.dubbo.rpc.filter.ActiveLimitFilter
classloader=org.apache.dubbo.rpc.filter.ClassLoaderFilter #-30000
context=org.apache.dubbo.rpc.filter.ContextFilter # order = -10000
consumercontext=org.apache.dubbo.rpc.filter.ConsumerContextFilter
exception=org.apache.dubbo.rpc.filter.ExceptionFilter #无
executelimit=org.apache.dubbo.rpc.filter.ExecuteLimitFilter #无
deprecated=org.apache.dubbo.rpc.filter.DeprecatedFilter
compatible=org.apache.dubbo.rpc.filter.CompatibleFilter
timeout=org.apache.dubbo.rpc.filter.TimeoutFilter #无
调用顺序:
- EchoFilter#invoke
提供回声功能, 即客户端判断服务是不是可用的;如果方法名是echo, 则调用结束, 返回结果是参数的第一个值;
@Activate(group = CommonConstants.PROVIDER, order = -110000)
public class EchoFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
if (inv.getMethodName().equals($ECHO) && inv.getArguments() != null && inv.getArguments().length == 1) {
return AsyncRpcResult.newDefaultAsyncResult(inv.getArguments()[0], inv);
}
return invoker.invoke(inv);
}
}
- ClassLoaderFilter#invoke
类加载过滤器: 设置当前线程的类加载器, 为Invoker的代理的接口类interface的类加载器。
@Activate(group = CommonConstants.PROVIDER, order = -30000)
public class ClassLoaderFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
ClassLoader ocl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(invoker.getInterface().getClassLoader());
try {
return invoker.invoke(invocation);
} finally {
Thread.currentThread().setContextClassLoader(ocl);
}
}
}
- GenericFilter#invoke
Dubbo的泛化服务使用; 暂时没有用到; - ContextFilter#invoke
服务的RpcContext上下文对象 设置Invoker, 参数信息invocation, 与IP端口号信息;
@Activate(group = PROVIDER, order = -10000)
public class ContextFilter extends ListenableFilter {
private static final String TAG_KEY = "dubbo.tag";
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
Map<String, String> attachments = invocation.getAttachments();
if (attachments != null) {
attachments = new HashMap<>(attachments);
attachments.remove(PATH_KEY);
attachments.remove(INTERFACE_KEY);
attachments.remove(GROUP_KEY);
attachments.remove(VERSION_KEY);
attachments.remove(DUBBO_VERSION_KEY);
attachments.remove(TOKEN_KEY);
attachments.remove(TIMEOUT_KEY);
// Remove async property to avoid being passed to the following invoke chain.
attachments.remove(ASYNC_KEY);
attachments.remove(TAG_KEY);
attachments.remove(FORCE_USE_TAG);
}
RpcContext context = RpcContext.getContext();
context.setInvoker(invoker)
.setInvocation(invocation)
// .setAttachments(attachments) // merged from dubbox
.setLocalAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());
String remoteApplication = invocation.getAttachment(REMOTE_APPLICATION_KEY);
if (StringUtils.isNotEmpty(remoteApplication)) {
context.setRemoteApplicationName(remoteApplication);
} else {
context.setRemoteApplicationName(RpcContext.getContext().getAttachment(REMOTE_APPLICATION_KEY));
}
if (attachments != null) {
if (RpcContext.getContext().getAttachments() != null) {
RpcContext.getContext().getAttachments().putAll(attachments);
} else {
RpcContext.getContext().setAttachments(attachments);
}
}
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}
try {
return invoker.invoke(invocation);
} finally {
}
}
}
- TimeoutFilter#invoke
服务端的超时处理, 只会打印日志, 而不会终止服务的运行;
- invoke 方法中设置开始时间;
- TimeoutListener#onResponse中, 会根据当前时间 - 开始时间 得出业务执行时间, 然后在和timeout配置比较, 大于timeout设置的超时时间, 就会打印警告日志;
@Activate(group = CommonConstants.PROVIDER)
public class TimeoutFilter extends ListenableFilter {
private static final Logger logger = LoggerFactory.getLogger(TimeoutFilter.class);
private static final String TIMEOUT_FILTER_START_TIME = "timeout_filter_start_time";
public TimeoutFilter() {
super.listener = new TimeoutListener();
}
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
invocation.setAttachment(TIMEOUT_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));
return invoker.invoke(invocation);
}
static class TimeoutListener implements Listener {
@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
String startAttach = invocation.getAttachment(TIMEOUT_FILTER_START_TIME);
if (startAttach != null) {
long elapsed = System.currentTimeMillis() - Long.valueOf(startAttach);
if (invoker.getUrl() != null && elapsed > invoker.getUrl().getMethodParameter(invocation.getMethodName(), "timeout", Integer.MAX_VALUE)) {
if (logger.isWarnEnabled()) {
logger.warn("invoke time out. method: " + invocation.getMethodName() + " arguments: " + Arrays.toString(invocation.getArguments()) + " , url is " + invoker.getUrl() + ", invoke elapsed " + elapsed + " ms.");
}
}
}
}
}
}
- ExceptionFilter#invoke
通过以上,可以得出Invoker的调用顺序。
- CallbackRegistrationInvoker#invoke
1.1 EchoFilter#invoke
1.2 ClassLoaderFilter#invoke
1.3 GenericFilter#invoke
1.4 ContextFilter#invoke
1.5 TimeoutFilter#invoke
1.6 ExceptionFilter#invoke - DelegateProviderMetaDataInvoker#invoke
- JavassitProxyFactory#AbstractProxyInvoker#invoke
服务导出时,
- 服务信息转换为URL放入注册中心
这个步骤调用了父类Protocol父类AbstractProxyProtocol的export方法执行的; - 启动Netty服务器;
这个步骤调用了DubboProtocol#export方法执行的;
会把invoker包装为一个Exporter实例, 放入缓存中;服务请求时,从缓存中拿到exporter去执行;
DubboProtocol#export
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
// 构造一个Exporter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
// 开启NettyServer
openServer(url);
return exporter;
}
//判断容器中是否存在Netty服务器, 存在就重新导出,不存在即调用createServer创建NettyServer服务器;
private void openServer(URL url) {
String key = url.getAddress();
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
}
}
} else {
}
}
}
//调用 Exchangers.bind(url, requestHandler)创建服务器;
private ExchangeServer createServer(URL url) {
url = ...;
//省略部分代码;
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
}
return server;
}
Exchangers#bind方法:
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
// codec表示协议编码方式
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// 通过url得到HeaderExchanger, 利用HeaderExchanger进行bind,将得到一个HeaderExchangeServer
return getExchanger(url).bind(url, handler);
}
public static Exchanger getExchanger(URL url) {
//type的值为header
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
return getExchanger(type);
}
type的值为header,
public static Exchanger getExchanger(String type) {
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
最终会通过SPI机制, 拿到扩展为header的扩展点实现类;
Exchanger扩展点用于创建请求交换的HeaderExchangerServer服务器;
@SPI(HeaderExchanger.NAME)
public interface Exchanger {
@Adaptive({Constants.EXCHANGER_KEY})
ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;
@Adaptive({Constants.EXCHANGER_KEY})
ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;
}
Exchanger的扩展点实现类只有一个:
- HeaderExchanger
header=org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger
HeaderExchanger
用来创建ExchangeServer /ExchangeClient 实例;
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
}
在创建HeaderExchangerServer时, 调用了Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))方法;又涉及到了Transporter扩展点;
Transporter扩展点获取一个Server实例返回;
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
//省略部分代码
// 调用NettyTransporter去绑定,Transporter表示网络传输层
return getTransporter().bind(url, handler);
}
public static Transporter getTransporter() {
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
会通过SPI机制,拿到Transporter的扩展点实现类实例;再调用bind方法;
Transporter接口上使用@SPI注解, 表示默认使用哪个扩展点实现类;
@SPI("netty")
public interface Transporter {
@Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
Server bind(URL url, ChannelHandler handler) throws RemotingException;
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
Client connect(URL url, ChannelHandler handler) throws RemotingException;
}
Transporter扩展点实现类:
在dubbo-remoting-netty模块中只有一个NettyTransporter
netty3=org.apache.dubbo.remoting.transport.netty.NettyTransporter
在dubbo-remoting-grizzly模块中,只有一个GrizzlyTransporter
grizzly=org.apache.dubbo.remoting.transport.grizzly.GrizzlyTransporter
而在NettyTransporter中,bind方法里面,会创建NettyServer返回;
public class NettyTransporter implements Transporter {
public static final String NAME = "netty";
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
@Override
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
}
启动完服务器后, 最终获取的Server类型HeaderExchangerServer, 其内部包装了一个NettyServer;
HeaderExchangerServer与NettyServer
NettyClient和NettyServer之间发送和接收数据;
那么为什么还要在加个HeaderExchangerServer和HeaderExchangerClient呢?
- HeaderExchangerClient发送数据的时候, 会构造一个Requset请求对象发送给服务端;
- HeaderExchangerServer会构造一个Response响应数据实例发送给客户端;
个人看法就是 : 在数据交换层之上封装了一个请求/响应层协议;类比TCP传输层协议, HTTP是应用层协议一样;
EchoFilter#invoke方法, 该类是回声功能, 判断服务是否可用; 如果可用会调用AsyncRpcResult.newDefaultAsyncResult(inv.getArguments()[0], inv)返回;
@Activate(group = CommonConstants.PROVIDER, order = -110000)
public class EchoFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
if (inv.getMethodName().equals($ECHO) && inv.getArguments() != null && inv.getArguments().length == 1) {
return AsyncRpcResult.newDefaultAsyncResult(inv.getArguments()[0], inv);
}
return invoker.invoke(inv);
}
}
AsyncRpcResult#newDefaultAsyncResult
- 会创建一个AppResponse实例,设置返回值, 调用 asyncRpcResult.complete(appResponse)表示处理结束,返回;
- 这个AppResponse实例代表响应数据;
public static AsyncRpcResult newDefaultAsyncResult(Invocation invocation) {
return newDefaultAsyncResult(null, null, invocation);
}
public static AsyncRpcResult newDefaultAsyncResult(Object value, Invocation invocation) {
return newDefaultAsyncResult(value, null, invocation);
}
public static AsyncRpcResult newDefaultAsyncResult(Throwable t, Invocation invocation) {
return newDefaultAsyncResult(null, t, invocation);
}
public static AsyncRpcResult newDefaultAsyncResult(Object value, Throwable t, Invocation invocation) {
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
AppResponse appResponse = new AppResponse();
if (t != null) {
appResponse.setException(t);
} else {
appResponse.setValue(value);
}
asyncRpcResult.complete(appResponse);
return asyncRpcResult;
}
}
再看看Request请求数据;
HeaderExchangerChannel#send方法:如果是Request就直接发送数据了。如果不是,也会构建一个Request实例,发送请求数据;
@Override
public void send(Object message, boolean sent) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!");
}
if (message instanceof Request
|| message instanceof Response
|| message instanceof String) {
channel.send(message, sent);
} else {
Request request = new Request();
request.setVersion(Version.getProtocolVersion());
request.setTwoWay(false);
request.setData(message);
channel.send(request, sent);
}
}
最终再由NettyChannel通道去传输数据, 这就是个人看法了。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)