dubbo源码分析一:rpc异步转同步

dubbo源码分析一:rpc异步转同步,第1张

dubbo源码分析一:rpc异步转同步 概述

好久不见,大家最近可好,近期赶了一个月的项目终于完事了,可以闲下来学点东西了,今天咱们看下dubbo的异步转同步。

之所以说这个是因为近期项目中遇到一个需求也需要异步转同步,于是借鉴了dubbo的实现,咱们今天一看下dubbo具体是怎么做的。

源码分析

dubbo远程rpc协议和网络框架有多种,我们以默认的dubbo协议、网络框架netty作为切入点,做分析,包结构如下图:

DubboInvoker这个类很重要,因为客户端没有具体的实现都是通过代理实现的调用逻辑,而这个类就是最终的工作者,其内部核心方法如下:

    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isoneway = RpcUtils.isoneway(getUrl(), invocation);
            int timeout = calculateTimeout(invocation, methodName);
            invocation.put(TIMEOUT_KEY, timeout);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                CompletableFuture appResponseFuture =
                        // todo 这里就是真正发起请求地方,大家可以自行打断点,远程rpc调用最后就走到这里
                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
                // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
                FutureContext.getContext().setCompatibleFuture(appResponseFuture);
                AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
                result.setExecutor(executor);
                return result;
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
   

通过断点我们可以发现最后由ExchangeChannel接口的 :

    CompletableFuture request(Object request, int timeout, ExecutorService executor) throws RemotingException; 

 api发起的rpc请求

这个接口返回CompletableFuture这是java1.8引入的一个类,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,一路跟踪下去发现最后是由HeaderExchangeChannel这个实现类做的,源码如下:

    @Override
    public CompletableFuture request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        //重点在这里。通过DefaultFuture创建一个Future,并将此返回。
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    } 

这里分成了2步,首先通过DefaultFuture创建一个DefaultFuture然后调用send方法发送消息,再将DefaultFuture返回。

咱们看下send方法,最终会调用到NettyChannel类,这就相对简单了就是原生netty发送消息的写法如下:

    public void send(Object message, boolean sent) throws RemotingException {
        // whether the channel is closed
        super.send(message, sent);

        boolean success = true;
        int timeout = 0;
        try {
            ChannelFuture future = channel.writeAndFlush(message);
            if (sent) {
                // wait timeout ms
                timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
                success = future.await(timeout);
            }
            Throwable cause = future.cause();
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            removeChannelIfDisconnected(channel);
            throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
        }
        if (!success) {
            throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress()
                    + "in timeout(" + timeout + "ms) limit");
        }
    }

好再来看下DefaultFuture类都有哪些 *** 作。

    private static final Map CHANNELS = new ConcurrentHashMap<>();

    private static final Map FUTURES = new ConcurrentHashMap<>();
    
    // 重点,通过次方法实现异步转同步 *** 作
    private void doReceived(Response res) {
        if (res == null) {
            throw new IllegalStateException("response cannot be null");
        }
        if (res.getStatus() == Response.OK) {
            this.complete(res.getResult());
        } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
            this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
        } else {
            this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
        }

        // the result is returning, but the caller thread may still waiting
        // to avoid endless waiting for whatever reason, notify caller thread to return.
        if (executor != null && executor instanceof ThreadlessExecutor) {
            ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
            if (threadlessExecutor.isWaiting()) {
                threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" +
                        " which is not an expected state, interrupt the thread manually by returning an exception."));
            }
        }
    }
    
    private static class TimeoutCheckTask implements TimerTask {

        private final Long requestID;

        TimeoutCheckTask(Long requestID) {
            this.requestID = requestID;
        }

        @Override
        public void run(Timeout timeout) {
            DefaultFuture future = DefaultFuture.getFuture(requestID);
            if (future == null || future.isDone()) {
                return;
            }

            if (future.getExecutor() != null) {
                future.getExecutor().execute(() -> notifyTimeout(future));
            } else {
                notifyTimeout(future);
            }
        }

        private void notifyTimeout(DefaultFuture future) {
            // create exception response.
            Response timeoutResponse = new Response(future.getId());
            // set timeout status.
            timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
            timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
            // handle response.
            DefaultFuture.received(future.getChannel(), timeoutResponse, true);
        }
    }

