10.0.1 Topic
topic无法保证消息的有序性10.1 生产者(Producer) 10.1.1 消息不丢失
#解决producer生产的消息在收到kafka 正确保存的ACK后由于卡夫卡服务宕机导致的消息丢失 1 ACK设置为ALL可以解决副本一致性问题 2 精准一次性引入SEQ,解决已保存但未ACK此时Leader挂掉,但是生产者仍重试导致的重复问题,用SEQ可以进行过滤 3 加入事务可以保证,由于生产者宕机导致的,PID变化进而无法验证SEQ的问题10.1.1 .1 ACK
ack = 0 无需确认直接返回ACK: 网络抖动即可造成数据丢失 ack = 1 leader需确认 返回ACK leader挂掉即可造成数据丢失 ack = -1(ALL) 所有flower需确认才返回ACK ISR中仅剩leader,且leader挂掉10.1.2 消息不重复
1 生产者不重复生产 -- Exactly once + Transaction 2 消费者不重复消费 -- Redis 去重10.1.2.1 Exactly once
0.11 后开始支持幂等性 精准一致性= at lest once + 幂等性 1 设置方法 //设置幂等性 ENABLE_IDEMPOTENCE_ConFIG = "enable.idempotence"; properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true"); //设置at lest once ACKS_ConFIG = "acks"; properties.setProperty(ProducerConfig.ACKS_CONFIG,"all"); 2 实现方法 producer 根据 自动生成的PID,Partitioner,SEQ发送消息 Broker 按上述条件对消息进行去重10.1.2.2 Producer 事务
Exactly onec 由于PID自动生成,所以只能保证单会话消息不重复 使用事务新形势每次通过事务获取PID,可以保证相同事务的会话重新建立时PID保持一致从而保证消息一致性10.1.3 producer API
Properties properties = new Properties(); KafkaProducer10.1.3.1 ProducerRecordkafkaProducer = new KafkaProducer(properties); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"server:prot"); ProducerRecord producerRecord = new ProducerRecord("p1", 1, "key", "value"); kafkaProducer.send(producerRecord);
1 topic必填项 2 仅有value Partition采用轮询的策略,且第一次轮询到的Partition随机 3 仅有key,value Partition按key的哈希值确定Partition 4 有patiton 按传入的Partition来决定发送到那个Partition10.1.3.2 同步发送
//设置Partitioner为1仍有可能因重发导致消息不同步,可以使用Future的get方法阻塞worker线程从何实现和main线程同步的效果 try { kafkaProducer.send(producerRecord).get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }10.2 消费者 (Consumer) 10.2.1 offset 10.2.1 HW/LEO/LSO
HW: (hight waterMark) leader和flower中最短的消息 LEO:(Log End Offset) 日志末端 LSO: (Log Stable offset) 已提交的事务末端,即consumer可以消费的10.2.2 分区分配策略
假设 topic:T1(P11,P12,P13) T2(P21,P22,P23) consumerGroup CG1(C1,C2) CG2(C3) 订阅关系 C1,C2订阅T1 C2,C3订阅T2 Range算法分区分配为 C1:P11 C2:P12,P13,P21,P22,P23 C3:P21,P22,P23 RoundRobin算法分区分配为 C1:P11, P13 ,P22(C1消费了没有订阅的T2中的P22,有逻辑问题) C2: P12, P21 ,P23 C3: P21, P22 ,P2310.2.2.1 range
默认分区分配策略 按照topic划分,没次将topic平均分到多个consumer上 缺点:可能某个consumer被分配多个Partition10.2.2.2 RoundRobin
按consumer划分,将consumer订阅的topic汇总后再平均分到consumer上 缺点:可能会分配到没有订阅的topic,产生逻辑问题10.2.2 Lag
不使用事务时为Lag等于HW-ConsumerOffset
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rFiwn5Ey-1641611310193)(…image20190621140605172.png)]
使用事务时,Lag等于LSO-ComsumerOffset
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XZv9gQYX-1641611310193)(…image20190621140742729.png)]
10.2.3 API 10.2.3.1 从头消费auto_offset_config 默认latest可以设置为earliest 可以从头消费的条件 1)相同消费组第一次(需要新建消费组) 2)offset存在 (segment不会因为过期而删除) Properties properties = new Properties(); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "groupid"); KafkaConsumer10.2.3.2 手动提交stringStringKafkaConsumer = new KafkaConsumer (properties);
重复消费:消费未提交导致的宕机会出现重复消费问题自动手动均无法保证,需自定义提交策略 消息丢失:提交了offset但未消费会产生消息丢失问题,手动提交可以保证确实消费了才提交 异步提交不阻塞main线程 stringStringKafkaConsumer.commitAsync(); 同步提交阻塞main线程 stringStringKafkaConsumer.commitSync();10.2.3.2 自定义提交
//保证消息不重复实现消费的幂等性和精准一次性语义基本思路就是将offset的提交和消费放在一个事务中 //如果消费需要使用MySQL则将offset也置于MySQL中的同一事务中即可保证不重复消费 //offset存储在MySQL例子 https://blog.csdn.net/qq_26838315/article/details/106882637 //1 自己实现rebalance consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() { // rebalance之前将记录进行保存 @Override public void onPartitionsRevoked(Collection10.3 Partitionpartitions) { for (TopicPartition partition : partitions) { // 获取分区 int sub_topic_partition_id = partition.partition(); // 对应分区的偏移量 long sub_topic_partition_offset = consumer.position(partition); String date = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss").format( new Date( new Long( System.currentTimeMillis() ) ) ); DBUtils.update("replace into offset values(?,?,?,?,?)", new Offset( group, topic, sub_topic_partition_id, sub_topic_partition_offset, date ) ); } } // rebalance之后读取之前的消费记录,继续消费 @Override public void onPartitionsAssigned(Collection partitions) { for (TopicPartition partition : partitions) { int sub_topic_partition_id = partition.partition(); long offset = DBUtils.queryOffset( "select sub_topic_partition_offset from offset where consumer_group=? and sub_topic=? and sub_topic_partition_id=?", group, topic, sub_topic_partition_id ); System.out.println("partition = " + partition + "offset = " + offset); // 定位到最近提交的offset位置继续消费 consumer.seek(partition, offset); } } }); //2 自己管理offset while (true) { ConsumerRecords records = consumer.poll(100); List offsets = new ArrayList<>(); for (ConsumerRecord record : records) { String date = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss").format( new Date( new Long( System.currentTimeMillis() ) ) ); offsets.add(new Offset(group, topic, record.partition(), record.offset(), date)); System.out.println("|---------------------------------------------------------------n" + "|groupttopictpartitiontoffsetttimestampn" + "|" + group + "t" + topic + "t" + record.partition() + "t" + record.offset() + "t" + record.timestamp() + "n" + "|---------------------------------------------------------------" ); } for (Offset offset : offsets) { DBUtils.update("replace into offset values(?,?,?,?,?)", offset); } offsets.clear(); }
一个topic可以分为多个Partition,实现高并发, 一个Partition只能被一个consumer组中的一个consummer消费 Partition中消息是有序的10.3.1 replication
广义副本的特点: 1 提高高可用性 2 提高读取速度 3 节点可以读取物理位置近的节点数据 kafka只有高可用性 一个Partitioner可以设置多个replication实现高可用 Partitioner中leader 节点宕机后一致性问题参见10.0.2.2 HW副本一致性10.3.2 HW副本一致性
waterMark:广义为已处理问题和未处理问题的时间分割线,水位很低系统可能被洪峰击垮 HW(hight waterMark) 保证Partitioner副本一致性 1 leader转移,后所有offset的LEO均截取到HW 2 所有follower均向Leader同步数据10.3.3 ISR/OSR
ISR :in-sync replication OSR : out-sync replication 成为ISR的条件为同步时间不少于10秒, 可以在replace.lag.time.max.ms中配置, 默认为10 超过10秒消息仍不一致提出ISR队列 只要追赶到HW则可重新进入ISR leader宕机后根据zk的通知功能可以实时感知,会从ISR依次选举新的leader10.3.4 AR
Assigned Repllicas 分区中所有副本被称为AR10.3.5 Prefer replication
优先副本 Topic:test1 PartitionCount:3 ReplicationFactor:2 Configs: Topic: test1 Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1 Topic: test1 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: test1 Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3 AR中第一个即为PR,一般PR即为leader,kafka会保证PR分配均衡,也就保证leader分配均衡 auto.leader.rebalance.enable ,此参数的默认值为 true,会自动进行平衡 broker 中的不平衡率=非优先副本的 leader 数/分区总数,当大于设定值进行rebalance kafka-perferred-replica election.sh 也可以进行rebalance 由于leader均匀分配并无法保证集群负载均衡,(还有考虑Partitioner是否均衡,以及Partitioner的消费度问题),所以 auto.leader.rebalance.enable常设置为false10.3.6 Unclean Leader election
当ISR是空时,进行leader选举 unclean.leader.election.enable 默认值从11版本由true改为false CAP – Consistency, Availability, Partition Tolerance C unclean.leader.election.enable设置为false时保证一致性,但无leader被选举 A无法保证 A unclean.leader.election.enable设置为true会在OSR中寻找leader,无法保证数据一致性,会丢数据 P 分区容错性,分区挂掉kafka 仍能提供服务10.3.7 自定义分区器 10.3.7.1 实现partitioner接口
public class MyPatition implements Partitioner { public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) { return 0; } public void close() { } public void configure(Map10.3.7.2 修改配置文件map) { } }
properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG,"MyPatition");10.3.7.3 随机分配
public int partition(String topic, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) { List10.3.7.4 按地理信息分区partitionInfos = cluster.availablePartitionsForTopic(topic); return ThreadLocalRandom.current().nextInt(partitionInfos.size()); }
List10.4 ZK 10.3.1 controller选举partitionInfos = cluster.availablePartitionsForTopic(topic); for (PartitionInfo p : partitionInfos) { return ---p.leader().host(); }
首次注册者为kafka controller ls /brokers/topics/cidbitb01.bean_service/partitions/1/state[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-75I0uY7O-1641611310194)(…image3149801-0d2ed2bd8b7bec25.png)] 10.5 kafka 拦截器
#客户段拦截器 #服务端拦截器11 问题集合 11.1 ISR,OSR,AR是什么
ISR(In-sync Replication) OSR (out-sync replication) AR (Assigned Repllication) 分区中所有副本被称为AR11.2 HW ,LEO 代表什么
HW 高水位:consumer能见到的最大的offset,即所有副本和leader中最小的offset LEO last end offset leader 重新选举后会截取到HW然后同步到副本中11.3 如何体现消费顺序性
https://www.jianshu.com/p/667ab9cd05d8 partition 内有序 通过配置 max.in.flight.requests.per.connection = 1 这个配置是 生产者 往 服务度 发送数据的请求数, 配置为1,则一次只能发送1个请求,11.4 分区器,序列化器,拦截器
均为生产者使用 顺序 拦截器,序列化,分区器 拦截器负责拦截message 序列化器进行序列化 分区器确定分区 确定分区后放入RecordAccumulator中缓存后按批次发送到kafka
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ye8nN1Rc-1641611310194)(…image20210105165047156.png)]
11.5 生产者客户端线程数生产者线程和worker线程两个11.6 消费这组个数多余topic分区数
则有消费者无法消费11.7 消费者提交的offset是最新的消费的offset还是offset+1
offset+111.8 消费者重复消费的情况
保证消息不重复实现消费的幂等性和精准一次性语义基本思路就是将offset的提交和消费放在一个事务中 消费了消息,没提交offset服务宕机11.9 消费者丢消息的情况
提交了offset,没消费服务宕机11.10 创建或删除一个topic卡夫卡做了什么
#1 /brokers/topics下创建topic ls /brokers/topics/ /brokers/topics [MCAST-mst_accept, csv, MCAST-mst_conversion, mst_control, cidbitf01.mqtt, cidbitf03.csv, cidbitb01.oas, mst, MCAST-rest_service, MCAST-mqtt, MCAST-csv, mst_conversion, MCAST-mst_thing_data_store, cidbitb01.bean_service, cidbitb02.rest_service, mqtt, MCAST-mqttudp, MCAST-mst, cidbitf01.coap, cidbitb02.oas, MCAST-event_rule, cidbitf02.mqttudp, MCAST-mst_control, event_rule, cidbitb02.bean_service, cidbitb02.event_rule, coap, cidbitf04.csv, cidbitb02.mst_control, cidbitb01.event_rule, MCAST-oag, test1, cidbitf04.oag, cidbitb01.rest_service, cidbitb02.mst_accept, oag, MCAST-bean_service, cidbitb01.mst_control, cidbitb02.mst_thing_data_store, cidbitb02.mst_conversion, oas, cidbitb02.mst, cidbitf01.mqttudp, rest_service, bean_service, MCAST-coap, mqttudp, cidbitb01.mst_conversion, MCAST-oas, cidbitf02.coap, mst_thing_data_store, mst_accept, cidbitb01.mst_thing_data_store, cidbitb01.mst_accept, cidbitb01.mst, __consumer_offsets, cidbitf03.oag, cidbitf02.mqtt] #2 触发controller的监听程序 #3 controller负责topic的创建并更新meta cache11.11 Topic 的分区数可否增加减少
#1 不可以减少因为数据原因 #2 可以增加,增加后会对消费者进行在分配11.12 分区分配概念
分区如何分配给消费者 1 range 默认分区分配策略 按照topic划分,没次将topic平均分到多个consumer上 缺点:可能某个consumer被分配多个Partition 2 轮询 按consumer划分,将consumer订阅的topic汇总后再平均分到consumer上 缺点:可能会分配到没有订阅的topic,产生逻辑问题 3 自定义分配策略 实现partitioner接口 properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG,"MyPatition"); public int partition(String topic, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) { List11.13 日志目录结构 11.13.1 indexpartitionInfos = cluster.availablePartitionsForTopic(topic); return ThreadLocalRandom.current().nextInt(partitionInfos.size()); }
00000000000469876095.index 索引文件,用于查找log中消息位置 记录offset和position xxxx@xxxx:~/apps/xxx/etc/kafka/bin$ ./kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --files ../data/event_rule-1/00000000000065469981.index Dumping ../data/event_rule-1/00000000000065469981.index offset: 65469982 position: 4491 offset: 65469985 position: 10432 offset: 65469986 position: 14923 offset: 65469989 position: 20278 offset: 65469991 position: 25201 offset: 65469994 position: 30342 offset: 65469995 position: 34833 offset: 65469996 position: 39142 offset: 65469998 position: 44065 offset: 65470000 position: 48988 offset: 65470005 position: 57368 offset: 65470007 position: 61634 offset: 65470008 position: 66125 offset: 65470013 position: 73334 offset: 65470014 position: 77825 offset: 65470016 position: 82565 offset: 65470018 position: 87311 offset: 65470021 position: 92666 offset: 65470022 position: 96980 offset: 65470023 position: 101471 offset: 65470025 position: 106980 offset: 65470026 position: 11147111.13.2 log
00000000000471305559.log 存储消息及offset xxxx@xxxx:~/apps/xxxxxx/etc/kafka/bin$ ./kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --files ../data/event_rule-1/00000000000065469981.log Dumping ../data/event_rule-1/00000000000065469981.log Starting offset: 65469981 baseOffset: 65469981 lastOffset: 65469981 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 7 isTransactional: false position: 0 CreateTime: 1640180466098 isvalid: true size: 4491 magic: 2 compresscodec: NONE crc: 2046061443 baseOffset: 65469982 lastOffset: 65469982 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 7 isTransactional: false position: 4491 CreateTime: 1640182278418 isvalid: true size: 1018 magic: 2 compresscodec: NONE crc: 372938539011.13.3 timeindex
00000000000472735023.timeindex 创建原因 1 分区副本重新分配log日志文件的时间会变化无法删除 2 日志按时间切分 3 流式编程 引入时间戳可以进行 1 日志切分 2 日志清楚 3 timeindex文件用于时间定为 首先由timeindex定为index文件,在由index文件对位log文件的具体位置 https://www.cnblogs.com/huxi2b/p/6050778.html xxxx@xxxxx:~/apps/xxxxx/etc/kafka/bin$ ./kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --files ../data/event_rule-1/00000000000065469981.timeindex Dumping ../data/event_rule-1/00000000000065469981.timeindex timestamp: 1640182278418 offset: 65469982 timestamp: 1640192474686 offset: 65469985 timestamp: 1640194286163 offset: 65469986 timestamp: 1640204532983 offset: 65469989 timestamp: 1640210519040 offset: 65469991 timestamp: 1640218281636 offset: 65469994 timestamp: 1640220931717 offset: 65469995 timestamp: 1640223713665 offset: 65469996 timestamp: 1640230281809 offset: 65469998 timestamp: 1640234469226 offset: 65470000 timestamp: 1640248933032 offset: 65470005 timestamp: 1640258454733 offset: 65470007 timestamp: 1640260266148 offset: 65470008 timestamp: 1640276462401 offset: 65470013 timestamp: 1640278933068 offset: 65470014 timestamp: 1640286651284 offset: 65470016 timestamp: 1640292650308 offset: 65470018 timestamp: 1640298963495 offset: 65470021 timestamp: 1640304080708 offset: 65470022 timestamp: 1640304081328 offset: 65470023 timestamp: 1640312457173 offset: 65470025 timestamp: 1640314933107 offset: 65470026 Found timestamp mismatch in :/pkg/apps/insator-iot/etc/kafka_2.11-1.0.0/bin/../data/event_rule-1/00000000000065469981.timeindex Index timestamp: 0, log timestamp: 1640180466098 Found out of order timestamp in :/pkg/apps/insator-iot/etc/kafka_2.11-1.0.0/bin/../data/event_rule-1/00000000000065469981.timeindex Index timestamp: 0, Previously indexed timestamp: 164031493310711.13.4 snapshot
https://www.cnblogs.com/once/p/12963678.html 停机消费快照内容为空,记录了producer的事务信息 kafka 停机 xxxxx@xxxxxx:~/apps/insator-iot/etc/kafka/data/mst-2$ ll total 164 drwxr-xr-x 2 xxxxxxxxxx 4096 Dec 24 13:30 ./ drwxr-xr-x 198 xxxxxxxxxx 12288 Dec 24 13:30 ../ -rw-r--r-- 1 xxxxxxxxxx 232 Dec 24 13:30 00000000000036970224.index -rw-r--r-- 1 xxxxxxxxxx 124964 Dec 24 13:25 00000000000036970224.log -rw-r--r-- 1 xxxxxxxxxx 372 Dec 24 13:30 00000000000036970224.timeindex -rw-r--r-- 1 xxxxxxxxxx 10 Dec 24 13:03 00000000000036970641.snapshot -rw-r--r-- 1 xxxxxxxxxx 10 Dec 24 13:30 00000000000036970643.snapshot -rw-r--r-- 1 xxxxxxxxxx 26 Dec 24 13:12 leader-epoch-checkpoint xx@xxx:~/apps/xxxx/etc/kafka/data/mst-2$ cat ./00000000000036970641.snapshot xxx@xx:~/apps/xxxx/etc/kafka/data/mst-2$ cat ./00000000000036970643.snapshot xxx@xxx:~/apps/xxxx/etc/kafka/data/mst-2$ kafka 开机后11.14 offset的定为方法
1 有timeindex 先从timeindex查找到 index 2 然后从index查找最近的offset对应的position, 3 log中进行二分查找到给定的offset11.15 选举
1 controller选举,先抢占资源的选为controller 2 replication的leader选举 1)leader宕机后根据zk的通知功能可以实时感知,会从ISR依次选举新的leader 2)当ISR是空时,进行leader选举 如果 unclean.leader.election.enable (默认值从11版本由true改为false) 为true会从OSR中选举leader CAP – Consistency, Availability, Partition Tolerance C unclean.leader.election.enable设置为false时保证一致性,但无leader被选举 A无法保证 A unclean.leader.election.enable设置为true会在OSR中寻找leader,无法保证数据一致性,会丢数据 P 分区容错性,分区挂掉kafka 仍能提供服务11.16 失效副本
成为ISR的条件为同步时间不少于10秒, 可以在replace.lag.time.max.ms中配置, 默认为10 超过10秒消息仍不一致提出ISR队列 只要追赶到HW则可重新进入ISR 老版本会有条数限制11.17 kafka高效原因
1 partition 分布式 2 log的存储策略,总体来说是顺序读,index-> log log二分查找 3 零拷贝
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)