dubbo系列7:Dubbo服务调⽤源码解析

dubbo系列7:Dubbo服务调⽤源码解析,第1张

dubbo系列7:Dubbo服务调⽤源码解析

 

 

 

 

服务消费端执⾏逻辑

 1. MockClusterInvoker.invoke(new RpcInvocation(method, args)):Mock逻辑
2. AbstractClusterInvoker.invoke(invocation):把RpcContext中设置的Attachments添加到
invocation对象上,调⽤路由链从服务⽬录上筛选出适合的服务Invoker,获得服务均衡策略
loadbalance
3. FailoverClusterInvoker.doInvoke(invocation, invokers, loadbalance):根据负载均衡策略选出⼀
个invoker,然后执⾏
4. InvokerWrapper.invoke(invocation):没做什么事情
5. CallbackRegistrationInvoker.invoke(invocation):开始执⾏Filter链,执⾏完得到结果后,会获取
ListenableFilter中的listener,执⾏listener的onResponse⽅法
6. ConsumerContextFilter.invoke(invocation):设置RpcContext中LocalAddress、
RemoteAddress、RemoteApplicationName参数
7. FutureFilter.invoke(invocation):
8. MonitorFilter.invoke(invocation):⽅法的执⾏次数+1
9. ListenerInvokerWrapper.invoke(invocation):没做什么事情
10. AsyncToSyncInvoker.invoke(invocation):异步转同步,会先⽤下层Invoker去异步执⾏,然后阻塞Integer.MAX_VALUE时间,直到拿到了结果
11. AbstractInvoker.invoke(invocation):主要调⽤DubboInvoker的doInvoke⽅法,如果doInvoker⽅
法出现了异常,会进⾏包装,包装成AsyncRpcResult
12. DubboInvoker.doInvoke(invocation):从clients轮询出⼀个client进⾏数据发送,如果配置了不关⼼
结果,则调⽤ReferenceCountExchangeClient的send⽅法,否则调⽤
ReferenceCountExchangeClient的request⽅法
13. ReferenceCountExchangeClient.request(Object request, int timeout):没做什么事情
14. HeaderExchangeClient.request(Object request, int timeout):没做什么事情
15. HeaderExchangeChannel.request(Object request, int timeout):构造⼀个Request对象,并且会构造⼀个DefaultFuture对象来阻塞timeout的时间来等待结果,在构造DefaultFuture对象时,会把
DefaultFuture对象和req的id存⼊FUTURES中,FUTURES是⼀个Map,当
HeaderExchangeHandler接收到结果时,会从这个Map中根据id获取到DefaultFuture对象,然后返
回Response。
16. AbstractPeer.send(Object message):从url中获取send参数,默认为false
17. AbstractClient.send(Object message, boolean sent):没做什么
18. NettyChannel.send(Object message, boolean sent):调⽤NioSocketChannel的writeAndFlush
发送数据,然后判断send如果是true,那么则阻塞url中指定的timeout时间,因为如果send是false,在HeaderExchangeChannel中会阻塞timeout时间
19. NioSocketChannel.writeAndFlush(Object msg):最底层的Netty⾮阻塞式的发送数据


总结⼀下上⾯调⽤流程:
1. 最外层是Mock逻辑,调⽤前,调⽤后进⾏Mock
2. 从服务⽬录中,根据当前调⽤的⽅法和路由链,筛选出部分服务Invoker(DubboInvoker)
3. 对服务Invoker进⾏负载均衡,选出⼀个服务Invoker
4. 执⾏Filter链

5. AsyncToSyncInvoker完成异步转同步,因为DubboInvoker的执⾏是异步⾮阻塞的,所以如果是同步调⽤,则会在此处阻塞,知道拿到响应结果
6. DubboInvoker开始异步⾮阻塞的调⽤
7. HeaderExchangeChannel中会阻塞timeout的时间来等待结果,该timeout就是⽤户在消费端所配置的timeout

服务提供端执⾏逻辑

