- 1.offset四个概念:
- 2.两种 offset
- Current Offset
- Committed Offset
- 3.服务端常用命令
- 1.查看所有 topic
- 2.查看 topic partition、replica、ISR 详情
- 3.查看 consumer group 列表(新版信息保存在broker中)(老版信息保存在zookeeper中)
- 4.查看 topic 的最 latest、earliest 移量(默认 time 是 1 latest)
- 5.查看消费者组的消费进度
- 6.重置消费者组的 offset【预览】
- 7.重置消费者组的 offset【执行】,只能重置处于非活跃状态的 consumer group
- 8.消费数据
- 4.ISR
- LogStartOffset
表示一个Partition的起始位移,初始为0,虽然消息的增加以及日志清除策略的影响,这个值会阶段性的增大。 - ConsumerOffset
消费位移,表示Partition的某个消费者消费到的位移位置。 - HighWatermark
简称HW,代表消费端所能“观察”到的Partition的最高日志位移,HW大于等于ConsumerOffset的值。 - LogEndOffset
简称LEO, 代表Partition的最高日志位移,其值对消费者不可见。比如在ISR(In-Sync-Replicas)副本数等于3的情况下(如下图所示),消息发送到Leader A之后会更新LEO的值,Follower B和Follower C也会实时拉取Leader A中的消息来更新自己,HW就表示A、B、C三者同时达到的日志位移,也就是A、B、C三者中LEO最小的那个值。由于B、C拉取A消息之间延时问题,所以HW必然不会一直与Leader的LEO相等,即LEO>=HW。
当我们调用 poll 方法时,Kafka 会向我们发送一些消息。假设分区中有 100 条记录。当前偏移量的初始位置为 0。我们进行了第一次调用并收到了 20 条消息。现在 Kafka 会将当前偏移量移动到 20。当我们发出下一个请求时,它会从 20 开始发送更多消息,并再次将当前偏移量向前移动。偏移量是一个简单的整数,Kafka 使用它来维护消费者的当前位置。当前偏移量是指向 Kafka 在最近轮询中已发送给消费者的最后一条记录的指针。因此,由于当前的偏移量,消费者不会两次获得相同的记录。用于避免将相同的记录再次发送给同一个消费者。
Committed Offset现在让我们来提交偏移量,这个偏移量是消费者已经确认的关于处理的位置。提交的偏移量是指向消费者已成功处理的最后一条记录的指针。用于避免在分区重新平衡的情况下将相同的记录重新发送给新的消费者。
3.服务端常用命令 1.查看所有 topicbin/kafka-topics.sh --zookeeper hadoop102:2181 --list2.查看 topic partition、replica、ISR 详情
bin/kafka-topics.sh --zookeeper hadoop102:2181 --topic topic_name --describe3.查看 consumer group 列表(新版信息保存在broker中)(老版信息保存在zookeeper中)
# 新版 bin/kafka-consumer-groups.sh --bootstrap-server hadoop102:9092 --list # 老版 bin/kafka-consumer-groups.sh --zookeeper hadoop102:2181 --list4.查看 topic 的最 latest、earliest 移量(默认 time 是 1 latest)
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop102:9092 --topic topic_name --time -15.查看消费者组的消费进度
bin/kafka-consumer-groups.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --describe --group group_id_name6.重置消费者组的 offset【预览】
(–to-current、–to-datetime ‘YYYY-MM-DDTHH:mm:SS.sss’、–to-earliest、–to-latest、–to-offset
bin/kafka-consumer-groups.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --topic topic_name --group group_id_name --reset-offsets --to-latest7.重置消费者组的 offset【执行】,只能重置处于非活跃状态的 consumer group
bin/kafka-consumer-groups.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --topic topic_name --group group_id_name --reset-offsets --to-latest --excute8.消费数据
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --topic topic_name --group group_id_name4.ISR
-
所有副本列表=ISR+OSR(Outof-Sync Replicas)
-
时间延迟和条数延迟都会被踢出ISR,0.10.x后只有时间延迟,扩展时新加入的也会进入OSR
-
leader写入新消息,consumer不能立刻消费,leader会等待所有ISR中的replicas同步后跟新HW,消息才可能被consumer消费,这样broker失效后从达到HW的broker中选取leader
-
新进来的消息先写到一个临时位置进行同步,ack机制确认之后,commit 改变 hw
-
ISR 保存在zookeeper:/brokers/topics/[topic]/partitions/[partition]/state,有两个地方会对zookeeper这个节点维护:
1.Controller 下的 LeaderSelector 选举新的 leader 2.Leader 有单独的线程定期检测 ISR 中的follower是否脱离 ISR,如果变化,将新的 ISR 的信息返回到 Zookeeper 的相关节点
-
kafka 集群中一个broker会被选举为 Controller,负责 partition 管理、副本状态管理、重分配partition之类的管理任务
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)