前面分析了Consumer是如何发起远程服务调用的,最终DubboInvoker会利用ExchangeClient客户端发送网络请求。Dubbo会将网络请求封装为Request对象发送,但是网络传输的总是字节序列,Request对象必须经过编码才能被发送。同理,服务端在接收到客户端的请求后,也必须先解码才能得到Request对象,Response亦是如此。
Dubbo网络通讯协议分为两部分,分别是Header和Body,Header部分采用Codec编解码,Body部分使用序列化。本篇文章会分析Dubbo对于消息的编解码和序列化的细节。
2. 编解码Dubbo默认使用Netty作为网络传输层框架,因此我们也以Netty为例,分别从客户端编码和服务端解码两个视角去分析。
2.1 Encoder消息的编码相对于解码来说要简单的多,因为不用考虑TCP粘包/拆包的问题。要想知道发送的Request对象经历了什么,我们首先要从NettyClient说起。
使用Netty,作为开发者而言,最重要的就是设计ChannelHandler,编排ChannelHandlerPipeline。一般来说,入站的数据需要先解码,出站的数据最终需要编码,因此会在Pipeline的头部设置编解码处理器,Dubbo也正是这么处理的。
ch.pipeline() .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS)) .addLast("handler", nettyClientHandler);
Dubbo在Pipeline的头部放了编码器InternalEncoder,它依赖于ExchangeCodec,所以我们直接看ExchangeCodec#encodeRequest()。
编码分为两部分,根据协议设置Header,再根据序列化策略将Request里的Data序列化后写入Body。
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException { // 序列化策略 默认hessian2 Serialization serialization = getSerialization(channel); // 协议头Header 16字节 byte[] header = new byte[HEADER_LENGTH]; // 2字节 魔数 Bytes.short2bytes(MAGIC, header); header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId()); if (req.isTwoWay()) { header[2] |= FLAG_TWOWAY; } if (req.isEvent()) { header[2] |= FLAG_EVENT; } // RequestId 全局自增 Bytes.long2bytes(req.getId(), header, 4); int savedWriteIndex = buffer.writerIndex(); buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); // 序列化Data ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer); ObjectOutput out = serialization.serialize(channel.getUrl(), bos); if (req.isEvent()) {// 事件消息 encodeEventData(channel, out, req.getData()); } else { encodeRequestData(channel, out, req.getData(), req.getVersion()); } out.flushBuffer(); if (out instanceof Cleanable) { ((Cleanable) out).cleanup(); } bos.flush(); bos.close(); int len = bos.writtenBytes(); // 校验负载,Body是否太大 checkPayload(channel, len); Bytes.int2bytes(len, header, 12); buffer.writerIndex(savedWriteIndex); buffer.writeBytes(header); // write header. buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len); }
关于Dubbo网络通讯协议的部分,请查阅之前的文章。
这里需要注意一点,Header后4个字节记录的是BodyLength,服务端会基于此解决TCP粘包拆包的问题。
2.2 Decoder解码比编码要复杂的多,因为要考虑TCP粘包拆包的场景。
和编码一样,针对服务端的解码,我们要从NettyServer说起,下面是Dubbo Pipeline的设置。
ch.pipeline() .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS)) .addLast("handler", nettyServerHandler);
可以发现,和客户端几乎如出一辙,唯一的不同是最后的Handler,一个用来处理客户端逻辑,一个用来处理服务端逻辑。
Dubbo在Pipeline的头部放了解码器InternalDecoder,它继承自Netty提供的ByteToMessageDecoder抽象类。它依赖DubboCountCodec,本身不处理解码逻辑,就是个简单的循环,以便读取多条消息。
private class InternalDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf input, List
DubboCountCodec也是个装饰者,它本身也不处理解码逻辑,它在DubboCodec的基础上增加了解码多条消息的能力,而且会把解码的消息数写入attachments。
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException { int save = buffer.readerIndex(); // 由于网络问题,可能会接收到多条消息 MultiMessage result = MultiMessage.create(); do { Object obj = codec.decode(channel, buffer); if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) { // 读到的数据不完整,需要对端发送更多的数据 buffer.readerIndex(save); break; } else { result.addMessage(obj); logMessageLength(obj, buffer.readerIndex() - save); save = buffer.readerIndex(); } } while (true); if (result.isEmpty()) { // 没有读取到消息 return Codec2.DecodeResult.NEED_MORE_INPUT; } if (result.size() == 1) { return result.get(0); } return result; }
最终会调用ExchangeCodec#decode()开始解码单条消息,首先会尝试读取Header,但是由于TCP拆包的问题,读取到的Header可能并不完整,后面会做判断。
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException { // 可读字节数 int readable = buffer.readableBytes(); // header可能不完整 byte[] header = new byte[Math.min(readable, HEADER_LENGTH)]; buffer.readBytes(header); return decode(channel, buffer, readable, header); }
接下来会对接收到的数据做判断,是否满足16字节,如果不满足,说明连最基本的Header都不完整,此时会返回NEED_MORE_INPUT,表示暂不处理,等待对端发送更多的数据。
// Header不完整,需要等待对端发送更多的数据 if (readable < HEADER_LENGTH) { return DecodeResult.NEED_MORE_INPUT; }
如果Header读取完毕,则开始解析BodyLength,判断Body是否读取完整,如果不完整照样没法处理,需要等待对端发送更多的数据。
// 从Header解析BodyLength int len = Bytes.bytes2int(header, 12); checkPayload(channel, len); // 总的消息长度 int tt = len + HEADER_LENGTH; if (readable < tt) { // 消息不完整,等待对端传输更多数据 return DecodeResult.NEED_MORE_INPUT; }
到这里,说明一条完整的消息已经接收到了,可以调用decodeBody()方法进行解码了。解码的方式也不复杂,客户端会按照协议格式写入数据,服务端按照相同的格式读取出来即可,最终的到Request对象。
这里有个点需要注意,Dubbo对于消息Body的反序列化是可以设置工作线程的,默认是在业务线程上进行,也可以通过参数decode.in.io设置在IO线程上进行。
DecodeableRpcInvocation inv; // 反序列化是否使用IO线程,默认false if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) { inv = new DecodeableRpcInvocation(channel, req, is, proto); inv.decode();// IO线程直接反序列化 } else { inv = new DecodeableRpcInvocation(channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto); } data = inv;
解码出来的Request后面会交给DecodeHandler处理,方法是received(),如果前面IO线程没有反序列化,这里会利用业务线程反序列化,最终交给Handler处理。
public void received(Channel channel, Object message) throws RemotingException { if (message instanceof Decodeable) { decode(message); } if (message instanceof Request) { // 解码Data 反序列化 decode(((Request) message).getData()); } if (message instanceof Response) { decode(((Response) message).getResult()); } handler.received(channel, message); }
如果是来自Consumer的RPC调用请求,解码后的结果就是RpcInvocation,最终会交给DubboProtocol里的ExchangeHandler处理,调用本地Invoker,响应结果,流程结束。
3. 序列化Dubbo支持多种序列化策略,例如Java本身的序列化、hessian2、Kryo等,序列化策略也是通过SPI加载的,可以非常方便的更换。
Serialization是Dubbo对序列化的抽象接口,默认的序列化方案是Hessian2,我们以它为例分析。
序列化和反序列化其实就是把Java对象和字节序列相互转换的一个过程,Dubbo将这个过程也抽象成了两个接口,分别是ObjectOutput和ObjectInput,前者用于序列化,后者用于反序列化。
public class Hessian2Serialization implements Serialization { @Override public ObjectOutput serialize(URL url, OutputStream out) throws IOException { return new Hessian2ObjectOutput(out); } @Override public ObjectInput deserialize(URL url, InputStream is) throws IOException { return new Hessian2ObjectInput(is); } }
Hessian2序列化,底层依赖Hessian2Output,它是hessian框架提供的序列化类,感兴趣的朋友可以去了解一下,不在本文的讨论范围之内。
4. 总结网络传输的总是字节序列,无论是请求还是响应,发送方要编码,接收方要解码。Dubbo默认使用Netty作为网络传输层框架,实现消息编解码的方式是在Pipeline的头部设置编解码器,如此一来,对于出站数据,最终会经过Encoder编码,对于入站数据,首先要经过Decoder解码。
解码相较于编码要更加的复杂,因为它要处理TCP粘包/拆包的问题,Dubbo的解决方案是将BodyLength写入Header后4个字节,接受方首先要保证读取到一个完整的Header,然后提取出BodyLength,以此判断接收到的消息是否完整。如果不完整,会返回NEED_MORE_INPUT代表需要等待对端传输更多的数据。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)