kafka 客户端之producer API发送消息以及简单源码分析

kafka 客户端之producer API发送消息以及简单源码分析,第1张

kafka 客户端之producer API发送消息以及简单源码分析

背景:我使用docker-compose 搭建的kafka服务
kafka的简单介绍以及docker-compose部署单主机Kafka集群

Kafka API简单介绍

kafka除了用于管理和管理任务的命令行工具,Kafka还有5个用于Java和Scala的核心API

他们分别是

The Admin API : 用于管理和inspect topics, brokers和其他 Kafka 对象The Producer API: 将事件流发布(写入)到一个或多个 Kafka topicsThe Consumer API: 订阅(读取)一个或多个topics并处理它们生成的事件流The Kafka Streams API: 用于实现流处理应用程序和微服务,它提供了更高级别的方法来处理事件流,包括转换、聚合和连接等有状态 *** 作、窗口化、基于事件时间的处理等等。从一个或多个topics读取输入以生成一个或多个topics的输出,有效地将输入流转换为输出流。The Kafka Connect API:用于构建和运行可重用 的数据导入/导出connectors,这些connectors从外部系统和应用程序消费(读取)或产生(写入)事件流,以便它们可以与 Kafka 集成。例如,与 PostgreSQL 等关系数据库的连接器可能会捕获表的每次更改。但是,在实践中,您通常不需要实现自己的connectors,因为 Kafka 社区已经提供了数百个即用型connectors。

我使用的wurstmeister/kafka镜像的kafka是2.8.1版本的,通过docker inspect命令可以查看

kafka客户端 *** 作之Admin API对admin 的API做了一些简单使用介绍,这里来介绍一下producer 发送消息的API的基本使用和简单的源码分析。

Producer

Producer是一个接口,下面有两个实现

KafkaProducerMockProducer

MockProducer是Producer接口的模拟,可用于测试使用 Kafka 的代码
我们使用KafkaProducer

创建producer实例
public static Producer createProducer() {
    Properties properties = new Properties();
    //配置文件里面的变量都是静态final类型的,并且都有默认的值
    //用于建立与 kafka 集群连接的 host/port
    //继承的hashtable,保证了线程安全
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka服务器IP:9092");
    
    properties.put(ProducerConfig.ACKS_CONFIG,"all");
    
    properties.put(ProducerConfig.RETRIES_CONFIG,"0");
    
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
    
    properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
    
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
    
    properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG,"5000");

    //将消息发送到kafka server, 所以肯定需要用到序列化的 *** 作  我们这里发送的消息是string类型的,所以使用string的序列化类
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

    return new KafkaProducer<>(properties);
    
}

在new KafkaProducer时,构造器里做了什么:

读取Properties里的配置项,初始化ProducerConfig基于ProducerConfig初始化一些配置字段初始化MetricConfig监控度量指标配置以及MetricsReporter报告器列表(使用的是JmxReporter,是MetricsReporter接口的实现)和Metrics存储库从配置中加载消息key和value的序列化器(Serializer)初始化RecordAccumulator,一个类似于计数器的东西,用于计算消息批次的。因为Producer并不是接收到一条消息就发送到一条消息,而是达到一定批量后按批次发送的,所以需要有一个计数器来存储和计算批次。初始化用于发送消息的Sender,然后会为其创建一个守护线程,并启动

KafkaProducer所有的属性都是final的,并且均在构造器中完成了初始化,不存在不安全的共享变量,这也就变相说明了KafkaProducer是线程安全的

配置项简单介绍

Properties类继承于 Hashtable.表示一个持久的属性集.属性列表中每个键及其对应值都是一个字符串。put()等设置配置等方法保证了线程安全。主要的作用是通过修改配置文件可以方便的修改代码中的参数,实现不用改class文件即可灵活变更参数。

Properties类里面的变量都是静态final类型的,并且都有默认的值. Properties类里面的ConfigDef()的对象实例CONFIG维护了默认配置

properties.put() 使用这个方法来设置配置。key为配置项的名称,例如下面的ProducerConfig.BOOTSTRAP_SERVERS_CONFIG对应着配置文件字符串
bootstrap.servers,value为我们传入的具体配置值。

ProducerConfig.BOOTSTRAP_SERVERS_CONFIG

用于建立与 kafka 集群连接的 host/port

ProducerConfig.ACKS_CONFIG

