2. 带回调函数的 API 回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是 Recordmetadata 和 Exception ,如果 Exception 为 null ,说明消息发送成功,如果 Exception 不为 null ,说明消息发送失败。 注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
3 同步发送 API
同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack 。 由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同 步发送的效果,只需在调用 Future 对象的 get 方发即可
2 Consumer API
Consumer 消费数据时的可靠性是很容易保证的,因为数据在 Kafka 中是持久化的,故 不用担心数据丢失问题。 由于 consumer 在消费过程中可能会出现断电宕机等故障, consumer 恢复后,需要从故 障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset ,以便故障恢 复后继续消费。 所以 offset 的维护是 Consumer 消费数据是必须考虑的问题。 2.1 自动提交 offset 1 )导入依赖
2.2 手动提交 offset
虽然自动提交 offset 十分简介便利,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。因此 Kafka 还提供了手动提交 offset 的 API 。 手动提交 offset 的方法有两种:分别是 commitSync (同步提交) 和 commitAsync (异步 提交) 。两者的相同点是,都会将 本次 poll 的一批数据最高的偏移量提交 ;不同点是, commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致, 也会出现提交失败);而 commitAsync 则没有失败重试机制,故有可能提交失败。 1 )同步提交 offset 由于同步提交 offset 有失败重试机制,故更加可靠,以下为同步提交 offset 的示例
2)异步提交 offset
虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞 吐量会收到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。 以下为异步提交 offset 的示例:
3 ) 数据漏消费和重复消费分析 无论是同步提交还是异步提交 offset ,都有可能会造成数据的漏消费或者重复消费。先 提交 offset 后消费,有可能造成数据的漏消费;而先消费后提交 offset ,有可能会造成数据 的重复消费。 2.3 自定义存储 offset Kafka 0.9 版本之前, offset 存储在 zookeeper , 0.9 版本及之后,默认将 offset 存储在 Kafka 的一个内置的 topic 中。除此之外, Kafka 还可以选择自定义存储 offset 。 offset 的维护是相当繁琐的,因为需要考虑到消费者的 Rebalace 。 当有新的消费者加入消费者组、已有的消费者推出消费者组或者所订阅的主题的分区发 生变化,就会触发到分区的重新分配,重新分配的过程叫做 Rebalance 。 消费者发生 Rebalance 之后,每个消费者消费的分区就会发生变化。 因此消费者要首先 获取到自己被重新分配到的分区,并且定位到每个分区最近提交的 offset 位置继续消费。 要实现自定义存储 offset ,需要借助 ConsumerRebalanceListener ,以下为 示例代码 ,其 中提交和获取 offset 的方法,需要根据所选的 offset 存储系统自行实现。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)