自定义RPC项目——常见问题及详解(1)

自定义RPC项目——常见问题及详解(1),第1张

        作为Java后端选手,项目是必不可少的,目前主流做的就是各种管理系统和商城项目,同质化严重,那么怎么可以找到一个更加出彩的、与众不同的项目呢,我想下面这个自定义的RPC框架项目是一个不错的选择

项目地址:https://blog.csdn.net/qq_40856284/category_10138756.html

        众所周知,用框架和写框架所需要的对框架的熟悉程度显然不在一个级别。为了更好的去理解这个项目,我会把这个项目的常见问题及解答做一个整理,让我们开始吧。

        一.项目的整体架构

        这个问题能够加深我们对项目整体的认识,以及引出我们后续要讲的问题。

        

         本项目实现了基本的RPC调用的功能,包含实现注册中心、服务提供方和消费方板块。消费者调用提供者的方式取决于消费者的客户端选择,如选用原生 Socket 则该步调用使用 BIO,如选用 Netty 方式则该步调用使用 NIO。如该调用有返回值,则提供者向消费者发送返回值的方式同理。另外,项目实现了四种序列化算法:Json方式、Kryo算法、Hessian算法和Google Protobuf,在默认情况下使用Kryo进行序列化。项目还实现了两种负载均衡算法:随机算法和轮询算法。

        二.序列化算法和自定义协议

说在前面:我建议大家简历上写一种序列化算法即可,将其他的进行一个简单的了解,然后对自己选择的这种进行深入探究。

1,介绍一下项目中实现的序列化算法

JSON

  • JSON 进行序列化的额外空间开销比较大,对于大数据量服务这意味着需要巨大的内存和磁盘开销;
  • JSON 没有类型,但像 Java 这种强类型语言,需要通过反射统一解决,所以性能不会太好(比如反序列化时先反序列化为String类,要自己通过反射还原)。

Kryo

  • 使用变长的int和long保证这种基本数据类型序列化后尽量小
  • 需要传入完整类名或者利用 register() 提前将类注册到Kryo上,其类与一个int型的ID相关联,序列中只存放这个ID,因此序列体积就更小
  • 不是线程安全的,要通过ThreadLocal或者创建Kryo线程池来保证线程安全
  • 不需要实现Serializable接口
  • 字段增、减,序列化和反序列化时无法兼容
  • 必须拥有无参构造函数

Hessian

  • 使用固定长度存储int和long
  • 将所有类字段信息都放入序列化字节数组中,直接利用字节数组进行反序列化,不需要其他参与,因为存的东西多处理速度就会慢点。
  • 把复杂对象的所有属性存储在一个Map中进行序列化。所以在父类、子类存在同名成员变量的情况下,Hessian序列化时,先序列化子类,然后序列化父类,因此反序列化结果会导致子类同名成员变量被父类的值覆盖
  • 需要实现Serializable接口
  • 兼容字段增、减,序列化和反序列化
  • 必须拥有无参构造函数
  • Java 里面一些常见对象的类型不支持,比如:
    • Linked 系列,LinkedHashMap、LinkedHashSet 等;
    • Locale 类,可以通过扩展 ContextSerializerFactory 类修复;
    • Byte/Short 反序列化的时候变成 Integer。

Protobuf:

  • 序列化后体积相比 JSON、Hessian 小很多

  • IDL 能清晰地描述语义,所以足以帮助并保证应用程序之间的类型不会丢失,无需类似XML 解析器;

  • 序列化反序列化速度很快,不需要通过反射获取类型;

  • 打包生成二进制流

  • 预编译过程不是必须的

 2,为什么要进行序列化和反序列化?

        RPC涉及到两个进程间的远程通信。而两个进程之间进行远程通信时,彼此可以发送各种类型的数据,包括文本、图片、音频、视频等, 而这些数据都会以二进制序列的形式在网络上传送。那么,发送方需要把对象转换为字节序列,才能在网络上传送;而接收方则需要把字节序列再恢复为对象进行后续的处理。简单来说,序列化是为跨进程传递格式化数据所服务的。序列化概念如下:

序列化 (Serialization)是将对象的状态信息转换为可以存储传输的形式的过程。在序列化期间,对象将其当前状态写入到临时或持久性存储区。以后,可以通过从存储区中读取或反序列化对象的状态,重新创建该对象。

   通过对概念的理解我们可以看出序列化和反序列化有以下作用:
(1)实现了数据的持久化:永久性保存对象,保存对象的字节序列到本地文件或者数据库中;
(2)序列化实现远程通信:通过序列化以字节流的形式使对象在网络中进行传递和接收;
(3)通过序列化在进程间传递对象;

