原来你是这样的生产者

原来你是这样的生产者,第1张

01上帝视角--俯瞰生产者

       看过上一篇文章的读者应该了解到生产者和消费者都是kafka的客户端,是向kafka集群发送消息记录的。KafkaProducer是线程安全的,多线程共享同一个实例要比多实例的性能更好。

      一般,我们使用生产者时可能采用如下方式创建:

        Properties producerProperties = new Properties();
        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        producerProperties.put(ProducerConfig.RETRIES_CONFIG, 3);
        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        producerProperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        producerProperties.put(ProducerConfig.LINGER_MS_CONFIG, 0);
        producerProperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        producerProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");
        KafkaProducer producer = new KafkaProducer<>(properties);

        ProducerRecord record = new ProducerRecord<>(TOPIC,"111","test-data");

        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                System.out.println(recordMetadata.offset());
            }
        });

       kafka的生产端提供了很多的配置参数,上面只是配置了一小部分,具体的参数作用会在后面涉及到的地方做一个详细的解释。

      生产者的内部维护一个缓冲区用来存储未发起发送请求的记录,同时会开启IO线程不断地将符合发送条件的所有记录异步的封装成网络请求发向kafka集群。整个过程看起来如下:

     

 

      整个的生产者的过程分为两个大的步骤。

       第一步是将消息记录写入上面提到的缓冲区BufferPool,但是这一步为了提高生产者性能,并不是将记录直接写入缓冲区,而是将多条记录封装到一个ProducerBatch对象中写入BufferPool,之后再加入队列Deque。

     第二步是IO线程从缓冲区取出消息封装成网络请求将消息发送到kafka集群。这两个过程构成了一个“小型”的生产消费模型,生产者线程有多个,发送IO线程只有一个,通过对队列Deque加锁实现线程安全。

      在第一步中的主要角色是“记录收集器”,负责将记录写入最终的BufferPool中,但是在写入到BufferPool之前消息需要几道工序的处理,我们将深入到细节看一看生产者在生产消息时都做了什么?

       

        

     一条消息在被加入到记录收集器之前,会经历拦截器处理、分区元数据更新、分配分区、key-value序列化和一些消息大小校验等工作。

       下面对每一步进行更加细节的介绍(更新元数据在未来的章节单独介绍)。

      

02消息的拦截处理

    

      我们在发送消息前,所有的消息都必须(如有)经过拦截器的处理,那么拦截器是怎么初始化的呢?或者说我们怎么构造我们自己的拦截器呢?

        翻看源码我们可以看到KafkaProducer中有这么几个重载的构造方法

       

       所有的重载的最终都会调用最后一个缺省权限的方法,这个构造会处理所有你配置的生产者的配置项。其中interceptor.classes就是用来配置拦截器的,要求interceptor.classes对应的类实现ProducerInterceptor接口。      

/**
 * 生产者拦截器
 * @author wushuang
 */
public class MyProducerInterceptor implements ProducerInterceptor {
    @Override
    public ProducerRecord onSend(ProducerRecord record) {
        return null;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map configs) {
    }
}

      我们定义了一个生产者拦截器,里面都是空的实现,我们需要在生产者初始化的property中加入参数。

//ProducerConfig.INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes"
producerProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.ccutsky.config.KafkaProducerConfig");

       该参数的值可以是多个,是以逗号分隔的一个字符串。

       我们重点看一下拦截器接口的几个方法的触发时机。      

public interface ProducerInterceptor extends Configurable {
    ProducerRecord onSend(ProducerRecord var1);

    void onAcknowledgement(RecordMetadata var1, Exception var2);

    void close();
}

       Interceptor定义了三个方法:

      onSend:在消息的key和value以及分区分配前触发,允许对key和value修改,并返回修改后的值,这里要注意最好不要改记录的topic和partition。拦截器调用顺序和interceptor.classes配置的顺序相同,并且后一个拦截器的入参是前一个拦截器的返回值,依次传递下去,当其中一个拦截器抛出异常时整个拦截链的执行不会停止,会将最后一个不抛出异常的拦截器的返回值作为后续调用参数继续执行。

     onAcknowledgement:这个方法触发的时机有两个:

    1)服务端成功接收消息并ack 

    2)KafkaProducer抛出异常。

    close:在生产者close的时候触发i,一般是用来清理一下资源等。

       根据拦截器的几个方法调用时机,我们可以利用拦截器来统一处理我们的消息体,比如 我们可以对消息体进行加密处理,或者我们对某些消息字段如手机号、身份z号等个人信息进行脱敏处理,这当然是根据你的业务场景来考虑,在消费端会通过消费者拦截器进行解密处理,这样保证了消息的安全性。

      还有拦截器适合做一些统计类的功能,比如总发送消息数,成功数量以及失败数量等,总之你想统一修改消息key、value的所有业务场景都可以在拦截器中统一处理。  

