Dubbo in Depth, in Plain Terms


Contents
    • NettyServer.doOpen
    • NettyHandler#messageReceived()
    • HeaderExchangeHandler.received
    • DubboProtocol->ExchangeHandlerAdapter.reply()
    • Consumer side: HeaderExchangeHandler.received->handleResponse
    • Summary

Once the consumer has sent a request, the provider receives it, performs the invocation, and sends the result back to the consumer.

NettyServer.doOpen

NettyServer starts the Netty server; the handler installed in its pipeline is NettyHandler.

@Override
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);

        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
        // https://issues.jboss.org/browse/NETTY-365
        // https://issues.jboss.org/browse/NETTY-379
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());
    }
NettyHandler#messageReceived()

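NettyHandler wraps the Netty channel in Dubbo's own channel abstraction and then passes the message to Dubbo's own ChannelHandler.
These handlers resemble filters in the Servlet API: a handler can encode and decode messages, intercept particular messages, centralize logging and error handling, count requests, and decide whether the next handler runs at all.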

  @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        // Wrap the Netty channel in Dubbo's internal channel
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            handler.received(channel, e.getMessage());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }

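The handler chain is fairly involved:
handler->MultiMessageHandler->HeartbeatHandler->AllChannelHandler->DecodeHandler->HeaderExchangeHandler, and the message finally reaches DubboProtocol$requestHandler (received)

  • MultiMessageHandler: handles composite (batched) messages
  • HeartbeatHandler: handles heartbeat messages; it receives heartbeats and sends heartbeat responses
  • AllChannelHandler: switches to the business thread pool by wrapping each received message in a ChannelEventRunnable task and submitting it to the pool
  • DecodeHandler: the decoding handler on the business side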
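The chain above is a decorator chain: each handler wraps the next one and decides whether, and on which thread, the message travels further. The following is only a minimal sketch of that idea, not Dubbo's code. The `Handler` interface, the `"HEARTBEAT"` marker string, and every method name in it are hypothetical stand-ins chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class HandlerChainSketch {

    /** Hypothetical, simplified stand-in for Dubbo's ChannelHandler. */
    interface Handler {
        void received(Object message) throws Exception;
    }

    /** Records, in order, what each handler did with a message. */
    static final List<String> TRACE = new CopyOnWriteArrayList<>();

    /** Unpacks a batch and forwards its messages one by one (the role of MultiMessageHandler). */
    static Handler multiMessage(Handler next) {
        return message -> {
            if (message instanceof List) {
                for (Object m : (List<?>) message) {
                    next.received(m);
                }
            } else {
                next.received(message);
            }
        };
    }

    /** Answers heartbeats itself and does not forward them (the role of HeartbeatHandler). */
    static Handler heartbeat(Handler next) {
        return message -> {
            if ("HEARTBEAT".equals(message)) {
                TRACE.add("heartbeat answered");
                return;
            }
            next.received(message);
        };
    }

    /** Hands the message to a business thread pool (the role of AllChannelHandler). */
    static Handler dispatch(ExecutorService pool, Handler next) {
        return message -> pool.execute(() -> {
            try {
                next.received(message);
            } catch (Exception e) {
                TRACE.add("error: " + e);
            }
        });
    }

    /** Builds the chain, pushes one message through it, and returns the trace. */
    static List<String> run(Object message) {
        TRACE.clear();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Handler business = m -> TRACE.add("business got " + m);
        Handler chain = multiMessage(heartbeat(dispatch(pool, business)));
        try {
            chain.received(message);
            pool.shutdown(); // stop accepting work, let queued tasks finish
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            pool.shutdownNow();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(TRACE);
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("HEARTBEAT", "sayHello")));
    }
}
```

Because the business handler sits behind `dispatch`, it runs on the pool thread rather than on the thread that called `chain.received`, which is the same separation the article describes between Netty's I/O threads and Dubbo's business thread pool.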
HeaderExchangeHandler.received

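HeaderExchangeHandler is preceded by some fairly complex decoding steps, which are not analyzed here. Its main job is to process the incoming message:
handleRequest: two-way request (a response is sent back)
handler.received: one-way request
handleResponse: response message (runs on the consumer side)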

  @Override
    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            // Handle a Request object
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    // Handle an event
                    handlerEvent(channel, request);
                } else {
                    // Handle an ordinary request
                    // Two-way communication
                    if (request.isTwoWay()) {
                        // Invoke the service and obtain the result
                        handleRequest(exchangeChannel, request);
                    } else {
                        // One-way communication: just invoke the service, no result is returned
                        handler.received(exchangeChannel, request.getData());
                    }
                }
                // Handle a Response object; this branch runs on the service consumer side.
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                // telnet invocation
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }


Now look at handleRequest. It goes on to call handler.reply; for the dubbo protocol, which is the one examined here, that call lands in the anonymous ExchangeHandlerAdapter instance inside DubboProtocol.

void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        // Check whether the request is valid; if not, return a response with status BAD_REQUEST
        if (req.isBroken()) {
            Object data = req.getData();

            String msg;
            if (data == null) {
                msg = null;
            } else if (data instanceof Throwable) {
                msg = StringUtils.toString((Throwable) data);
            } else {
                msg = data.toString();
            }
            res.setErrorMessage("Fail to decode request due to: " + msg);
            res.setStatus(Response.BAD_REQUEST);

            channel.send(res);
            return;
        }
        // find handler by message class.
        // Get the data field, i.e. the RpcInvocation object
        Object msg = req.getData();
        try {
            // handle data.
            // Continue down the call chain
            CompletableFuture<Object> future = handler.reply(channel, msg);
            if (future.isDone()) {
                res.setStatus(Response.OK);
                res.setResult(future.get());
                channel.send(res);
                return;
            }
            future.whenComplete((result, t) -> {
                try {
                    if (t == null) {
                        res.setStatus(Response.OK);
                        res.setResult(result);
                    } else {
                        res.setStatus(Response.SERVICE_ERROR);
                        res.setErrorMessage(StringUtils.toString(t));
                    }
                    channel.send(res);
                } catch (RemotingException e) {
                    logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
                } finally {
                    // HeaderExchangeChannel.removeChannelIfDisconnected(channel);
                }
            });
        } catch (Throwable e) {
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
            channel.send(res);
        }
    }
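The shape of handleRequest (answer at once when the future is already complete, otherwise answer from a completion callback) can be tried out in isolation. The sketch below is a simplified illustration, not Dubbo's code: the `SENT` list stands in for `channel.send`, and the response is reduced to a string of the hypothetical form `id:STATUS:payload`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ReplySketch {

    /** Collects what would be written back to the consumer (stand-in for channel.send). */
    static final List<String> SENT = new ArrayList<>();

    /** Replies immediately if the result is already there, otherwise when the future completes. */
    static void handleRequest(long requestId, CompletableFuture<Object> future) {
        if (future.isDone() && !future.isCompletedExceptionally()) {
            SENT.add(requestId + ":OK:" + future.join());
            return;
        }
        // Not finished yet (or failed): register a callback instead of blocking the thread.
        future.whenComplete((result, t) -> {
            if (t == null) {
                SENT.add(requestId + ":OK:" + result);
            } else {
                SENT.add(requestId + ":SERVICE_ERROR:" + t.getMessage());
            }
        });
    }

    /** Runs one synchronous, one asynchronous, and one failing call. */
    static List<String> demo() {
        SENT.clear();
        handleRequest(1, CompletableFuture.completedFuture("sync result"));

        CompletableFuture<Object> pending = new CompletableFuture<>();
        handleRequest(2, pending);          // nothing is sent for request 2 yet
        pending.complete("async result");   // the callback fires here

        CompletableFuture<Object> failing = new CompletableFuture<>();
        handleRequest(3, failing);
        failing.completeExceptionally(new IllegalStateException("boom"));
        return new ArrayList<>(SENT);
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The point of the callback branch is that the thread handling the request is released while an asynchronous service method is still running; the response is written whenever the future completes.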
DubboProtocol->ExchangeHandlerAdapter.reply()

The reply method performs the following steps:
it casts the message to an Invocation, looks up the invoker, calls the invoker's invoke method, and wraps the result for return.

 @Override
        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

            if (!(message instanceof Invocation)) {
                throw new RemotingException(channel, "Unsupported request: "
                        + (message == null ? null : (message.getClass().getName() + ": " + message))
                        + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            }

            Invocation inv = (Invocation) message;
            // Get the invoker instance
            Invoker<?> invoker = getInvoker(channel, inv);
            // need to consider backward-compatibility if it's a callback
            if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                String methodsStr = invoker.getUrl().getParameters().get("methods");
                boolean hasMethod = false;
                if (methodsStr == null || !methodsStr.contains(",")) {
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
                    String[] methods = methodsStr.split(",");
                    for (String method : methods) {
                        if (inv.getMethodName().equals(method)) {
                            hasMethod = true;
                            break;
                        }
                    }
                }
                if (!hasMethod) {
                    logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                            + " not found in callback service interface ,invoke will be ignored."
                            + " please update the api interface. url is:"
                            + invoker.getUrl()) + " ,invocation is :" + inv);
                    return null;
                }
            }
            RpcContext rpcContext = RpcContext.getContext();
            rpcContext.setRemoteAddress(channel.getRemoteAddress());
            Result result = invoker.invoke(inv);

            if (result instanceof AsyncRpcResult) {
                return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r);

            } else {
                return CompletableFuture.completedFuture(result);
            }
        }

The invoker is looked up in the map that was filled when the service was exported. Here is the getInvoker method:

Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
        boolean isCallBackServiceInvoke = false;
        boolean isStubServiceInvoke = false;
        int port = channel.getLocalAddress().getPort();
        String path = inv.getAttachments().get(Constants.PATH_KEY);

        // if it's callback service on client side
        isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(Constants.STUB_EVENT_KEY));
        if (isStubServiceInvoke) {
            port = channel.getRemoteAddress().getPort();
        }

        //callback
        isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
        if (isCallBackServiceInvoke) {
            path += "." + inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY);
            inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
        }

        // Compute the service key, in the format groupName/serviceName:serviceVersion:port. For example:
        //   dubbo/com.alibaba.dubbo.demo.DemoService:1.0.0:20880
        String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
        // Look up the DubboExporter that corresponds to serviceKey in exporterMap;
        // during service export the <serviceKey, DubboExporter> mapping is stored in exporterMap
        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

        if (exporter == null) {
            throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " +
                    ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
        }

        // Return the invoker
        return exporter.getInvoker();
    }
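To make the lookup concrete, here is a minimal sketch of building a key in the groupName/serviceName:serviceVersion:port format and using it against a map. It is an illustration under assumptions, not Dubbo's helper: it assumes that an empty group or version is simply left out of the key (the real method may apply further rules), and the `EXPORTERS` map stores plain strings where Dubbo stores DubboExporter objects.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ServiceKeySketch {

    /** Stand-in for exporterMap; the String value replaces a real DubboExporter. */
    static final Map<String, String> EXPORTERS = new ConcurrentHashMap<>();

    /** Builds group/serviceName:version:port, omitting an empty group or version (assumption). */
    static String serviceKey(int port, String serviceName, String version, String group) {
        StringBuilder buf = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            buf.append(group).append('/');
        }
        buf.append(serviceName);
        if (version != null && !version.isEmpty()) {
            buf.append(':').append(version);
        }
        buf.append(':').append(port);
        return buf.toString();
    }

    /** Finds the exporter registered under the key, or fails with a descriptive message. */
    static String lookup(String key) {
        String exporter = EXPORTERS.get(key);
        if (exporter == null) {
            throw new IllegalStateException("Not found exported service: " + key + " in " + EXPORTERS.keySet());
        }
        return exporter;
    }

    public static void main(String[] args) {
        String key = serviceKey(20880, "com.alibaba.dubbo.demo.DemoService", "1.0.0", "dubbo");
        EXPORTERS.put(key, "exporter for DemoService"); // what export would do
        System.out.println(key + " -> " + lookup(key));
    }
}
```

Since group and version are part of the key, a consumer that sends a different version or group than the provider exported produces a key that is not in the map, which is why the error message in getInvoker mentions a possible version or group mismatch.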

The invoker obtained here is in fact a generated proxy as well, wrapped in several layers:
invoker=ProtocolFilterWrapper(InvokerDelegate(DelegateProviderMetaDataInvoker(AbstractProxyInvoker)))

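The call always ends up in AbstractProxyInvoker.
There, doInvoke essentially calls wrapper.invokeMethod(). Wrapper is an abstract class and invokeMethod is an abstract method. At runtime Dubbo uses Javassist to generate an implementation class for Wrapper that implements invokeMethod, and that method ultimately calls the concrete service according to the invocation information. Taking DemoServiceImpl as an example, the class Javassist generates for it looks like this.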

public class Wrapper0 extends Wrapper implements ClassGenerator.DC {
    public static String[] pns;
    public static Map pts;
    public static String[] mns;
    public static String[] dmns;
    public static Class[] mts0;

    // other methods omitted

    public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
        DemoService demoService;
        try {
            // Type cast
            demoService = (DemoService)object;
        }
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        try {
            // Call the method selected by its name
            if ("sayHello".equals(string) && arrclass.length == 1) {
                return demoService.sayHello((String)arrobject[0]);
            }
        }
        catch (Throwable throwable) {
            throw new InvocationTargetException(throwable);
        }
        throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class com.alibaba.dubbo.demo.DemoService.").toString());
    }
}
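The relationship between the proxy invoker and the wrapper can be reduced to a few lines. The sketch below is a hand-written illustration rather than generated code: `GreetingService`, its implementation, and the two static methods are hypothetical, and the real classes carry considerably more (URL, Result wrapping, exception handling).

```java
class WrapperSketch {

    /** Hypothetical service interface. */
    interface GreetingService {
        String sayHello(String name);
    }

    /** Hypothetical service implementation exported by the provider. */
    static class GreetingServiceImpl implements GreetingService {
        @Override
        public String sayHello(String name) {
            return "Hello " + name;
        }
    }

    /** Plays the role of the generated Wrapper: dispatch on the method name, no reflection. */
    static Object invokeMethod(Object target, String method, Class<?>[] types, Object[] args) {
        GreetingService service = (GreetingService) target;
        if ("sayHello".equals(method) && types.length == 1) {
            return service.sayHello((String) args[0]);
        }
        throw new IllegalArgumentException("Not found method \"" + method + "\"");
    }

    /** Plays the role of doInvoke in the proxy invoker: forward the call to the wrapper. */
    static Object doInvoke(Object proxy, String method, Class<?>[] types, Object[] args) {
        return invokeMethod(proxy, method, types, args);
    }

    public static void main(String[] args) {
        Object result = doInvoke(new GreetingServiceImpl(), "sayHello",
                new Class<?>[] {String.class}, new Object[] {"dubbo"});
        System.out.println(result);
    }
}
```

Dispatching through plain `if` comparisons on the method name is what lets the generated class call the service with an ordinary method call instead of `java.lang.reflect.Method.invoke`.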
Consumer side: HeaderExchangeHandler.received->handleResponse

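The consumer receives the response mainly through the DefaultFuture class: the unique id generated for each request is used to find the future that belongs to that request.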

   static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }

 public static void received(Channel channel, Response response) {
        try {
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at "
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                        + ", response " + response
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                        + " -> " + channel.getRemoteAddress()));
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }

  private void doReceived(Response res) {
        lock.lock();
        try {
            response = res;
            if (done != null) {
                // Wake up the user thread
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }
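The mechanism in these three methods, a map from request id to a waiting future plus a lock and condition to wake the caller, can be sketched on its own. What follows is a simplified illustration and not the DefaultFuture source: the class name, the `PENDING` map, and the choice to return null on timeout are assumptions made for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class PendingCallSketch {

    /** Request id -> call still waiting for its response (the role FUTURES plays). */
    static final Map<Long, PendingCallSketch> PENDING = new ConcurrentHashMap<>();

    private final long id;
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private String response; // guarded by lock

    PendingCallSketch(long requestId) {
        this.id = requestId;
        PENDING.put(requestId, this);
    }

    /** Called by the I/O side when a response arrives; returns false if nobody is waiting. */
    static boolean received(long requestId, String response) {
        PendingCallSketch call = PENDING.remove(requestId);
        if (call == null) {
            return false; // for example, the caller already timed out
        }
        call.lock.lock();
        try {
            call.response = response;
            call.done.signal(); // wake up the user thread
        } finally {
            call.lock.unlock();
        }
        return true;
    }

    /** Called by the user thread; blocks until the response arrives, or returns null on timeout. */
    String get(long timeoutMillis) throws InterruptedException {
        long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (response == null) {
                if (nanos <= 0L) {
                    PENDING.remove(id); // give up; a late response will find no waiter
                    return null;
                }
                nanos = done.awaitNanos(nanos);
            }
            return response;
        } finally {
            lock.unlock();
        }
    }

    /** Simulates one call: a second thread delivers the response for request 42. */
    static String demo() {
        PendingCallSketch call = new PendingCallSketch(42L);
        Thread io = new Thread(() -> received(42L, "Hello dubbo"));
        io.start();
        try {
            return call.get(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Removing the entry from the map at the moment the response is matched is what makes a response that arrives after the timeout harmless: it finds no waiter, which corresponds to the "timeout response finally returned" warning in the listing above.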

Summary

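There is a lot about a Dubbo invocation that this article does not cover. The Netty layer and the encoding and decoding are examples: I have not studied them closely and am not yet familiar enough with them, so I skipped them here and may come back to analyze them properly when time allows. This is where my study of the Dubbo source code pauses for now.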

For reference, the provider-side call path from the business thread to the service implementation is:
ChannelEventRunnable#run() -> DecodeHandler#received(Channel, Object) -> HeaderExchangeHandler#received(Channel, Object) -> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request) -> DubboProtocol.requestHandler#reply(ExchangeChannel, Object) -> Filter#invoke(Invoker, Invocation) -> AbstractProxyInvoker#invoke(Invocation) -> Wrapper0#invokeMethod(Object, String, Class[], Object[]) -> DemoServiceImpl#sayHello(String)


Original article: http://outofmemory.cn/langs/758377.html
