1)Producer :消息生产者,就是向kafka broker 发消息的客户端;
2)Consumer :消息消费者,向kafka broker 取消息的客户端;
3)Consumer Group (CG):消费者组,由多个consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
4)Broker :一台kafka 服务器就是一个broker。一个集群由多个broker 组成。一个broker可以容纳多个topic。
5)Topic :可以理解为一个队列,生产者和消费者面向的都是一个topic;
6)Partition:为了实现扩展性,一个非常大的topic 可以分布到多个broker(即服务器)上,一个topic 可以分为多个partition,每个partition 是一个有序的队列;
7)Replica:副本,为保证集群中的某个节点发生故障时,该节点上的partition 数据不丢失,且kafka仍然能够继续工作 kafka提供了副本机制,一个 topic的每个分区都有若干个副本,一个 leader和若干个 follower。
8)leader 每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。
9)follower 每个分区多个副本中的“从”,实时从 leader中同步数据,保持和 leader数据的同步。 leader发生故障时,某个 follower会成为新的 follower。
pom.xml引入kafka-clients如下:
AdminClientorg.apache.kafka kafka-clients2.8.0
基本Admin *** 作如下:
package com.lwy.it; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.CreatePartitionsResult; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.DeleteTopicsResult; import org.apache.kafka.clients.admin.DescribeTopicsResult; import org.apache.kafka.clients.admin.ListTopicsOptions; import org.apache.kafka.clients.admin.ListTopicsResult; import org.apache.kafka.clients.admin.NewPartitions; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutionException; public class KafkaAdminTest { public static AdminClient initAdminClient() { Properties properties = new Properties(); // 指定连接IP和端口号 properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.3.99:9092"); AdminClient adminClient = AdminClient.create(properties); return adminClient; } @Test @DisplayName("创建Topic") public void createTopic_test() { AdminClient adminClient = initAdminClient(); // 指定分区数量、副本数量 NewTopic newTopic = new NewTopic("test-topic", 2, (short) 1); CreateTopicsResult createTopicsResult = adminClient.createTopics(Arrays.asList(newTopic)); try { // KafkaFuture等待创建,成功则不会有任何报错 createTopicsResult.all().get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } @Test @DisplayName("列举Topic列表") public void listTopic_test() { AdminClient adminClient = initAdminClient(); ListTopicsOptions listTopicsOptions = new ListTopicsOptions(); // 是否查看内部Topic listTopicsOptions.listInternal(false); ListTopicsResult listTopicsResult = adminClient.listTopics(listTopicsOptions); SetKafkaProducerset = Collections.emptySet(); try { set = listTopicsResult.names().get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } set.forEach(System.out::println); } @Test @DisplayName("删除Topic") public void deleteTopic_test() { AdminClient adminClient = initAdminClient(); DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("test-topic")); try { deleteTopicsResult.all().get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } @Test @DisplayName("查看某个Topic详情") public void describeTopics_test() { AdminClient adminClient = initAdminClient(); DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList("test-topic")); Map stringTopicDescriptionMap = Collections.emptyMap(); try { stringTopicDescriptionMap = describeTopicsResult.all().get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } stringTopicDescriptionMap.forEach((topicName, topicDescription) -> { System.out.println("name:" + topicName + ",desc:" + topicDescription); }); } @Test @DisplayName("增加分区数量") public void createPartitions_test() { Map map = new HashMap<>(); NewPartitions newPartitions = NewPartitions.increaseTo(3); map.put("test-topic", newPartitions); AdminClient adminClient = initAdminClient(); CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(map); try { createPartitionsResult.all().get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
生产者常见配置
官方文档:https://kafka.apache.org/documentation/#producerconfigs
#kafka地址,即broker地址 bootstrap.servers #当producer向leader发送数据时,可以通request.required.acks参数来设置数据可靠性的级别,分别是0,1,all。 acks #请求失败,生产者会自动重试,指定是0次,如果启用重试,则会有重复消息的可能性 retries #每个分区未发送消息总字节大小,单位:字节,超过设置的值就会提交数据到服务器,默认是16kb batch.size #消息在缓冲区保留的时间,超过设置的值就会提交到服务端。默认值就是0,消息是立刻发送的,即使batch.size缓冲空间还没满,如果想减少请求的数量,可以设置linger.ms大于0。满足batch.size缓冲区填满或linger.ms时间其中一个消息就会被发送 linger.ms #buffer.memory是用来约束kafka producer能够使用的内存缓冲的大小,默认值32MB。如果buffer.memory设置的太小,可能导致消息快速的写入内容缓冲里,但是Sender线程来不及吧消息发送到Kafka服务器会造成内存缓冲很快被写满,而一旦写满,就会阻塞用户线程,不让继续往kafka写消息了。buffer.memory要大于batch.size,否则会报申请内存不足的错误,不要超过物理内存,根据实际情况调整 buffer.memory #Key的序列化器,将用户提供的key和value对象ProducerRecord进行序列化处理,key.serializer必须被设置,即使消息中没有指定key,序列化器必须是一个实现org.apache.kafka.common.serialization.Serializer接口的类,将key序列化成字节数组 key.serializer value.serializer
Producer核心API使用如下:
package com.lwy.it; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.Recordmetadata; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class KafkaProduceTest { public static Properties initProperties() { Properties properties = new Properties(); // 指定连接IP和端口号 properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.3.99:9092"); // 当producer向leader发送数据时,可以通request.required.acks参数来设置数据可靠性的级别,分别是0,1,all。 properties.put(ProducerConfig.ACKS_CONFIG, "all"); // 请求失败,生产者会自动重试,指定是0次,如果启用重试,则会有重复消息的可能性 properties.put(ProducerConfig.RETRIES_CONFIG, 0); // 每个分区未发送消息总字节大小,单位:字节,超过设置的值就会提交数据到服务器,默认是16kb properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 消息在缓冲区保留的时间,超过设置的值就会提交到服务端 properties.put(ProducerConfig.LINGER_MS_CONFIG, 1); // 用来约束kafka producer能够使用的内存缓冲的大小,默认值32MB properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // Key的序列化器,将用户提供的key和value对象ProducerRecord进行序列化处理,key.serializer必须被设置 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // value的序列化器 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); return properties; } @Test @DisplayName("生产者发送消息") public void producer_send_test() { Properties properties = initProperties(); Producerproducer = new KafkaProducer (properties); ProducerRecord stringStringProducerRecord = new ProducerRecord<>("test-topic", "key", "value"); Future future = producer.send(stringStringProducerRecord); try { // 同步阻塞 // 不关心发送结果不需写这行 Recordmetadata recordmetadata = future.get(); // Recordmetadata格式:topic + "-" + partition + "@" + offset; System.out.println("发送状态:" + recordmetadata.toString()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } // 关闭 producer.close(); } }
ProducerRecord介绍和key的作用:
发送给Kafka Broker的key/value值对,封装基础数据信息。
public ProducerRecord(String topic, Integer partition, K key, V value) { this(topic, partition, null, key, value, null); }
key默认是null,大多数应用程序会用到key
- 如果key为空,kafka使用默认的partitioner,使用RoundRobin算法将消息均衡地分布在跟个partition上
- 如果key不为空,kafka使用自己实现的hash算法对key进行哈希处理,决定消息该被写到Topic的哪个partition上,拥有相同key的消息会被写到同一个partition,实现顺序消息
生产者发消息是异步调用,怎么知道是否有异常?
- 发送消息配置回调函数即可,该回调方法会在Producer收到ack时被调用,为异步调用
- 回调函数有两个参数,Recordmetadata和Exception,如果Exception为null,则消息发送成功,否则失败
@Test @DisplayName("生产者发送消息,并且回调") public void producer_send_callback_test() { Properties properties = initProperties(); ProducerKafkaConsumerproducer = new KafkaProducer (properties); ProducerRecord stringStringProducerRecord = new ProducerRecord<>("test-topic", "key", "value"); Future future = producer.send(stringStringProducerRecord, new Callback() { @Override public void onCompletion(Recordmetadata metadata, Exception exception) { if (Objects.isNull(exception)) { System.out.println("发送状态:" + metadata.toString()); } else { exception.printStackTrace(); } } }); // 关闭 producer.close(); }
简介:Consumer消费者机制和分区策略
Kafka的Consumer消费者机制和分区策略讲解
消费者根据什么模式从broker获取数据的?
- 消费者采用pull拉取方式,从broker的partition获取数据
为什么是pull模式,而不是broker主动push?
- pull模式则可以根据consumer的消费能力进行自己调整,不同的消费者性能不一样,如果broker没有数据,consumer可以配置timeout时间,阻塞等待一段时间之后再返回
- 如果是broker主动push,优点是可以快速处理消息,但是容易造成消费者处理不过来,消息堆积和延迟。
消费者从哪个分区进行消费?一个topic有多个partition,一个消费者组里面有多个消费者,那是怎么分配的?
- 一个主题topic可以有多个消费者,因为里面有多个partition分区(leader分区)
- 一个partition leader可以由一个消费者组中的一个消费者进行消费
一个topic有多个partition,所以有多个partition leader,给多个消费者消费,那分配策略如何?
消费者从哪个分区进行消费?——两个策略
- 顶层接口
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor
- round-robin(RoundRobinAssignor非默认策略)轮询
【按照消费者组】进行轮训分配,同个消费者组监听不同主题也一样,是把所有的partition和所有的consumer都列出来,所以消费者组里面订阅的主题是一样的才行,主题不一样则会出现分配不均问题
例如如下有7个分区,同组内2个消费者:
topic-p0 / topic-p1 / topic-p2 / topic-p3 / topic-p4 / topic-p5 / topic-p6 c-1:topic-p0 / topic-p2 / topicp4 / topic-p6 c-2:topic-p1 / topic-p3 / topicp5
弊端:如果同一消费者组内,所订阅的消息是不相同的,在执行分区分配的时候不是轮询分配,可能会导致分
区分配的不均匀
如上图:有3个消费者C0、C1和C2,他们共订阅了3个主题:topic0、topic1和topic2
topic0有1个分区(P0),topic1有两个分区(P0、P1),topic2有3个分区(P0、P1、P2)
消费者C0订阅的是主题topic0,消费者C1订阅的是主题topic0和topic1,消费者C2订阅的是主题topic0、topic1和topic2
range(RangeAssignor默认策略)范围
【按照主题】进行分配,如果不平均分配,则第一个消费者会分配比较多分区,一个消费者监听不同主题也不影响,例如7个分区,同组内2个消费者
topic-p0 / topic-p1 / topic-p2 / topic-p3 / topic-p4 / topic-p5 / topic-p6 c-1:topic-p0 / topic-p1 / topicp2 / topic-p3 c-2:topic-p4 / topic-p5 / topicp6
弊端:
- 只是针对1个topic而言,c-1多消费一个分区影响不大
- 如果有N多个topic,那么针对每个topic,消费者C-1都将多消费1个分区,topic越多则消费的分区
也越多,则性能有所下降
简介:Consumer消费者重新分配策略和offset维护机制
什么是Rebalance *** 作?
- kafka怎么均匀地分配某个topic下的所有partition到各个消费者从而使得消息的消费速度达到最快,这就
是平衡(balance),前面讲了Range范围分区和RoundRobin轮询分区,也支持自定义分区策略。 - 而rebalance(重平衡)其实就是重新进行partition的分配,从而使得partition的分配重新达到平衡状态
假如有7个分区,2个消费者(先启动一个消费者,后续再启动一个消费者),这个会怎么分配?
Kafka会进行一次分区分配 *** 作,即Kafka消费者端的Rebalance *** 作,下面都会发生rebalance *** 作
- 当消费者组内的消费者数量发生变化(增加或者减少),就会产生重新分配patition
- 分区数量发生变化时(即topic的分区数量发生变化时)
当消费者在消费过程突然宕机了,重新恢复后是从哪里消费,会有什么问题?
消费者会记录offset,故障恢复后从这里继续消费。
这个offset记录在哪里?
记录在zk里面和本地、新版默认将offset保证在kafka的内置topic中,名称是__consumer offsets
- 该Topic默认有50个Partition,每个Partition有3个副本,分区数量由参数offset.topic.num.partition配置
- 通过groupId的哈希值和该参数取模的方式来确定某个消费者组已消费的offset保存到__consumer offsets主题的哪个分区中
- 由消费者组名+主题+分区,确定唯一的offset的key,从而获取对应的值
- 三元组:group.id+topic+分区号,而value就是offset的值
#消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息 group.id #为true则自动提交偏移量 enable.auto.commit #自动提交offset周期 auto.commit.interval.ms #重置消费偏移量策略,消费者在读取一个没有偏移量的分区或者偏移量无效情况下(因消费者长时间失效、包含偏移量的记录已经过时并被删除)该如何处理 auto.offset.reset #序列化器 key.deserializer
package com.lwy.it; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndmetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import java.time.Duration; import java.util.Arrays; import java.util.Map; import java.util.Objects; import java.util.Properties; @Slf4j public class KafkaConsumerTest { public static Properties initProperties() { Properties properties = new Properties(); // broker地址 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.3.99:9092"); // 消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-3"); // 开启自动提交offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 自动提交offset延迟时间 properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10000"); // 默认是latest,如果需要从头开始消费partition消息,需改为earliest,且消费者组名变更才生效 properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); return properties; } @Test @DisplayName("消费者消费消息") public void consumer_pull_test() { Properties properties = initProperties(); Consumer理论知识consumer = new KafkaConsumer<>(properties); // 订阅主题 consumer.subscribe(Arrays.asList("test-topic")); while (true) { // 100ms阻塞超时时间 ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(100)); consumerRecords.forEach((ConsumerRecord consumerRecord) -> { System.out.println("~~~~~~~~~~~~~~~~~~~~~"); log.info("Topic is {},Offset is {},Key is {},Value is {}", consumerRecord.topic(), consumerRecord.offset(), consumerRecord.key(), consumerRecord.value()); System.out.println("~~~~~~~~~~~~~~~~~~~~~"); }); // 非空才手工提交 if (!consumerRecords.isEmpty()) { // commitSync同步阻塞当前线程(自动失败重试) // consumer.commitSync(); // commitAsync异步不会阻塞当前线程,没有失败重试,回调callback函数获取提交信息,记录日志 consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map offsets, Exception exception) { if (Objects.isNull(exception)) { System.out.println("手工提交Offset成功:" + offsets.toString()); } else { System.out.println("手工提交Offset失败:" + offsets.toString()); exception.printStackTrace(); } } }); } } } }
简介:分布式应用核心CAP知识
CAP定理:指的是在一个分布式系统中,Consistency(一致性)、Availability(可用性)、Partition tolerance(分区容错性),三者不可同时获得
- 一致性(C):所有节点都可以访问到最新的数据;锁定其他节点,不一致之前不可读
- 可用性(A):每个请求都是可以得到响应的,不管请求是成功还是失败;被节点锁定后无法响应
- 分区容错性(P):除了全部整体网络故障,其他故障都不能导致整个系统不可用;节点间通信可能失败,无法避免
CAP理论就是说在分布式存储系统中,最多只能实现上面的两点。而由于当前的网络硬件肯定会出现延迟丢包等问题,所以分区容忍性是我们必须需要实现的。所以我们只能在一致性和可用性之间进行权衡。
-
CA:如果不要求P(不允许分区),则C(强一致性)和(可用性)是可以保证的。但放弃P的同时也就意味着放弃了系统的扩展性,也就是分布式节点受限,没办法部署子节点,这是违背分布式系统设计的初衷的
-
CP:如果不要求A(可用),每个请求都需要在服务器之间保持强一致,而P(分区)会导致同步时间无限延长(也就是等待数据同步完才能正常访问服务),一旦发生网络故障或者消息丢失等情况,就要牺牲用户的体验,等待所有数据全部一致了之后再让用户访问系统
-
AP:要高可用并允许分区,则需放弃一致性。一旦分区发生,节点之间可能会失去联系,为了高可用,每个节点只能用本地数据提供服务,而这样会导致全局数据的不一致性。
分布式系统中P肯定要满足,所以只能在CA中二选一
没有最好的选择,最好的选择是根据业务场景来进行架构设计:
- CP:适合支付、交易类,要求数据强一致性,宁可业务不可用,也不能出现脏数据
- AP:互联网业务,比如信息流架构,不要求数据强一致,更想要服务可用
简介:Kafka数据可靠性保证原理之副本机制Replica介绍
Partition什么时间发送ack确认机制(要追求高吞吐量,那么就要放弃可靠性)?
当producer向leader发送数据时,可以通过 request.required.acks 参数来设置数据可靠性的级别
副本数据同步策略,ack有3个可选值:分别是0,1,all。
ack=0
- producer发送一次就不再发送了,不管是否发送成功
- 发送出去的消息还在半路,或者还没写入磁盘Partition Leader所在Broker就直接挂了,客户端认为消息发送成功了,此时就会导致这条消息就丢失
ack=1(默认)
- 只要Partition Leader接收到消息而且写入【本地磁盘】,就认为成功了,不管他其他的Follower有没有
同步过去这条消息了 - 问题点:万一Partition Leader刚刚接收到消息,Follower还没来得及同步过去,结果Leader所在的broker宕机了
ack=all(即-1)
-
producer只有收到分区内所有副本的成功写入全部落盘的通知才认为推送消息成功
-
备注:leader会维持一个与其保持同步的replica集合,该集合就是ISR,leader副本也在isr里面
-
问题一:如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复。
- 数据发送到leader后,部分ISR的副本同步,leader此时挂掉。比如follower1和follower2都有可能变成新的leader,producer端会得到返回异常,producer端会重新发送数据,数据可能会重复
-
问题二:acks=all就可以代表数据一定不会丢失了吗?
- Partition只有一个副本,也就是一个Leader,任何Follower都没有接收完消息后宕机,也会导致数据丢失,acks=all,必须跟ISR列表里至少有2个以上的副本配合使用
- 在设置request.required.acks=1的同时,也要 min.insync.replicas 这个参数设定ISR中的最小副本数
是多少,默认值为1,改为>=2,如果ISR中的副本数少于 min.insync.replicas 配置的数量时,客户端会返回异常
简介:Kafka数据可靠性保证原理之ISR机制讲解
-
什么是ISR(in-sync replica set)
- leader会维持一个与其保持同步的replica集合,该集合就是ISR,每一个leader partition都有一个ISR,leader动态维护,要保证kafka不丢失message,就要保证ISR这组集合存活(至少有一个存活),并且消息 commit成功
- Partition leader保持同步的Partition Follower集合,当ISR中的Partition Follower完成数据的同步之后就会给leader发送ack
- 如果Partition follower长时间(replica.lag.time.max.ms)未向leader同步数据,则该Partition Follower将被
踢出ISR - Partition Leader发生故障之后,就会从ISR中选举新的Partition Leader。
-
OSR(out-of-sync-replicas)
- 与leader副本分区同步滞后过多的副本集合
-
AR(Assign Replicas)
- 分区中所有副本统称为AR
简介:Kafka的HighWatermark的作用
背景broker故障后
ACK保障了【生产者】的投递可靠性
partition的多副本保障了【消息存储】的可靠性
备注:重复消费问题需要消费者自己处理
HW(HighWatermark)作用:保证消费数据的一致性和副本数据的一致性
Follower故障:
Follower发生故障后会被临时踢出ISR(动态变化),待该follower恢复后,follower会读取本地的磁盘记录的上次的HW,并将该log文件高于HW的部分截取掉,从HW开始向leader进行同步,等该follower的LEO大于等于该Partition的hw,即follower追上leader后,就可以重新加入ISR
Leader故障:
Leader发生故障后,会从ISR中选出一个新的leader,为了保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于hw的部分截掉(新leader自己不会截掉),然后从新的leader同步数据
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)