03消息分区分配策略

   

      上一篇我们已经说过,topic可以分为多个分区,目的是为了增加吞吐量,那么怎么确定我们发送的消息应该属于哪个分区呢?

      翻阅kafka的ProduceRecord的源码我们可以看,record构建有几个重载的方法,有一些构造方法是支持我们指定某一个分区的

    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable
headers) {         //为了篇幅,省略了body体     }     public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {         this(topic, partition, timestamp, key, value, null);     }       public ProducerRecord(String topic, Integer partition, K key, V value, Iterable
headers) {         this(topic, partition, null, key, value, headers);     }          public ProducerRecord(String topic, Integer partition, K key, V value) {         this(topic, partition, null, key, value, null);     }     public ProducerRecord(String topic, K key, V value) {         this(topic, null, null, key, value, null);     }        public ProducerRecord(String topic, V value) {         this(topic, null, null, null, value, null);     }

       如果指定了分区,那么就很简单,直接就用指定的分区就好了,我们主要讨论下在没有指定分区的时候消息的分区应该怎么分配。

    负责为消息分配分区的组件我们暂且叫它为“分区器”,英文名叫Partitioner,我们看下它的类图       

      分区器定义了三个方法:

      partiton:计算并返回记录的分区

      close:分区关闭时会触发close方法

      onNewBatch:当新的批记录创建时的回调方法

      截止到kafk-client-3.1版本,一共有三种分区器的内部实现

  • 默认的分区器DefaultPartitioner

     如果消息中指定了key,利用murmur2进行hash运算然后对分区数量求余,得到分区编号;如果消息记录中没有指定key,那么将采用粘滞分区的策略,实现原理是内部维护一个称之为indexCache的缓存,所有请求直接从缓存中返回当前的分区,当第一次访问时,缓存中没有分区,会生成一个随机数对分区数取余作为新的分区号,粘滞分区的目的如果缓存中的分区没有改变就会一直使用缓存中的分区,目的是让一个ProducerBatch尽快填满达到可发送状态(因为每个分区对应一个队列,队列中存放的就是ProducerBatch记录),当当前ProducerBatch记录填满的时候,需要新建ProducerBatch(下一篇会详细说说),会触发onNewBatch接口,这个缓存中的分区将被刷新,生成一个和上次分区编号不同的新的分区号,当然如果只有一个分区直接返回就可以了。

  •      轮询分区器RoundRobinPartitioner

轮询分区器内部维护一个AtomicInteger的map,每次自增然后对分区数取余。        

  • 粘滞分区器UniformStickyPartitioner

      和默认的分区器的区别就是忽略了key的存在,直接统一采用粘滞分区策略,对于通过key来保证消息落在同一个分区进而保证消息的顺序性的需要注意避免使用该分区器

      除上面的内置的几个分区器之外,根据需要我们可以自定义分区的策略,通过生产者的配置参数partitioner.class,配置的类需要实现接口

org.apache.kafka.clients.producer.Partitioner。   

04序列化策略

      生产者发送的消息,需要通过网络请求发送到kafka集群,所以必须对所消息进行序列化成二进制才能进行网络传输。

      在生产者进行创建的时候,需要指定key和value的序列化器,即需要配置参数key.serializer和value.serializer,参数的值为类的全局限定名,并且该类需要实现org.apache.kafka.common.serialization.Serializer

接口,对于key来说,我们一般只需要配置成内置的StringSerializer 就可以 ,value我们可以根据需要自己实现一个,比如将对象转成成JSON的格式再序列化为byte数组,这样在消费端用JSON形式的反序列化就可以读出json格式的的对象这样方便消息的解析和处理。

05版本兼容的策略

     从2012年的1月4号发布0.7.0 Release到现在最新的3.1.0(2022-1-24日发布),我相信整个工程里里外外都“翻新”了一遍,那么在每次更新时,需要考虑前前后后的兼容问题,比如我用老的客户端能否向版本比较新的集群发送消息?或者是新的客户端能否向老版本的集群成功发送消息?甚至kafka的集群中每一个broker版本都不见得一致,那么不同版本的服务端和客户端需要怎么处理兼容问题呢?

      kafka为了解决上述的问题,设计了一个ApiVersion的概念,我们先看一下ApiVersion的类图(省略了一些字段和方法):

         

      我们先看NodeApiVersions,这个对象主要是存储了一个Node节点的API版本信息,成员变量supportedVersions的key为api的请求类型,所有的请求类型都有一个对应的ApiKeys,比如生产请求的key为ApiKeys.PRODUCE,消费请求为ApiKeys.FETCH,获取元数据的请求为ApiKeys.METADATA等,集群提供的api接口很多具体看下枚举类org.apache.kafka.common.protocol.ApiKeys。supportedVersion的值即为ApiVersion,可以看到有两个比较重要的属性,最大和最小的版本,控制着node节点支持的最大和最小版本号,但是这个版本不是kafka的发布版本号,是api的版本号。

      生产者使用ApiVersions的最重要的一个用途是确定消息的格式的版本,