这里源码只展示了关键代码,2个变量CHANNELS、FUTURS是用来保存当前链接和调用线程的。

doReceived方法就是来接收服务端返回的,并且将其返回信息设置到调用者的CompletableFuture结果中,调用链如下:

NettyClientHandler---->

                channelRead-->

HeaderExchangeHandler--->

                 received---->

                 handleResponse----->

DefaultFuture---->

                 received---->

                 doReceived

涉及到的源码如下:

    NettyClientHandler
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
          NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
          handler.received(channel, msg);
      }
   HeaderExchangeHandler 
         @Override
          public void received(Channel channel, Object message) throws RemotingException {
              final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
              if (message instanceof Request) {
                  // handle request.
                  Request request = (Request) message;
                  if (request.isEvent()) {
                      handlerEvent(channel, request);
                  } else {
                      if (request.isTwoWay()) {
                          handleRequest(exchangeChannel, request);
                      } else {
                          handler.received(exchangeChannel, request.getData());
                      }
                  }
              } else if (message instanceof Response) {
                  handleResponse(channel, (Response) message);
              } else if (message instanceof String) {
                  if (isClientSide(channel)) {
                      Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                      logger.error(e.getMessage(), e);
                  } else {
                      String echo = handler.telnet(channel, (String) message);
                      if (echo != null && echo.length() > 0) {
                          channel.send(echo);
                      }
                  }
              } else {
                  handler.received(exchangeChannel, message);
              }
          }
       static void handleResponse(Channel channel, Response response) throws RemotingException {
            if (response != null && !response.isHeartbeat()) {
                DefaultFuture.received(channel, response);
            }
        }
 DefaultFuture
        public static void received(Channel channel, Response response) {
            received(channel, response, false);
        }
    
        public static void received(Channel channel, Response response, boolean timeout) {
            try {
                DefaultFuture future = FUTURES.remove(response.getId());
                if (future != null) {
                    Timeout t = future.timeoutCheckTask;
                    if (!timeout) {
                        // decrease Time
                        t.cancel();
                    }
                    future.doReceived(response);
                } else {
                    logger.warn("The timeout response finally returned at "
                            + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                            + ", response status is " + response.getStatus()
                            + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                            + " -> " + channel.getRemoteAddress()) + ", please check provider side for detailed result.");
                }
            } finally {
                CHANNELS.remove(response.getId());
            }
        }

我们还看到DefaultFuture做了超时处理,如果一定时间内没有得到响应就设置超时返回实现方法由DefaultFuture的内部类:

                                        TimeoutCheckTask----->notifyTimeout

总结

至此呢dubbo异步转同步的核心逻辑也算梳理清楚了,其核心类便是DefaultFutrue,使用了concurrentHashMap来记录id和对应的defaultFuture对象,并使用CompletableFuture来达到同步调用的效果。

我们借鉴了dubbo的思想,但是我们的交互采用的是短连接即每次交互重新创建链接(由于服务端是通过4G的物联网卡传输信号不稳定,电量有限所以设置的短连接),所以不能在本地保存链接信息。

所以我们采用redis保存当前事务流水,服务端一但返回就将此流水的处理结果保存至redis中,当前进程轮训redis获取结果,并启动异步线程如果超过固定时间就做超时返回处理。

通过阅读源码并解决自己在实际开发中的问题这种心情非常好,至此也更加确信了自己阅读源码的行动是正确的,dubbo处处是精华,非常多的地方值得我们借鉴,今年给自己立个flag 完整的阅读dubbo源码并坚持至少每月2篇dubbo源码分析。

好了今天就到这里了,咱们下期见!

MYSQL系列经典文章

MYSQl 深入探索系列一 redo log

MYSQl深入探索系列二 undo log

MYSQl深入探索系列三 MVCC机制

MYSQl深入探索系列四 服务端优化

MYSQL深入探索系列之五 buffer_pool

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5717779.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)