Dubbo消息编解码和序列化

Dubbo消息编解码和序列化,第1张

Dubbo消息解码和序列化 1. 前言

前面分析了Consumer是如何发起远程服务调用的,最终DubboInvoker会利用ExchangeClient客户端发送网络请求。Dubbo会将网络请求封装为Request对象发送,但是网络传输的总是字节序列,Request对象必须经过编码才能被发送。同理,服务端在接收到客户端的请求后,也必须先解码才能得到Request对象,Response亦是如此。

Dubbo网络通讯协议分为两部分,分别是Header和Body,Header部分采用Codec编解码,Body部分使用序列化。本篇文章会分析Dubbo对于消息的编解码和序列化的细节。

2. 编解码

Dubbo默认使用Netty作为网络传输层框架,因此我们也以Netty为例,分别从客户端编码和服务端解码两个视角去分析。

2.1 Encoder

消息的编码相对于解码来说要简单的多,因为不用考虑TCP粘包/拆包的问题。要想知道发送的Request对象经历了什么,我们首先要从NettyClient说起。

使用Netty,作为开发者而言,最重要的就是设计ChannelHandler,编排ChannelHandlerPipeline。一般来说,入站的数据需要先解码,出站的数据最终需要编码,因此会在Pipeline的头部设置编解码处理器,Dubbo也正是这么处理的。

ch.pipeline()
    .addLast("decoder", adapter.getDecoder())
    .addLast("encoder", adapter.getEncoder())
    .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
    .addLast("handler", nettyClientHandler);

Dubbo在Pipeline的头部放了编码器InternalEncoder,它依赖于ExchangeCodec,所以我们直接看ExchangeCodec#encodeRequest()。
编码分为两部分,根据协议设置Header,再根据序列化策略将Request里的Data序列化后写入Body。

protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
    // 序列化策略 默认hessian2
    Serialization serialization = getSerialization(channel);
    // 协议头Header 16字节
    byte[] header = new byte[HEADER_LENGTH];
    // 2字节 魔数
    Bytes.short2bytes(MAGIC, header);
    header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
    if (req.isTwoWay()) {
        header[2] |= FLAG_TWOWAY;
    }
    if (req.isEvent()) {
        header[2] |= FLAG_EVENT;
    }
    // RequestId 全局自增
    Bytes.long2bytes(req.getId(), header, 4);
    int savedWriteIndex = buffer.writerIndex();
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
    // 序列化Data
    ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
    ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
    if (req.isEvent()) {// 事件消息
        encodeEventData(channel, out, req.getData());
    } else {
        encodeRequestData(channel, out, req.getData(), req.getVersion());
    }
    out.flushBuffer();
    if (out instanceof Cleanable) {
        ((Cleanable) out).cleanup();
    }
    bos.flush();
    bos.close();
    int len = bos.writtenBytes();
    // 校验负载,Body是否太大
    checkPayload(channel, len);
    Bytes.int2bytes(len, header, 12);
    buffer.writerIndex(savedWriteIndex);
    buffer.writeBytes(header); // write header.
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}

关于Dubbo网络通讯协议的部分,请查阅之前的文章。

这里需要注意一点,Header后4个字节记录的是BodyLength,服务端会基于此解决TCP粘包拆包的问题。

2.2 Decoder

解码比编码要复杂的多,因为要考虑TCP粘包拆包的场景。

和编码一样,针对服务端的解码,我们要从NettyServer说起,下面是Dubbo Pipeline的设置。

ch.pipeline()
    .addLast("decoder", adapter.getDecoder())
    .addLast("encoder", adapter.getEncoder())
    .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
    .addLast("handler", nettyServerHandler);

可以发现,和客户端几乎如出一辙,唯一的不同是最后的Handler,一个用来处理客户端逻辑,一个用来处理服务端逻辑。

Dubbo在Pipeline的头部放了解码器InternalDecoder,它继承自Netty提供的ByteToMessageDecoder抽象类。它依赖DubboCountCodec,本身不处理解码逻辑,就是个简单的循环,以便读取多条消息。

private class InternalDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf input, List out) throws Exception {
        ChannelBuffer message = new NettyBackedChannelBuffer(input);
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        // 循环读,可能有多条消息
        do {
            // 先保存读索引
            int saveReaderIndex = message.readerIndex();
            Object msg = codec.decode(channel, message);
            if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                // 读到的数据不完整,恢复读索引,等待对端发送更多的数据
                message.readerIndex(saveReaderIndex);
                break;
            } else {
                if (saveReaderIndex == message.readerIndex()) {
                    throw new IOException("Decode without read data.");
                }
                if (msg != null) {
                    out.add(msg);
                }
            }
        } while (message.readable());
    }
}
 

DubboCountCodec也是个装饰者,它本身也不处理解码逻辑,它在DubboCodec的基础上增加了解码多条消息的能力,而且会把解码的消息数写入attachments。