1. NettyServerHandler:接收数据
2. MultiMessageHandler:判断接收到的数据是否是MultiMessage,如果是则获取MultiMessage中的单个Message,传递给HeartbeatHandler进⾏处理
3. HeartbeatHandler:判断是不是⼼跳消息,如果是不是则把Message传递给AllChannelHandler
4. AllChannelHandler:把接收到的Message封装为⼀个ChannelEventRunnable对象,扔给线程池进⾏
处理
5. ChannelEventRunnable:在ChannelEventRunnable的run⽅法中会调⽤DecodeHandler处理
Message
6. DecodeHandler:按Dubbo协议的数据格式,解析当前请求的path,versio,⽅法,⽅法参数等等,
然后把解析好了的请求交给HeaderExchangeHandler
7. HeaderExchangeHandler:处理Request数据,⾸先构造⼀个Response对象,然后调⽤
ExchangeHandlerAdapter得到⼀个CompletionStage future,然后给future通过whenComplete绑
定⼀个回调函数,当future执⾏完了之后,就可以从回调函数中得到ExchangeHandlerAdapter的执⾏
结果,并把执⾏结果设置给Response对象,通过channel发送出去。
8. ExchangeHandlerAdapter:从本机已经导出的Exporter中根据当前Request所对应的服务key,去寻
找Exporter对象,从Exporter中得到Invoker,然后执⾏invoke⽅法,此Invoker为
ProtocolFilterWrapper$CallbackRegistrationInvoker
9. ProtocolFilterWrapper$CallbackRegistrationInvoker:负责执⾏过滤器链,并且在执⾏完了之后回
调每个过滤器的onResponse或onError⽅法
10. EchoFilter:判断当前请求是不是⼀个回升测试,如果是,则不继续执⾏过滤器链了(服务实现者
Invoker也不会调⽤了)
11. ClassLoaderFilter:设置当前线程的classloader为当前要执⾏的服务接⼝所对应的classloader
12. GenericFilter:把泛化调⽤发送过来的信息包装为RpcInvocation对象
13. ContextFilter:设置RpcContext.getContext()的参数
14. TraceFilter:先执⾏下⼀个invoker的invoke⽅法,调⽤成功后录调⽤信息
15. TimeoutFilter:调⽤时没有特别处理,只是记录了⼀下当前时间,当整个filter链都执⾏完了之后回调TimeoutFilter的onResponse⽅法时,会判断本次调⽤是否超过了timeout
16. MonitorFilter:记录当前服务的执⾏次数
17. ExceptionFilter:调⽤时没有特别处理,在回调onResponse⽅法时,对不同的异常进⾏处理,详解Dubbo的异常处理
18. DelegateProvidermetaDataInvoker:过滤器链结束,调⽤下⼀个Invoker

19. AbstractProxyInvoker:在服务导出时,根据服务接⼝,服务实现类对象⽣成的,它的invoke⽅法就会执⾏服务实现类对象的⽅法,得到结果

Dubbo的异常处理

当服务消费者在调⽤⼀个服务时,服务提供者在执⾏服务逻辑时可能会出现异常,对于Dubbo来说,服务消费者需要在消费端抛出这个异常,那么这个功能是怎么做到的呢?


服务提供者在执⾏服务时,如果出现了异常,那么框架会把异常捕获,捕获异常的逻辑在
AbstractProxyInvoker中,捕获到异常后,会把异常信息包装为正常的AppResponse对象,只是
AppResponse的value属性没有值,exception属性有值。


此后,服务提供者会把这个AppResponse对象发送给服务消费端,服务消费端是在
InvokerInvocationHandler中调⽤AppResponse的recreate⽅法重新得到⼀个结果,在recreate⽅法中会去失败AppResponse对象是否正常,也就是是否存在exception信息,如果存在,则直接throw这个exception,从⽽做到服务执⾏时出现的异常,在服务消费端抛出。


那么这⾥存在⼀个问题,如果服务提供者抛出的异常类,在服务消费者这边不存在,那么服务消费者也就抛不出这个异常了,那么dubbo是怎么处理的呢?


这⾥就涉及到了ExceptionFilter,它是服务提供者端的⼀个过滤器,它主要是在服务提供者执⾏完服务后会去识别异常:
1. 如果是需要开发⼈员捕获的异常,那么忽略,直接把这个异常返回给消费者
2. 如果在当前所执⾏的⽅法签名上有声明,那么忽略,直接把这个异常返回给消费者
3. 如果抛出的异常不需要开发⼈员捕获,或者⽅法上没有申明,那么服务端或记录⼀个error⽇志
4. 异常类和接⼝类在同⼀jar包⾥,那么忽略,直接把这个异常返回给消费者
5. 如果异常类是JDK⾃带的异常,那么忽略,直接把这个异常返回给消费者
6. 如果异常类是Dubbo⾃带的异常,那么忽略,直接把这个异常返回给消费者
7. 否则,把异常信息包装成RuntimeException,并覆盖AppResponse对象中的exception属性

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5695965.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存