好久不见,大家最近可好,近期赶了一个月的项目终于完事了,可以闲下来学点东西了,今天咱们看下dubbo的异步转同步。
之所以说这个是因为近期项目中遇到一个需求也需要异步转同步,于是借鉴了dubbo的实现,咱们今天一看下dubbo具体是怎么做的。
源码分析dubbo远程rpc协议和网络框架有多种,我们以默认的dubbo协议、网络框架netty作为切入点,做分析,包结构如下图:
DubboInvoker这个类很重要,因为客户端没有具体的实现都是通过代理实现的调用逻辑,而这个类就是最终的工作者,其内部核心方法如下:
protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(PATH_KEY, getUrl().getPath()); inv.setAttachment(VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isoneway = RpcUtils.isoneway(getUrl(), invocation); int timeout = calculateTimeout(invocation, methodName); invocation.put(TIMEOUT_KEY, timeout); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); return AsyncRpcResult.newDefaultAsyncResult(invocation); } else { ExecutorService executor = getCallbackExecutor(getUrl(), inv); CompletableFuture appResponseFuture = // todo 这里就是真正发起请求地方,大家可以自行打断点,远程rpc调用最后就走到这里 currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj); // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter FutureContext.getContext().setCompatibleFuture(appResponseFuture); AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv); result.setExecutor(executor); return result; } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } }
通过断点我们可以发现最后由ExchangeChannel接口的 :
CompletableFuture
api发起的rpc请求
这个接口返回CompletableFuture这是java1.8引入的一个类,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,一路跟踪下去发现最后是由HeaderExchangeChannel这个实现类做的,源码如下:
@Override public CompletableFuture
这里分成了2步,首先通过DefaultFuture创建一个DefaultFuture然后调用send方法发送消息,再将DefaultFuture返回。
咱们看下send方法,最终会调用到NettyChannel类,这就相对简单了就是原生netty发送消息的写法如下:
public void send(Object message, boolean sent) throws RemotingException { // whether the channel is closed super.send(message, sent); boolean success = true; int timeout = 0; try { ChannelFuture future = channel.writeAndFlush(message); if (sent) { // wait timeout ms timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); success = future.await(timeout); } Throwable cause = future.cause(); if (cause != null) { throw cause; } } catch (Throwable e) { removeChannelIfDisconnected(channel); throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e); } if (!success) { throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + "in timeout(" + timeout + "ms) limit"); } }
好再来看下DefaultFuture类都有哪些 *** 作。
private static final MapCHANNELS = new ConcurrentHashMap<>(); private static final Map FUTURES = new ConcurrentHashMap<>(); // 重点,通过次方法实现异步转同步 *** 作 private void doReceived(Response res) { if (res == null) { throw new IllegalStateException("response cannot be null"); } if (res.getStatus() == Response.OK) { this.complete(res.getResult()); } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage())); } else { this.completeExceptionally(new RemotingException(channel, res.getErrorMessage())); } // the result is returning, but the caller thread may still waiting // to avoid endless waiting for whatever reason, notify caller thread to return. if (executor != null && executor instanceof ThreadlessExecutor) { ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor; if (threadlessExecutor.isWaiting()) { threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" + " which is not an expected state, interrupt the thread manually by returning an exception.")); } } } private static class TimeoutCheckTask implements TimerTask { private final Long requestID; TimeoutCheckTask(Long requestID) { this.requestID = requestID; } @Override public void run(Timeout timeout) { DefaultFuture future = DefaultFuture.getFuture(requestID); if (future == null || future.isDone()) { return; } if (future.getExecutor() != null) { future.getExecutor().execute(() -> notifyTimeout(future)); } else { notifyTimeout(future); } } private void notifyTimeout(DefaultFuture future) { // create exception response. Response timeoutResponse = new Response(future.getId()); // set timeout status. timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT); timeoutResponse.setErrorMessage(future.getTimeoutMessage(true)); // handle response. DefaultFuture.received(future.getChannel(), timeoutResponse, true); } }
这里源码只展示了关键代码,2个变量CHANNELS、FUTURS是用来保存当前链接和调用线程的。
doReceived方法就是来接收服务端返回的,并且将其返回信息设置到调用者的CompletableFuture结果中,调用链如下:
NettyClientHandler---->
channelRead-->
HeaderExchangeHandler--->
received---->
handleResponse----->
DefaultFuture---->
received---->
doReceived
涉及到的源码如下:
NettyClientHandler @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); handler.received(channel, msg); } HeaderExchangeHandler @Override public void received(Channel channel, Object message) throws RemotingException { final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); if (message instanceof Request) { // handle request. Request request = (Request) message; if (request.isEvent()) { handlerEvent(channel, request); } else { if (request.isTwoWay()) { handleRequest(exchangeChannel, request); } else { handler.received(exchangeChannel, request.getData()); } } } else if (message instanceof Response) { handleResponse(channel, (Response) message); } else if (message instanceof String) { if (isClientSide(channel)) { Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl()); logger.error(e.getMessage(), e); } else { String echo = handler.telnet(channel, (String) message); if (echo != null && echo.length() > 0) { channel.send(echo); } } } else { handler.received(exchangeChannel, message); } } static void handleResponse(Channel channel, Response response) throws RemotingException { if (response != null && !response.isHeartbeat()) { DefaultFuture.received(channel, response); } } DefaultFuture public static void received(Channel channel, Response response) { received(channel, response, false); } public static void received(Channel channel, Response response, boolean timeout) { try { DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { Timeout t = future.timeoutCheckTask; if (!timeout) { // decrease Time t.cancel(); } future.doReceived(response); } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response status is " + response.getStatus() + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress()) + ", please check provider side for detailed result."); } } finally { CHANNELS.remove(response.getId()); } }
我们还看到DefaultFuture做了超时处理,如果一定时间内没有得到响应就设置超时返回实现方法由DefaultFuture的内部类:
TimeoutCheckTask----->notifyTimeout
总结至此呢dubbo异步转同步的核心逻辑也算梳理清楚了,其核心类便是DefaultFutrue,使用了concurrentHashMap来记录id和对应的defaultFuture对象,并使用CompletableFuture来达到同步调用的效果。
我们借鉴了dubbo的思想,但是我们的交互采用的是短连接即每次交互重新创建链接(由于服务端是通过4G的物联网卡传输信号不稳定,电量有限所以设置的短连接),所以不能在本地保存链接信息。
所以我们采用redis保存当前事务流水,服务端一但返回就将此流水的处理结果保存至redis中,当前进程轮训redis获取结果,并启动异步线程如果超过固定时间就做超时返回处理。
通过阅读源码并解决自己在实际开发中的问题这种心情非常好,至此也更加确信了自己阅读源码的行动是正确的,dubbo处处是精华,非常多的地方值得我们借鉴,今年给自己立个flag 完整的阅读dubbo源码并坚持至少每月2篇dubbo源码分析。
好了今天就到这里了,咱们下期见!
MYSQL系列经典文章
MYSQl 深入探索系列一 redo log
MYSQl深入探索系列二 undo log
MYSQl深入探索系列三 MVCC机制
MYSQl深入探索系列四 服务端优化
MYSQL深入探索系列之五 buffer_pool
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)