public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int save = buffer.readerIndex();
    // 由于网络问题,可能会接收到多条消息
    MultiMessage result = MultiMessage.create();
    do {
        Object obj = codec.decode(channel, buffer);
        if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
            // 读到的数据不完整,需要对端发送更多的数据
            buffer.readerIndex(save);
            break;
        } else {
            result.addMessage(obj);
            logMessageLength(obj, buffer.readerIndex() - save);
            save = buffer.readerIndex();
        }
    } while (true);
    if (result.isEmpty()) {
        // 没有读取到消息
        return Codec2.DecodeResult.NEED_MORE_INPUT;
    }
    if (result.size() == 1) {
        return result.get(0);
    }
    return result;
}

最终会调用ExchangeCodec#decode()开始解码单条消息,首先会尝试读取Header,但是由于TCP拆包的问题,读取到的Header可能并不完整,后面会做判断。

public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    // 可读字节数
    int readable = buffer.readableBytes();
    // header可能不完整
    byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
    buffer.readBytes(header);
    return decode(channel, buffer, readable, header);
}

接下来会对接收到的数据做判断,是否满足16字节,如果不满足,说明连最基本的Header都不完整,此时会返回NEED_MORE_INPUT,表示暂不处理,等待对端发送更多的数据。

// Header不完整,需要等待对端发送更多的数据
if (readable < HEADER_LENGTH) {
    return DecodeResult.NEED_MORE_INPUT;
}

如果Header读取完毕,则开始解析BodyLength,判断Body是否读取完整,如果不完整照样没法处理,需要等待对端发送更多的数据。

// 从Header解析BodyLength
int len = Bytes.bytes2int(header, 12);
checkPayload(channel, len);
// 总的消息长度
int tt = len + HEADER_LENGTH;
if (readable < tt) {
    // 消息不完整,等待对端传输更多数据
    return DecodeResult.NEED_MORE_INPUT;
}

到这里,说明一条完整的消息已经接收到了,可以调用decodeBody()方法进行解码了。解码的方式也不复杂,客户端会按照协议格式写入数据,服务端按照相同的格式读取出来即可,最终的到Request对象。
这里有个点需要注意,Dubbo对于消息Body的反序列化是可以设置工作线程的,默认是在业务线程上进行,也可以通过参数decode.in.io设置在IO线程上进行。

DecodeableRpcInvocation inv;
// 反序列化是否使用IO线程,默认false
if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
    inv = new DecodeableRpcInvocation(channel, req, is, proto);
    inv.decode();// IO线程直接反序列化
} else {
    inv = new DecodeableRpcInvocation(channel, req,
                                      new UnsafeByteArrayInputStream(readMessageData(is)), proto);
}
data = inv;

解码出来的Request后面会交给DecodeHandler处理,方法是received(),如果前面IO线程没有反序列化,这里会利用业务线程反序列化,最终交给Handler处理。

public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof Decodeable) {
        decode(message);
    }
    if (message instanceof Request) {
        // 解码Data 反序列化
        decode(((Request) message).getData());
    }
    if (message instanceof Response) {
        decode(((Response) message).getResult());
    }
    handler.received(channel, message);
}

如果是来自Consumer的RPC调用请求,解码后的结果就是RpcInvocation,最终会交给DubboProtocol里的ExchangeHandler处理,调用本地Invoker,响应结果,流程结束。

3. 序列化

Dubbo支持多种序列化策略,例如Java本身的序列化、hessian2、Kryo等,序列化策略也是通过SPI加载的,可以非常方便的更换。
Serialization是Dubbo对序列化的抽象接口,默认的序列化方案是Hessian2,我们以它为例分析。
序列化和反序列化其实就是把Java对象和字节序列相互转换的一个过程,Dubbo将这个过程也抽象成了两个接口,分别是ObjectOutput和ObjectInput,前者用于序列化,后者用于反序列化。

public class Hessian2Serialization implements Serialization {
    @Override
    public ObjectOutput serialize(URL url, OutputStream out) throws IOException {
        return new Hessian2ObjectOutput(out);
    }

    @Override
    public ObjectInput deserialize(URL url, InputStream is) throws IOException {
        return new Hessian2ObjectInput(is);
    }
}

Hessian2序列化,底层依赖Hessian2Output,它是hessian框架提供的序列化类,感兴趣的朋友可以去了解一下,不在本文的讨论范围之内。

4. 总结

网络传输的总是字节序列,无论是请求还是响应,发送方要编码,接收方要解码。Dubbo默认使用Netty作为网络传输层框架,实现消息编解码的方式是在Pipeline的头部设置编解码器,如此一来,对于出站数据,最终会经过Encoder编码,对于入站数据,首先要经过Decoder解码。
解码相较于编码要更加的复杂,因为它要处理TCP粘包/拆包的问题,Dubbo的解决方案是将BodyLength写入Header后4个字节,接受方首先要保证读取到一个完整的Header,然后提取出BodyLength,以此判断接收到的消息是否完整。如果不完整,会返回NEED_MORE_INPUT代表需要等待对端传输更多的数据。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5673027.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)