3,为什么选kryo序列化?Kryo原理了解吗?

        之所以使用Kryo序列化替换Java原生序列化方式,是因为Kryo序列化机制比默认的Java序列化机制速度要快,序列化后的数据要更小,大概是Java序列化机制的1/10。所以Kryo序列化优化以后,可以让网络传输的数据变少,在集群中耗费的内存资源大大减少。Kryo 是一个快速高效的 Java 对象序列化框架,主要特点是高性能、高效和易用。最重要的两个特点,一是基于字节的序列化,对空间利用率较高,在网络传输时可以减小体积;二是序列化时记录属性对象的类型信息。

        对于Kryo原理,可以看下面这个博客:

源码分析kryo对象序列化实现原理_库克look的博客-CSDN博客_kryo 源码分析源码分析kryo对象序列化实现原理_库克look的博客-CSDN博客_kryo 源码分析

        记不住就说只会基本使用哈哈,

4,你说到你自定义了一个简单协议,自定义的协议头里包括哪些内容,多少字节,各自的作用是什么?

如图所示:

Magic Number:魔数,4个字节,作用就是快速识别字节流是否是程序能够处理的,即第一时间判断是否是无效数据包。

Package Type:包类型,4个字节,标明这是一个调用请求还是调用响应

Serializer Type:序列化类型,4个字节,标明这个包的数据的序列化方式

Data Length:数据字节的长度,4个字节

Data Bytes:传输的对象,通常是一个RpcRequestRpcClient对象,取决于Package Type字段,对象的序列化方式取决于Serializer Type字段。

5,如果被序列化对象有一个属性是对象引用,怎么序列化?

        在新版本的 Kryo 中,默认情况下是不启用对象引用的。这意味着如果一个对象多次出现在一个对象图中,它将被多次写入,并将被反序列化为多个不同的对象。

        举个例子,当开启了引用属性,每个对象第一次出现在对象图中,会在记录时写入一个 varint,用于标记。当此后有同一对象出现时,只会记录一个 varint,以此达到节省空间的目标。此举虽然会节省序列化空间,但是是一种用时间换空间的做法,会影响序列化的性能,这是因为在写入/读取对象时都需要进行追踪。

        开发者可以使用 kryo 自带的 setReferences 方法来决定是否启用 Kryo 的引用功能。

6,是否了解Serializable和Externalizable?

        一个对象序列化的接口,一个类只有实现了Serializable接口,它的对象才能被序列化。Serializable是java.io包中定义的、用于实现Java类的序列化 *** 作而提供的一个语义级别的接口。可以看到,Serializable序列化接口没有任何方法或者字段,只是用于标识可序列化的语义。

        Externalizable接口扩展自java.io.Serializable接口。实现java.io.Serializable即可获得对类的对象的序列化功能。而Externalizable可以通过writeExternal()和readExternal()方法可以指定序列化哪些属性。Externalizable自定义序列化可以控制序列化的过程和决定哪些属性不被序列化

7,那你这个序列化还是针对Java语言的,如何实现跨语言的序列化或者RPC框架?

        之前让大家主要提及的是Kryo序列化方式,而这种方式是Java To Java的,而RPC框架要想跨语言,本质是在解决序列化/反序列化的跨语言问题。所以我们这个时候可以顺着面试官的问题说,可以设计多种序列化方式供选择,例如JSON就可以实现跨平台。具体实现看下面。

8,项目编解码及序列化具体实现细节?

        上代码,看序列化的实现。首先定义一个通用的序列化接口CommonSerializer:

public interface CommonSerializer {
    Integer KRYO_SERIALIZER = 0;
    Integer JSON_SERIALIZER = 1;
    Integer HESSIAN_SERIALIZER = 2;
    Integer PROTOBUF_SERIALIZER = 3;
    Integer DEFAULT_SERIALIZER = KRYO_SERIALIZER;
    static CommonSerializer getByCode(int code) {//1.根据编号获取序列化器
        switch (code) {
            case 0:
                return new KryoSerializer();
            case 1:
                return new JsonSerializer();
            case 2:
                return new HessianSerializer();
            case 3:
                return new ProtobufSerializer();
            default:
                return null;
        }
    }
    byte[] serialize(Object obj);//2.序列化
    Object deserialize(byte[] bytes, Class clazz);//3.反序列化
    int getCode();//4.获取序列化编号
}

        根据接口,我们的主要任务就是实现其中的主要两个方法,serialize() 和 deserialize() ,具体的Kryo序列化器实现如下:

        注意的点:

        1.这里 Kryo 可能存在线程安全问题,文档上是推荐放在 ThreadLocal 里,一个线程一个 Kryo实例。

        2.和很多其他的序列化框架一样,Kryo 为了提供性能和减小序列化结果体积,提供注册的序列化对象类的方式。在注册时,会为该序列化类生成 int ID,后续在序列化时使用 int ID 唯一标识该类型。

        3.在序列化时,先创建一个 Output 对象(Kryo 框架的概念),接着使用 writeObject 方法将对象写入 Output 中,最后调用 Output 对象的 toByte() 方法即可获得对象的字节数组。反序列化则是从 Input 对象中直接 readObject,这里只需要传入对象的类型,而不需要具体传入每一个属性的类型信息。

