1.ThreadPool
2.CompletableFuture
3.MQ
4.BlockingQueue
5.Fork/Join
那么作为一款优秀的RPC框架,dubbo是如何实现异步调用的呢?本文将介绍2.6.x版本以来dubbo异步调用方式的演进。
1.增加consumer配置
2.参数回调(2.7.0已废弃,本文将不展开)
3.事件通知
4.直接定义返回CompletableFuture的服务接口
5.利用AsyncFor注解实现客户端的同步转异步
6.利用RpcContext.startAsync()实现服务端的同步转异步
其中前面3种方式在2.6.x版本中就已支持,但参数回调在2.7.0版本中已废弃,后面3种则是在2.7.0版本中新增的方式。
这种方式很简单,只需要在服务引用时增加<dubbo:method>配置即可,如下所示,其中name为需要异步调用的方法名,async表示是否启用异步调用。
此时consumer端有3种调用方式:
如果只想异步调用,不需要返回值,则可以配置 return="false",这样可以避免Future对象的创建,此时RpcContext.getContext().getFuture()将返回null;
在上述方式中,想获取异步调用的结果,需要从RpcContext中获取,使用起来不是很方便。基于java 8中引入的CompletableFuture,dubbo在2.7.0版本中也增加了对CompletableFuture的支持,我们可以直接定义一个返回CompletableFuture类型的接口。
服务端实现如下:
如此一来,我们就实现了服务端的异步,客户端直接调用接口即可,不需要再从RpcContext中获取返回值:
dubbo允许consumer 端在调用之前、调用之后或出现异常时,触发 oninvoke、onreturn、onthrow 三个事件。类似于Spring中的前置增强、后置增强和异常抛出增强。只需要在服务引用时,增加以下配置指定事件通知的方法即可:
事件通知服务如下:
与Spring增强不同的是,dubbo中的事件通知也可以是异步,只需要将调用方法配置为async="true"即可,但oninvoke方法无法异步执行。
dubbo中的异步调用实际上是通过引入一个FutureFilter来实现的,关键源码如下。
在fireInvokeCallback()方法中,会首先调用getAsyncMethodInfo()获取目标方法的方法信息,看是否有配置事件通知:
获取到调用方法对应的信息后,回到fireInvokeCallback()方法:
方法调用完成后,会回到postProcessResult()方法:
syncCallback和asyncCallback里面的逻辑比较简单,就是根据方法是正常返回还是抛异常,触发对应的事件。可以看到,如果被调用方法是同步的,则这两个事件也是同步的,反之亦然。
在postProcessResult()方法中,第一个参数是invoker.invoke(invocation),这里就会走到下一个Filter链完成filter链的处理,最终调到原始服务,走到DubboInvoker#doInvoke方法:
通过这个过程不难发现,不管是同步调用还是异步调用,最终都会走到ExchangeClient#send方法,再往下会走到HeaderExchangeChannel#request方法,这个一个异步方法,返回ResponseFuture对象。
看到这里我才恍然大悟,原来dubbo中同步调用也是通过异步调用来实现,只是同步调用发起后,直接调用future#get的方法来同步等待结果的返回,而异步调用只返回Future Response,在用户需要关心其结果时才调用get方法。
参考:
http://dubbo.apache.org/zh-cn/blog/dubbo-invoke.html
http://dubbo.apache.org/zh-cn/blog/dubbo-new-async.html
https://mp.weixin.qq.com/s?__biz=MzUzNTY4NTYxMA==&mid=2247484959&idx=5&sn=654b4ae76e76ac1a436f2ceb1774f4a6&chksm=fa80f69acdf77f8c5371d4a929557d6ec7fba3020c3bbb1490f0835218e8aef7af285e91d097&scene=21#wechat_redirect
consumer的RegistryDirectory创建DubboInvoker是在zk配置发生变化回调 RegistryDirectory.notify 的时候。创建DubboInvoker使用的是 protocol.refer(serviceType, url) 方法,Protocol$Adpative会有一些Wapper给DubboInvoker添加很多包装类,所以在调用链中会有一些filter,这和服务发布的时候是一样的
发送数据是异步的,可以参考netty的案例,通过channel发送数据不能直接拿到结果,必须通过epoll中的等待回调事件再取得数据
dubbo的consumer发送请求是非阻塞的,不会等待返回值。provider接收是阻塞的会等待provider调用invoker处理完直接返回给consumer。
dubbo 是基于netty NIO的非阻塞并行调用通信。(阻塞 非阻塞 异步 同步 区别 )dubbo 的通信方式 有3类类型:
异步.有返回值时会将能获取结果的future放在上下文变量中,如果需要取结果直接从future中阻塞获取就可以了;异步,无返回值,则不再处理;同步,有返回值会直接调用future.get
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)