producer 需要 server 接收到数据之后发出的确认接收的信号,此项配置就是指 procuder需要多少个这样的确认信号。此配置实际上代表了数据备份的可用性。以下设置为常用选项:
(1)acks=0: 设置为 0 表示 producer 不需要等待任何确认收到的信息。副本将立即加到socket buffer 并认为已经发送。没有任何保障可以保证此种情况下 server 已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败)回馈的 offset 会总是设置为-1;
(2)acks=1: 这意味着至少要等待 leader已经成功将数据写入本地 log,但是并没有等待所有 follower 是否成功写入。这种情况下,如果 follower 没有成功备份数据,而此时 leader又挂掉,则消息会丢失。
(3)acks=all: 这意味着 leader 需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证,但是这样性能最低
(4)其他的设置,例如 acks=2 也是可以的,这将需要给定的 acks 数量,但是这种策略一般很少用

ProducerConfig.RETRIES_CONFIG

如果请求失败,生产者会自动重试,如果启用重试,则会有重复消息的可能性。
设置大于 0 的值将使客户端重新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许
重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个 partition,则第一个消息失败第二个发送成功,则第二条消息会比第一
条消息出现要早。

ProducerConfig.BATCH_SIZE_CONFIG

缓存的大小是通过 ProducerConfig.BATCH_SIZE_CONFIG
配置指定的。值较大的话将会产生更大的批。并需要更多的内存(因为每个“活跃”的分区都有1个缓冲区)。默认缓冲可立即发送,即便缓冲空间还没有满,但是,如果你想减少请求的数量,可以设置ProducerConfig.LINGER_MS_CONFIG大于0。这将指示生产者发送请求之前等待一段时间,希望更多的消息填补到未满的批中。这类似于TCP的算法,例如上面的代码段,可能10条消息在一个请求发送,因为我们设置了linger(逗留)时间为1毫秒,然后,如果我们没有填满缓冲区,这个设置将增加1毫秒的延迟请求以等待更多的消息。需要注意的是,在高负载下,相近的时间一般也会组成批,即使是linger.ms=0。在不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的,更有效的请求。

ProducerConfig.LINGER_MS_CONFIG

producer 组将会汇总任何在请求与发送之间到达的消息记录一个单独批量的请求。通常来说,这只有在记录产生速度大于发送速度的时候才能发生。然而,在某些条件下,客户端将希望降低请求的数量,甚至降低到中等负载一下。这项设置将通过增加小的延迟来完成。不是立即发送一条记录,producer 将会等待给定的延迟时间以允许其他消息记录发送,这些消息记录可以批量处理。这可以认为是 TCP 种 Nagle 的算法类似。这项设置设定了批量处理的更高的延迟边界:一旦我们获得某个 partition 的batch.size,他将会立即发送而不顾这项设置,然而如果我们获得消息字节数比这项设置要小的多,我们需要“linger”特定的时间以获取更多的消息。 这个设置默认为 0,即没有延迟。设定 linger.ms=5,例如,将会减少请求数目,但是同时会增加 5ms 的延迟

ProducerConfig.BUFFER_MEMORY_CONFIG

producer 可以用来缓存数据的内存大小。如果数据产生速度大于向 broker 发送的速度,将会耗尽这个缓存空间,producer会阻塞或者抛出异常,以“block.on.buffer.full”来表明。这项设置将和 producer 能够使用的总内存相关,但并不是一个硬性的限制,因为不是producer 使用的所有内存都是用于缓存。一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些用于维护请求当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定,之后它将抛出一个TimeoutException。

ProducerConfig.MAX_BLOCK_MS_CONFIG

该配置控制 KafkaProducer’s send(),partitionsFor(),inittransaction (),sendOffsetsToTransaction(),commitTransaction() 和abortTransaction()方法将阻塞。对于send(),此超时限制了获取元数据和分配缓冲区的总等待时间"

Kafka的消息传递保障

我们首先要了解一下消息的传递语义,一般存在三种类型语义:

At most once(最多一次):消息传递过程中有可能丢失,丢失的消息也不会重新传递,其实就是保证消息不会重复发送或者重复消费At least once(至少一次):消息在传递的过程中不可能会丢失,丢失的消息会重新传递,其实就是保证消息不会丢失,但是消息有可能重复发送或者重复被消费Exactly once(正好一次):这个是大多数场景需要的语义,其实就是保证消息不会丢失,也不会重复被消费,消息只传递一次

在Kafka中主要通过消息重发和ACK机制来保障消息的传递,消息重发机制主要是提高消息发送的成功率,并不能保证消息一定能发送成功。我们可以通过在创建Producer实例时,设置retries配置项来开启或关闭消息重发机制,另一个消息传递保障机制就是ACK机制。也就是我们上面介绍的ProducerConfig.RETRIES_CONFIG和ProducerConfig.ACKS_CONFIG这两个配置项

序列化

配置序列化的实现类

//将消息发送到kafka server, 所以肯定需要用到序列化的 *** 作
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

