Kafka 生产者与消费者

Kafka 生产者与消费者,第1张

Kafka 生产者与消费者 1. 生产者 1.1 分区策略

分区的原因:

  1. 方便在集群中扩展:每个 Topic 可动态的调整 Partition 的数量以适应整个集群
  2. 提高并发:能够以 Partition 为单位进行读写

分区的原则:

命令行中新建 Topic 时,可设置分区的数量

在 API 中,使用 Producer 发送的数据被封装成一个 ProducerRecord 对象。

  • 指明 Partition 的而情况下,将指明的值作为 Partition 值;
  • 未指明 Partition,但存在 key 的情况下,将 key 的 hash 值与当前 Topic 的分区总数取模,得到 Partition 值;
  • 未指明 Partition,也不存在 key 的情况下,第一次调用时随机生成一个整数(后面调用时,在次基础上自增),将该值与 Topic 的分区总数取模,得到 Partition 值(也就是 round-robin 算法)。
1.2 数据可靠性保证

为保证 Producer 发送的数据能可靠的发送到指定的 Topic,Topic 的每个 Partition 收到 Producer 发送的数据后,都要向 Producer 返回一个 ack(acknowledgement);如果 Producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。

此时,再副本需要同步的情况下,何时发送 ack 成为了关键!

1.2.1 副本同步策略 方案优点缺点半数以上完成同步,发送 ack低延迟选举新的 leader 时,如果容忍 n 台节点的故障,需要 2n+1 个副本全部完成同步,发送 ack选举新的 leader 时,如果容忍 n 台节点的故障,需要 n+1 个副本高延迟

Kafka 为第二种方案!

1.2.2 ISR

如果采用如上的第二种方案,leader 需要等到所有 follower 完成同步,如果有 follower 因故障无法完成同步,leader 就需要等待其同步完成。所以就出现了 ISR(in-sync replica)!

Leader 维护了一个动态的 ISR 集合,其中存着与 Leader 保持同步的 Follower。当 ISR 中的所有 Follwer 完成数据的同步后,Leader 就会返回 ack。

如果有 Follower 长时间未向 Leader 同步数据,就会被移出 ISR,此时间域值由 replia.lag.max.ms 参数设定。

Leader 挂了之后,就会从 ISR 中选举新的 Leader。

1.2.3 ack

如果严格遵循第二种方案,那么在数据不太重要的情况下,就尝试费时费力的事情。此时就没必要等待 ISR 中所有的 Follower 完成同步,就应该进行下面的 *** 作。为此, Kafka 提供了三种可靠性级别,用户可根据实际情况进行调整。

ack 参数:

  • 设为 0:Producer 不等待 Broker 的 ack。(当 broker 故障时,会出现数据丢失)
  • 设为 1:Producer 等待 Broker 的 ack。Partition 在Leader 落盘成功后返回 ack。(当 Follower 同步完成之前,Leader 就故障,会出现数据丢失)
  • 设为 -1 (all ):Producer 等待 Broker 的 ack。Partition 的 Leader 和全部 Follower 落盘成功后才发返回 ack。(如果在 Follower 同步完成后,Leader 在返回 ack 之前就故障,会出现数据重复)
1.2.4 故障处理

LEO(Log End Offset):每个副本最大的 offset;

HW(High Watermark,高水位):消费者能见到的最大的 offset,ISR 中最小的 LEO

Follower 故障:

Follower 发生故障后,经过一定时间会被提出 ISR,待该Follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 Leader 进行同步。等该 follower 的 LEO 大于等于该 Partition 的 HW,即 Follower 追上了 Leader 之后,就可以重新加到 ISR 中。

Leader 故障:

Leader 发生故障后,会从 ISR 中选出一个新的 Leader,之后,为保证多个副本之间的数据一致性,其余的 Follower 会先将各自的 log 文件高于 HW 的部分截取掉,然后从新的 Leader 同步数据。

只能保证副本之间的数据一致性,不能保证数据不丢失和不重复!

1.2.5 Exactly once 语义

ACK 的级别设置为 1 时,可以保证 Producer 到 Server 之间不会丢失数据(At Least Once)。

ACK 的级别设置为 -1 时,可以保证每条消息只会被发送一次(At Most Once)。

At Least Once:保证数据不丢失,不能保证数据不重复!

At Most Once:保证数据不重复,不能保证数据不丢失!

如果对与特定的使用场景,比如交易数据,需要同时做到数据不丢失和数据不重复,即 Exactly once 语义。

在 0.11 版本后,Kafka 引入了 幂等性。启用幂等性,可设置 Producer 参数 enable.idmpotence 设置为 true。

幂等性: 即要求 Producer 不论向 Server发送多少次重复的数据,Server 只持久化一次。

开启幂等性的 Producer 在初始化时会分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number,Broker 端会对 做缓存,当具有相同主键的消息提交时,Broker 只会持久化一条。

PID会虽重启时变化,不同的 Partition 也具有不同的主键,所以幂等无法保证保证跨分区跨会话的 Exactly Once。

2. 消费者 2.1 消费方式

两种消费方式:

  • Broker push(推):消息发送的速率由 Broker 决定,容易造成 Consumer 来不及处理消息(即拒绝服务以及网络拥堵)
  • Consumer pull(拉):根据消费者能力以适当的速率消费数据

pull 模式中,如果 Kafka 没有数据,Consumer 会陷入循环中,且返回空数据。对此,Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有数据可供消费,Consumer 等待 timeout 时间后再返回

2.2 offset 的维护

由于 Consumer 在消费过程中可能会出现断电宕机等故障,Consumer 恢复后,需要从故 障前的位置的继续消费,所以 Consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。

1️⃣ 修改配置文件 consumer.properties

exclude.internal.topic=false

2️⃣ 读取 offset

bin/kafka-console-consumer.sh zookeeper ---formatter
"kafka.coordinator.GroupmetadataManager$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
--topic __consumer_offsets node1:2181 --consumer.config config/consumer.properties ---formatter
"kafka.coordinator.group.GroupmetadataManager$OffsetsMessageForm atter"
--from-
beginning
2.3 Rebalance 机制

消费组中的消费者没有指明分区来消费,且当消费组中的消费者和分区的关系发生变化时,就会触发 Rebalance 机制。

分区分配策略:

  • Round-Robin
  • Range:根据公式计算得到每个消费者消费哪几个分区

sticky:粘合策略,如果需要 rebalance,会在之前已分配谁为基础上调整,不会改变之前的分配情况。如果这个策略没有开,那么就会进行全部的重新分配!

2.4 分区与消费组的关系

一个 Partition 只能被一个消费组的一个消费者消费,目的是为了保证消费的顺序性,但多个 Partition 的多个消费者消费的总的顺序的得不到保障。

Partition 的数量决定了消费组中消费者的数量。

如果消费者挂了,会出发 Rebalance 机制,会让其他的消费者来消费该分区的数据。

 


❤️ END ❤️

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5686646.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存