不同的node节点接受的消息格式可能会有出路,需要确定一个能适合所有的node节点的消息格式,ApiVersions类会依赖NodeApiVersions类,每当nodeApiVersions有更新时,会重新计算一个叫生产者"魔数"(下文统一为Magic)的变量,正是这个变量来决定写入消息的格式(这里的消息格式并不是消息体中的字段,而是kafka内部使用的存储格式),具体计算逻辑如下:    

    switch (produceRequestVersion) {
            case 0:
            case 1:
                //0
                return RecordBatch.MAGIC_VALUE_V0;

            case 2:
                //1
                return RecordBatch.MAGIC_VALUE_V1;

            default:
                //2
                return RecordBatch.MAGIC_VALUE_V2;
        }

      每个节点都会有一个对应的api的适用版本号范围,这里需要取出

ApiKeys.PRODUCE对应的api版本支持描述信息,因为每个版本都有一个最大支持的版本,在所有的node节点中取出能够支持对大版本的版本号,再从这些最大版本号中取出最小的一个作为所有node节点的最大的版本号,通过上面的逻辑转换为Magic(switch逻辑),最后和客户端(RecordBatch中常量CURRENT_MAGIC_VALUE,3.1版本的客户端Magic为2)的Magic对比再取一个最小值作为最匹配的Magic。

 
06 不同版本消息的内部存储结构 


      截止目前(2022-04-27)kafka的最新稳定版本为3.1,消息的Magic目前有三个值,分别为0,1和2。每个版本对应的消息格式不一样,为什么我们讲这个呢?因为后续的消息的序列化会计算消息的大小,而每种格式的消息序列化后的大小(除去业务消息的占用)占用空间也是不一样的。

   当稳定的魔法数 >= MAGIC_VALUE_V2(即2)时

      这个版本也是目前最新的ProducerBatch的schema,我们取个名字V2+

  

       看了这么多概念,是不是感觉有点信息爆炸,脑容量溢出?要了解每一个字段的作用其实需要对好多内容有一个全面的了解,我们先对这些字段有一个大概的了解,后面的章节涉及到具体的字段再细细道来。

  • baseOffsetLength:预留的起始位置的偏移量字段长度8字节

  • Length:批记录的总体长度

  • partitionLeaderEpoch:分区的leader的纪元

  • magic:批记录的魔法数标识(对应着maxUsableProduceMagic)

  • CRC:采用CRC-32C (Castagnoli) 算法来校验消息的完整性和防篡改,消息记录中参与crc计算的字段包含从attributes一直到批记录结构的最后,不包含partitionLeaderEpoch,这样就避免broker接收到批记录时重新计算crc的

  • attributes:一共两个字节,即16个位

      1. 删除线标识:用来决定batch的firstTimestamp属性是否已经被(墓碑或事务标记器,后面到)标记为待删除,如果设置为真,firstTimestamp作为删除线,否则为批记录的第一个时间戳。

      2. 控制位标识:是否为控制记录

      3. 事务标识:标识记录是否为开启事务

      4. 时间戳类型:0代表记录创建时间,1代表日志追加时间

      5. 记录的压缩类型:0-NONE即不压缩;1-gzip 2-snappy;3-lz4;4-zstd

  • lastOffsetDelta:批记录的最后一个消息的offset(相对offset

  • baseTimestamp:即firstTimestamp,第一条记录的时间戳或者为墓碑记录的时间戳

  • maxTimestamp:批记录的最大记录时间戳

  • productorId:在开启了幂等或事务的producer,需要向broker请求生成

  • productorEpoch:在开启了幂等或事务的producer,需要向broker请求生

  • baseSequence:开启了幂等或事务,会生成sequence,broker会判断该值是否比已收到请求的sequence大,否则说明重复抛出OutOfOrderSequenceException。

  •  recordCount:批记录的总记录数

当稳定的魔法数 < MAGIC_VALUE_V2(即2)并且采取了压缩算法

    这种情况下Magic0和Magic1的消息格式差别不大,消息存储分为三部分:LOG_OVERHEAD、记录压缩存储需要的额外存储结构(recordOverhead)和记录本身的存储(recordSize)。

     LOG_OVERHEAD的结构,由8字节的offset和4字节的长度组成如下

 

      记录recordOverhead存储结构如下 

  

 

     记录本身c x的存储结构,recordOverhead+实际的key的字节数+实际的消息字节数

     总结起来magic0和magic1版本消息在压缩情况下的存储结构如下图

      

Magic为1或0并且不压缩

 

      这种情况和上面的压缩方案比少了一个recordOverhead       

       

07小结

       本篇首先主要从全局的“上帝视角”总结了生产者的生产消息的过程,对生产者拦截器、分区的算法、kafka的版本兼容处理着重的分析了一下,最后对不同Magic的消息存储结构进行了详细的介绍,本来打算一口气将进入到记录收集器之前的所有逻辑都说完,后来看看篇幅有点大(怕劝退),后续内容放到下一篇吧。

       更多文章请关注公众号 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/795155.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-06
下一篇 2022-05-06

发表评论

登录后才能评论

评论列表(0条)

保存