public class KryoSerializer implements CommonSerializer {

    private static final Logger logger = LoggerFactory.getLogger(KryoSerializer.class);

    private static final ThreadLocal kryoThreadLocal = ThreadLocal.withInitial(() -> {
        Kryo kryo = new Kryo();
        kryo.register(RpcResponse.class);
        kryo.register(RpcRequest.class);
        kryo.setReferences(true);
        kryo.setRegistrationRequired(false);
        return kryo;
    });

    @Override
    public byte[] serialize(Object obj) {
        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
             Output output = new Output(byteArrayOutputStream)) {
            Kryo kryo = kryoThreadLocal.get();
            kryo.writeObject(output, obj);
            kryoThreadLocal.remove();
            return output.toBytes();
        } catch (Exception e) {
            logger.error("序列化时有错误发生:", e);
            throw new SerializeException("序列化时有错误发生");
        }
    }

    @Override
    public Object deserialize(byte[] bytes, Class clazz) {
        try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
             Input input = new Input(byteArrayInputStream)) {
            Kryo kryo = kryoThreadLocal.get();
            Object o = kryo.readObject(input, clazz);
            kryoThreadLocal.remove();
            return o;
        } catch (Exception e) {
            logger.error("反序列化时有错误发生:", e);
            throw new SerializeException("反序列化时有错误发生");
        }
    }

    @Override
    public int getCode() {
        return SerializerCode.valueOf("KRYO").getCode();
    }
}

        编解码实现细节,根据我们上面定义的自定义协议进行编码工作:

        CommonEncoder 继承了MessageToByteEncoder 类,就是把 Message(实际要发送的对象)转化成 Byte 数组。CommonEncoder 的工作很简单,就是把 RpcRequest 或者 RpcResponse 包装成协议包。 根据上面提到的协议格式,将各个字段写到管道里就可以了,这里serializer.getCode() 获取序列化器的编号,之后使用传入的序列化器将请求或响应包序列化为字节数组写入管道即可。

public class CommonEncoder extends MessageToByteEncoder {

    private static final int MAGIC_NUMBER = 0xCAFEBABE;

    private final CommonSerializer serializer;

    public CommonEncoder(CommonSerializer serializer) {
        this.serializer = serializer;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        out.writeInt(MAGIC_NUMBER);
        if(msg instanceof RpcRequest) {
            out.writeInt(PackageType.REQUEST_PACK.getCode());
        } else {
            out.writeInt(PackageType.RESPONSE_PACK.getCode());
        }
        out.writeInt(serializer.getCode());
        byte[] bytes = serializer.serialize(msg);
        out.writeInt(bytes.length);
        out.writeBytes(bytes);
    }
}

        CommonDecoder 继承自 ReplayingDecoder ,与 MessageToByteEncoder 相反,它用于将收到的字节序列还原为实际对象。主要就是一些字段的校验,比较重要的就是取出序列化器的编号,以获得正确的反序列化方式,并且读入 length 字段来确定数据包的长度(防止粘包),最后读入正确大小的字节数组,反序列化成对应的对象。

public class CommonDecoder extends ReplayingDecoder {

    private static final Logger logger = LoggerFactory.getLogger(CommonDecoder.class);
    private static final int MAGIC_NUMBER = 0xCAFEBABE;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
        int magic = in.readInt();
        if(magic != MAGIC_NUMBER) {
            logger.error("不识别的协议包: {}", magic);
            throw new RpcException(RpcError.UNKNOWN_PROTOCOL);
        }
        int packageCode = in.readInt();
        Class packageClass;
        if(packageCode == PackageType.REQUEST_PACK.getCode()) {
            packageClass = RpcRequest.class;
        } else if(packageCode == PackageType.RESPONSE_PACK.getCode()) {
            packageClass = RpcResponse.class;
        } else {
            logger.error("不识别的数据包: {}", packageCode);
            throw new RpcException(RpcError.UNKNOWN_PACKAGE_TYPE);
        }
        int serializerCode = in.readInt();
        CommonSerializer serializer = CommonSerializer.getByCode(serializerCode);
        if(serializer == null) {
            logger.error("不识别的反序列化器: {}", serializerCode);
            throw new RpcException(RpcError.UNKNOWN_SERIALIZER);
        }
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes);
        Object obj = serializer.deserialize(bytes, packageClass);
        out.add(obj);
    }
}
 

        Netty 中有一个很重要的设计模式——责任链模式,责任链上有多个处理器,每个处理器都会对数据进行加工,并将处理后的数据传给下一个处理器。将编解码器实现后,在我们使用Netty进行网络传输时,通过addLast()方法编解码器的实例添加到pipeline即可。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/924327.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-16
下一篇 2022-05-16

发表评论

登录后才能评论

评论列表(0条)