找到kafka-clients的包

在依赖的kafka包里面可以看见,common包里面的serialization下面提供了很多序列化的实现

KafkaProducer流程结构图

整体流程步骤

1、客户端写程序,通过props中写的属性来连接broker集群,连接zookeeper集群,获取metadata信息,构建待发送的消息对象ProducerRecord,然后调用KafkaProducer.send方法进行发送。

2、ProducerRecord对象携带者topic,partition,message等信息,在Serializer中被序列化

3、序列化过后的ProducerRecord对象进入Partitioner“中,按照Partitioning 策略决定这个消息将被分配到哪个Partition中。

4、确定partition的ProducerRecord进入一个缓冲区,通过减少IO来提升性能。此时KafkaProducer.send方法成功返回。在缓冲区内,消息被按照TopicPartition信息进行归类整理,相同Topic且相同parition的ProducerRecord被放在同一个RecordBatch中,等待被发送。什么时候发送?都在Producer的props中被指定了,有默认值,显然我们可以自己指定。
一旦,当单个RecordBatch的linger.ms延迟到达或者batch.size达到上限,这个 RecordBatch会被立即发送。另外,如果所有RecordBatch作为一个整体,达到了buffer.memroy或者max.block.ms上限,所有的RecordBatch都会被发送。

5、KafkaProducer中还有一个专门的Sender IO线程负责将缓冲池中的ProducerRecord消息按照分配好的Partition分批次发送给对应的broker,完成真正的消息发送逻辑。broker接收保存消息,更新metadata信息,同步给Zookeeper。

ProducerRecord

消息记录,记录了要发送给kafka集群的消息、分区等信息

topic:必须字段,表示该消息记录record发送到那个topic。value:必须字段,表示消息内容。partition:可选字段,要发送到哪个分区partition。key:可选字段,消息记录的key,可用于计算选定partition。timestamp:可选字段,时间戳;表示该条消息记录的创建时间createtime,如果不指定,则默认使用producer的当前时间。headers:可选字段,(作用暂时不明)。 Producer 异步发送

producer是线程安全的,在线程之间共享单个producer实例,通常单例比多个实例要快。
一个简单的例子,使用producer发送一个有序的key/value(键值对)

public static void producerSend(Producer producer){

    // 消息对象 - ProducerRecoder
    for(int i=0;i<10;i++){
        ProducerRecord record =  new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i);
        producer.send(record);
    }
    // 所有的通道打开都需要关闭   close方法会会将缓存队列状态置为关闭,唤醒io线程将内存中的数据发往broker,避免这个程序的进程突然挂掉,然后内存里面的消息丢失,所以这个方法结束的时候,将消息数据都发送出去
    producer.close();
}
send() 方法

主要做了以下事情:

    使用序列化器去序列化消息的key和value计算分区,即计算消息具体进入哪一个partition,也就是一个负载均衡的过程计算批次,判断是否需要创建新的批次,然后都需要调用accumulator.append向批次中追加消息当批次满了,调用sender.wakeup在守护线程中去发送消息如果callback实例不为null,调用callback的onCompletion方法
@Override
public Future send(ProducerRecord record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

send()方法是异步的,添加消息到缓冲区等待发送,并立即返回。生产者将单个的消息批量在一起发送来提高效率。

发送的结果是一个Recordmetadata ,指定记录发送到的分区、分配的偏移量和记录的时间戳。如果主题使用CreateTime ,则时间戳将是用户提供的时间戳,如果用户没有为记录指定时间戳,则时间戳将是记录发送时间。如果主题使用LogAppendTime ,则时间戳将是附加消息时的 Kafka 代理本地时间。

由于发送调用是异步的,它会为将分配给该记录的Recordmetadata返回一个Future 。在这个未来调用get()将阻塞,直到相关请求完成,然后返回记录的元数据或抛出发送记录时发生的任何异常。

如果不想阻塞,就实现一个回调实例传入

形参:

record - 发送的记录(消息)callback - 用户提供的callback,服务器来调用这个callback来应答结果(null表示没有callback) Throws异常:

InterruptException - 如果线程在阻塞中断。SerializationException - 如果key或value不是给定有效配置的serializers。TimeoutException - 如果获取元数据或消息分配内存话费的时间超过max.block.ms。KafkaException - Kafka有关的错误(不属于公共API的异常)。 消息丢失问题

生产者的缓冲空间池保留尚未发送到服务器的消息,后台I/O线程负责将这些消息转换成请求发送到集群。如果使用后不关闭生产者,则会丢失这些消息。因为send()方法只是将消息缓冲到本地,如果客户端进程突然挂掉,就会产生消息丢失,通过设置produce的linger.ms为0可以让消息立即发送,但是本来就是通过了批量发送来提高kafka的吞吐量的,因为批量发送可以减少IO开销。或者在这个方法的末尾,调用一下close(),告诉kafka生产者客户端立即发送。

Producer异步回调发送

回调通常会在生产者的 I/O 线程中执行,因此应该相当快,否则它们会延迟从其他线程发送消息。如果要执行阻塞或计算量大的回调,建议在回调主体中使用自己的java.util.concurrent.Executor来并行处理。

异步回调实现,只是传入了一个回调实例

public static void producerSendWithCallback(Producer producer){

    // 消息对象 - ProducerRecoder
    for(int i=0;i<10;i++){
        ProducerRecord record = new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i);
        //就是多传入一个回调实例
        
        producer.send(record, new Callback() {
            
            @Override
            public void onCompletion(Recordmetadata recordmetadata, Exception e) {
                System.out.println(
                        "partition : "+recordmetadata.partition()+" , offset : "+recordmetadata.offset());
            }
        });
    }

    // 所有的通道打开都需要关闭  close方法会会将缓存队列状态置为关闭,唤醒io线程将内存中的数据发往broker,避免这个程序的进程突然挂掉,然后内存里面的消息丢失,所以这个方法结束的时候,将消息数据都发送出去
    producer.close();
}

