Kafka API

Kafka API,第1张

Kafka API 1 Producer API 1.1 消息发送流程 Kafka 的 Producer 发送消息采用的是 异步发送 的方式。在消息发送的过程中,涉及到了 两个线程—— main 线程和 Sender 线程 ,以及 一个线程共享变量—— RecordAccumulator 。 main 线程将消息发送给 RecordAccumulator , Sender 线程不断从 RecordAccumulator 中拉取 消息发送到 Kafka broker 。

相关参数: batch.size : 只有数据积累到 batch.size 之后, sender 才会发送数据。 linger.ms : 如果数据迟迟未达到 batch.size , sender 等待 linger.time 之后就会发送数据。 1.2 异步发送 API 1 )导入依赖 org.apache.kafka kafka-clients 0.11.0.0 2 )编写代码 需要用到的类: KafkaProducer :需要创建一个生产者对象,用来发送数据 ProducerConfig :获取所需的一系列配置参数 ProducerRecord :每条数据都要封装成一个 ProducerRecord 对象 1. 不带回调函数的 API

 

2. 带回调函数的 API 回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是 Recordmetadata 和 Exception ,如果 Exception 为 null ,说明消息发送成功,如果 Exception 不为 null ,说明消息发送失败。 注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

 

 3 同步发送 API

同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack 。 由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同 步发送的效果,只需在调用 Future 对象的 get 方发即可

 

 2 Consumer API

Consumer 消费数据时的可靠性是很容易保证的,因为数据在 Kafka 中是持久化的,故 不用担心数据丢失问题。 由于 consumer 在消费过程中可能会出现断电宕机等故障, consumer 恢复后,需要从故 障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset ,以便故障恢 复后继续消费。 所以 offset 的维护是 Consumer 消费数据是必须考虑的问题。 2.1 自动提交 offset 1 )导入依赖 org.apache.kafka kafka-clients 0.11.0.0 2 )编写代码 需要用到的类: KafkaConsumer :需要创建一个消费者对象,用来消费数据 ConsumerConfig :获取所需的一系列配置参数 ConsuemrRecord :每条数据都要封装成一个 ConsumerRecord 对象 为了使我们能够专注于自己的业务逻辑, Kafka 提供了自动提交 offset 的功能。 自动提交 offset 的相关参数: enable.auto.commit : 是否开启自动提交 offset 功能 auto.commit.interval.ms : 自动提交 offset 的时间间隔 以下为自动提交 offset 的代码:

 

 2.2 手动提交 offset

虽然自动提交 offset 十分简介便利,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。因此 Kafka 还提供了手动提交 offset 的 API 。 手动提交 offset 的方法有两种:分别是 commitSync (同步提交) 和 commitAsync (异步 提交) 。两者的相同点是,都会将 本次 poll 的一批数据最高的偏移量提交 ;不同点是, commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致, 也会出现提交失败);而 commitAsync 则没有失败重试机制,故有可能提交失败。 1 )同步提交 offset 由于同步提交 offset 有失败重试机制,故更加可靠,以下为同步提交 offset 的示例

 

 2)异步提交 offset

虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞 吐量会收到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。 以下为异步提交 offset 的示例:

 

 

3 ) 数据漏消费和重复消费分析 无论是同步提交还是异步提交 offset ,都有可能会造成数据的漏消费或者重复消费。先 提交 offset 后消费,有可能造成数据的漏消费;而先消费后提交 offset ,有可能会造成数据 的重复消费。 2.3 自定义存储 offset Kafka 0.9 版本之前, offset 存储在 zookeeper , 0.9 版本及之后,默认将 offset 存储在 Kafka 的一个内置的 topic 中。除此之外, Kafka 还可以选择自定义存储 offset 。 offset 的维护是相当繁琐的,因为需要考虑到消费者的 Rebalace 。 当有新的消费者加入消费者组、已有的消费者推出消费者组或者所订阅的主题的分区发 生变化,就会触发到分区的重新分配,重新分配的过程叫做 Rebalance 。 消费者发生 Rebalance 之后,每个消费者消费的分区就会发生变化。 因此消费者要首先 获取到自己被重新分配到的分区,并且定位到每个分区最近提交的 offset 位置继续消费。 要实现自定义存储 offset ,需要借助 ConsumerRebalanceListener ,以下为 示例代码 ,其 中提交和获取 offset 的方法,需要根据所选的 offset 存储系统自行实现。

 

 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5573242.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存