异步回调时序图

回调的顺序性

发送到同一分区的记录的回调保证按顺序执行。也就是说,在以下示例中, callback1保证在callback2之前执行:

producer.send(new ProducerRecord(topic, partition, key1, value1), callback1);
producer.send(new ProducerRecord(topic, partition, key2, value2), callback2);
完整代码
public class ProducerSample {

    private final static String TOPIC_NAME="xt";


    
    public static Producer createProducer() {
        Properties properties = new Properties();
        //配置文件里面的变量都是静态final类型的,并且都有默认的值
        //用于建立与 kafka 集群连接的 host/port
        //继承的hashtable,保证了线程安全
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka服务器IP:9092");
        
        properties.put(ProducerConfig.ACKS_CONFIG,"all");
        
        properties.put(ProducerConfig.RETRIES_CONFIG,"0");
        
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
        
        properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
        
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
        
        properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG,"5000");

        //将消息发送到kafka server, 所以肯定需要用到序列化的 *** 作  我们这里发送的消息是string类型的,所以使用string的序列化类
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        return new KafkaProducer<>(properties);

    }



    public static void main(String[] args) throws ExecutionException, InterruptedException {


        // Producer的主对象
        Producer producer = ProducerSample.createProducer();

        // Producer异步发送
        producerSend(producer);


        // Producer异步发送带回调函数
        producer = ProducerSample.createProducer(); //要重新new,因为上个方法producerSend已经将producer close了
        producerSendWithCallback(producer);

    }

    
    public static void producerSend(Producer producer){

        // 消息对象 - ProducerRecoder
        for(int i=0;i<10;i++){
            ProducerRecord record =  new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i);
            producer.send(record);
        }
        // 所有的通道打开都需要关闭   close方法会会将缓存队列状态置为关闭,唤醒io线程将内存中的数据发往broker,避免这个程序的进程突然挂掉,然后内存里面的消息丢失,所以这个方法结束的时候,将消息数据都发送出去
        producer.close();
    }


    
    public static void producerSendWithCallback(Producer producer){

        // 消息对象 - ProducerRecoder
        for(int i=0;i<10;i++){
            ProducerRecord record = new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i);
            //就是多传入一个回调实例
            
            producer.send(record, new Callback() {
                
                @Override
                public void onCompletion(Recordmetadata recordmetadata, Exception e) {
                    System.out.println(
                            "partition : "+recordmetadata.partition()+" , offset : "+recordmetadata.offset());
                }
            });
        }

        // 所有的通道打开都需要关闭  close方法会会将缓存队列状态置为关闭,唤醒io线程将内存中的数据发往broker,避免这个程序的进程突然挂掉,然后内存里面的消息丢失,所以这个方法结束的时候,将消息数据都发送出去
        producer.close();
    }

}
References:

https://coding.imooc.com/class/434.htmlhttps://www.cnblogs.com/xiguage119/p/11192042.htmlhttps://blog.csdn.net/zp17834994071/article/details/108132119https://blog.51cto.com/zero01/2495999

(写博客主要是对自己学习的归纳整理,资料大部分来源于书籍、网络资料和自己的实践,整理不易,但是难免有不足之处,如有错误,请大家评论区批评指正。同时感谢广大博主和广大作者辛苦整理出来的资源和分享的知识。)

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5716269.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存