kafka消息在分区中是按序一条一条存储的,假如分区中有10条消息,位移就是0-9,
consumer消费了5条消息,那么offset就是5,指向了下一条要消费的记录,consumer
需要向kafka汇报自己的位移数据,因为consumer是能够消费多个分区的,所以offset
的粒度是分区,consumer需要为分配给他的各分区分别提交offset信息。
从用户的角度来说,位移提交分为自动提交和手动提交,在consumer的角度来说,位移
分为同步提交和异步提交。
kafka内部有个topic叫 ‘__consumer_offsets’,offset提交就是往这个topic发送一条消息,
消息格式是key value形式,key是由 groupId、主题名、分区号组成,消息体是位移值
及用户自定义数据和时间戳等。还有2种特殊的格式,一种是用于保存 Consumer Group
信息的消息,用于注册group,另一种是 用于删除 Group 过期位移和删除 Group 的消息。
当kafka集群种第一台consumer启动时,便会创建__consumer_offsets主题,默认50个
分区和3个副本。
当提交方式是自动提交时,就算是当前consumer的offset已经不更新,kafka还是会自动
定期的往__consumer_offsets发送位移消息,所以得对位移主题的消息做定期删除,
假如对于同一个key有2条A和B,A早于B发送,那么A就是属于过期消息。
compact有点类似jvm gc的标记-整理,把过期消息删掉,把剩下的消息排列在一起
Kafka 提供了专门的后台线程定期地巡检待Compact 的主题,看看是否存在满足条件的
可删除数据,这个线程叫Log Cleaner,当我们发现位移主题日志过多的时候,可以
检查一下是否是这个线程挂了导致的
enableautocommit 默认即是true,
autocommitintervalms 默认是5秒,表示kafka每5秒自动提交一次位移信息。
自动提交会有消息重复消费的问题,因为他是间隔时间提交一次,假如在间隔期间,
发生了Rebalance ,在Rebalance 之后所有的消费者必须从当前最新的offset开始
继续消费,那么上一次自动提交到Rebalance 的这段时间消费的数据的位移并没有
提交,所以会重复消费,即时我们通过减少 autocommitintervalms 的值来提高提交频率,
那也仅仅是缩小了重复消费的时间窗口,所以我们看看能不能通过手动提交来避免重复消费。
commitSync()是consumer的同步api,手动提交的好处自然是我们可以控制提交的时机
和频率,由于是同步api,是会阻塞至broker返回结果才会结束这个阻塞状态,对于系统
而言,自然不想发生这种不是由于资源的限制导致的阻塞。
commitAsync()是consumer的异步api,commitAsync()不会阻塞,因此不会影响consumer的
tps,但是他的问题在于他无法重试,因为是异步提交,当因为网络或者系统资源阻塞
导致提交失败,那么他重试的时候,在这期间,consumer可能已经消费好多条消息
并且提交了,所以此时的重试提交的offset已然不是最新值了并没有意义,我们可以通过
异步和同步提交相结合,我们使用同步提交来规避因为网络问题或者broker端的gc导致的
这种瞬时的提交失败,进而通过重试机制而提交offset,使用异步提交来规避提交时的阻塞
前面的commitSync()和commitAsync(),都是consumer poll消息,把这些消息消费完,
再提交最新的offset,如果poll的消息很多呢?消费时间较长,假如中间系统宕机,岂不是
得从头再来一遍,所以kafka提供分段提交的api
commitSync(Map<TopicPartition, OffsetAndMetadata>)
commitAsync(Map<TopicPartition, OffsetAndMetadata>)
假设我们poll了一秒钟的数据,有5000条,我们可以通过计数器,累计到100条,
便通过分段提交api向kafka提交一次offset。
作为一款典型的消息中间件产品,kafka系统仍然由producer、broker、consumer三部分组成。kafka涉及的几个常用概念和组件简单介绍如下:
当consumer group的状态发生变化(如有consumer故障、增减consumer成员等)或consumer group消费的topic状态发生变化(如增加了partition,消费的topic发生变化),kafka集群会自动调整和重新分配consumer消费的partition,这个过程就叫做rebalance(再平衡)。
__consumer_offsets是kafka集群自己维护的一个特殊的topic,它里面存储的是每个consumer group已经消费了每个topic partition的offset。__consumer_offsets中offset消息的key由group id,topic name,partition id组成,格式为 {topic name}-${partition id},value值就是consumer提交的已消费的topic partition offset值。__consumer_offsets的分区数和副本数分别由offsetstopicnumpartitions(默认值为50)和offsetstopicreplicationfactor(默认值为1)参数配置。我们通过公式 hash(group id) % offsetstopicnumpartitions 就可以计算出指定consumer group的已提交offset存储的partition。由于consumer group提交的offset消息只有最后一条消息有意义,所以__consumer_offsets是一个compact topic,kafka集群会周期性的对__consumer_offsets执行compact *** 作,只保留最新的一次提交offset。
group coordinator运行在kafka某个broker上,负责consumer group内所有的consumer成员管理、所有的消费的topic的partition的消费关系分配、offset管理、触发rebalance等功能。group coordinator管理partition分配时,会指定consumer group内某个consumer作为group leader执行具体的partition分配任务。存储某个consumer group已提交offset的__consumer_offsets partition leader副本所在的broker就是该consumer group的协调器运行的broker。
跟大多数分布式系统一样,集群有一个master角色管理整个集群,协调集群中各个成员的行为。kafka集群中的controller就相当于其它分布式系统的master,用来负责集群topic的分区分配,分区leader选举以及维护集群的所有partition的ISR等集群协调功能。集群中哪个borker是controller也是通过一致性协议选举产生的,28版本之前通过zookeeper进行选主,28版本后通过kafka raft协议进行选举。如果controller崩溃,集群会重新选举一个broker作为新的controller,并增加controller epoch值(相当于zookeeper ZAB协议的epoch,raft协议的term值)
当kafka集群新建了topic或为一个topic新增了partition,controller需要为这些新增加的partition分配到具体的broker上,并把分配结果记录下来,供producer和consumer查询获取。
因为只有partition的leader副本才会处理producer和consumer的读写请求,而partition的其他follower副本需要从相应的leader副本同步消息,为了尽量保证集群中所有broker的负载是均衡的,controller在进行集群全局partition副本分配时需要使partition的分布情况是如下这样的:
在默认情况下,kafka采用轮询(round-robin)的方式分配partition副本。由于partition leader副本承担的流量比follower副本大,kafka会先分配所有topic的partition leader副本,使所有partition leader副本全局尽量平衡,然后再分配各个partition的follower副本。partition第一个follower副本的位置是相应leader副本的下一个可用broker,后面的副本位置依此类推。
举例来说,假设我们有两个topic,每个topic有两个partition,每个partition有两个副本,这些副本分别标记为1-1-1,1-1-2,1-2-1,1-2-2,2-1-1,2-1-2,2-2-1,2-2-2(编码格式为topic-partition-replia,编号均从1开始,第一个replica是leader replica,其他的是follower replica)。共有四个broker,编号是1-4。我们先对broker按broker id进行排序,然后分配leader副本,最后分配foller副本。
1)没有配置brokerrack的情况
现将副本1-1-1分配到broker 1,然后1-2-1分配到broker 2,依此类推,2-2-1会分配到broker 4。partition 1-1的leader副本分配在broker 1上,那么下一个可用节点是broker 2,所以将副本1-1-2分配到broker 2上。同理,partition 1-2的leader副本分配在broker 2上,那么下一个可用节点是broker 3,所以将副本1-1-2分配到broker 3上。依此类推分配其他的副本分片。最后分配的结果如下图所示:
2)配置了brokerrack的情况
假设配置了两个rack,broker 1和broker 2属于Rack 1,broker 3和broker 4属于Rack 2。我们对rack和rack内的broker分别排序。然后先将副本1-1-1分配到Rack 1的broker 1,然后将副本1-2-1分配到下一个Rack的第一个broker,即Rack 2的broker 3。其他的parttition leader副本依此类推。然后分配follower副本,partition 1-1的leader副本1-1-1分配在Rack 1的broker上,下一个可用的broker是Rack 2的broker 3,所以分配到broker 3上,其他依此类推。最后分配的结果如下图所示:
kafka除了按照集群情况自动分配副本,也提供了reassign工具人工分配和迁移副本到指定broker,这样用户可以根据集群实际的状态和各partition的流量情况分配副本
kafka集群controller的一项功能是在partition的副本中选择一个副本作为leader副本。在topic的partition创建时,controller首先分配的副本就是leader副本,这个副本又叫做preference leader副本。
当leader副本所在broker失效时(宕机或网络分区等),controller需要为在该broker上的有leader副本的所有partition重新选择一个leader,选择方法就是在该partition的ISR中选择第一个副本作为新的leader副本。但是,如果ISR成员只有一个,就是失效的leader自身,其余的副本都落后于leader怎么办?kafka提供了一个uncleanleaderelection配置参数,它的默认值为true。当uncleanleaderelection值为true时,controller还是会在非ISR副本中选择一个作为leader,但是这时候使用者需要承担数据丢失和数据不一致的风险。当uncleanleaderelection值为false时,则不会选择新的leader,该partition处于不可用状态,只能恢复失效的leader使partition重新变为可用。
当preference leader失效后,controller重新选择一个新的leader,但是preference leader又恢复了,而且同步上了新的leader,是ISR的成员,这时候preference leader仍然会成为实际的leader,原先的新leader变为follower。因为在partition leader初始分配时,使按照集群副本均衡规则进行分配的,这样做可以让集群尽量保持平衡。
为了保证topic的高可用,topic的partition往往有多个副本,所有的follower副本像普通的consumer一样不断地从相应的leader副本pull消息。每个partition的leader副本会维护一个ISR列表存储到集群信息库里,follower副本成为ISR成员或者说与leader是同步的,需要满足以下条件:
1)follower副本处于活跃状态,与zookeeper(28之前版本)或kafka raft master之间的心跳正常
2)follower副本最近replicalagtimemaxms(默认是10秒)时间内从leader同步过最新消息。需要注意的是,一定要拉取到最新消息,如果最近replicalagtimemaxms时间内拉取过消息,但不是最新的,比如落后follower在追赶leader过程中,也不会成为ISR。
follower在同步leader过程中,follower和leader都会维护几个参数,来表示他们之间的同步情况。leader和follower都会为自己的消息队列维护LEO(Last End Offset)和HW(High Watermark)。leader还会为每一个follower维护一个LEO。LEO表示leader或follower队列写入的最后一条消息的offset。HW表示的offset对应的消息写入了所有的ISR。当leader发现所有follower的LEO的最小值大于HW时,则会增加HW值到这个最小值LEO。follower拉取leader的消息时,同时能获取到leader维护的HW值,如果follower发现自己维护的HW值小于leader发送过来的HW值,也会增加本地的HW值到leader的HW值。这样我们可以得到一个不等式: follower HW <= leader HW <= follower LEO <= leader LEO 。HW对应的log又叫做committed log,consumer消费partititon的消息时,只能消费到offset值小于或等于HW值的消息的,由于这个原因,kafka系统又称为分布式committed log消息系统。
kafka的消息内容存储在logdirs参数配置的目录下。kafka每个partition的数据存放在本地磁盘logdirs目录下的一个单独的目录下,目录命名规范为 ${topicName}-${partitionId} ,每个partition由多个LogSegment组成,每个LogSegment由一个数据文件(命名规范为: {baseOffset}index)和一个时间戳索引文件(命名规范为:${baseOffset}timeindex)组成,文件名的baseOffset就是相应LogSegment中第一条消息的offset。index文件存储的是消息的offset到该消息在相应log文件中的偏移,便于快速在log文件中快速找到指定offset的消息。index是一个稀疏索引,每隔一定间隔大小的offset才会建立相应的索引(比如每间隔10条消息建立一个索引)。timeindex也是一个稀疏索引文件,这样可以根据消息的时间找到对应的消息。
可以考虑将消息日志存放到多个磁盘中,这样多个磁盘可以并发访问,增加消息读写的吞吐量。这种情况下,logdirs配置的是一个目录列表,kafka会根据每个目录下partition的数量,将新分配的partition放到partition数最少的目录下。如果我们新增了一个磁盘,你会发现新分配的partition都出现在新增的磁盘上。
kafka提供了两个参数logsegmentbytes和logsegmentms来控制LogSegment文件的大小。logsegmentbytes默认值是1GB,当LogSegment大小达到logsegmentbytes规定的阈值时,kafka会关闭当前LogSegment,生成一个新的LogSegment供消息写入,当前供消息写入的LogSegment称为活跃(Active)LogSegment。logsegmentms表示最大多长时间会生成一个新的LogSegment,logsegmentms没有默认值。当这两个参数都配置了值,kafka看哪个阈值先达到,触发生成新的LogSegment。
kafka还提供了logretentionms和logretentionbytes两个参数来控制消息的保留时间。当消息的时间超过了logretentionms配置的阈值(默认是168小时,也就是一周),则会被认为是过期的,会被kafka自动删除。或者是partition的总的消息大小超过了logretentionbytes配置的阈值时,最老的消息也会被kafka自动删除,使相应partition保留的总消息大小维持在logretentionbytes阈值以下。这个地方需要注意的是,kafka并不是以消息为粒度进行删除的,而是以LogSegment为粒度删除的。也就是说,只有当一个LogSegment的最后一条消息的时间超过logretentionms阈值时,该LogSegment才会被删除。这两个参数都配置了值时,也是只要有一个先达到阈值,就会执行相应的删除策略
当我们使用KafkaProducer向kafka发送消息时非常简单,只要构造一个包含消息key、value、接收topic信息的ProducerRecord对象就可以通过KafkaProducer的send()向kafka发送消息了,而且是线程安全的。KafkaProducer支持通过三种消息发送方式
KafkaProducer客户端虽然使用简单,但是一条消息从客户端到topic partition的日志文件,中间需要经历许多的处理过程。KafkaProducer的内部结构如下所示:
从图中可以看出,消息的发送涉及两类线程,一类是调用KafkaProducersend()方法的应用程序线程,因为KafkaProducersend()是多线程安全的,所以这样的线程可以有多个;另一类是与kafka集群通信,实际将消息发送给kafka集群的Sender线程,当我们创建一个KafkaProducer实例时,会创建一个Sender线程,通过该KafkaProducer实例发送的所有消息最终通过该Sender线程发送出去。RecordAccumulator则是一个消息队列,是应用程序线程与Sender线程之间消息传递的桥梁。当我们调用KafkaProducersend()方法时,消息并没有直接发送出去,只是写入了RecordAccumulator中相应的队列中,最终需要Sender线程在适当的时机将消息从RecordAccumulator队列取出来发送给kafka集群。
消息的发送过程如下:
在使用KafkaConsumer实例消费kafka消息时,有一个特性我们要特别注意,就是KafkaConsumer不是多线程安全的,KafkaConsumer方法都在调用KafkaConsumer的应用程序线程中运行(除了consumer向kafka集群发送的心跳,心跳在一个专门的单独线程中发送),所以我们调用KafkaConsumer的所有方法均需要保证在同一个线程中调用,除了KafkaConsumerwakeup()方法,它设计用来通过其它线程向consumer线程发送信号,从而终止consumer执行。
跟producer一样,consumer要与kafka集群通信,消费kafka消息,首先需要获取消费的topic partition leader replica所在的broker地址等信息,这些信息可以通过向kafka集群任意broker发送Metadata请求消息获取。
我们知道,一个consumer group有多个consumer,一个topic有多个partition,而且topic的partition在同一时刻只能被consumer group内的一个consumer消费,那么consumer在消费partition消息前需要先确定消费topic的哪个partition。partition的分配通过group coordinator来实现。基本过程如下:
我们可以通过实现接口orgapachekafkaclientsconsumerinternalsPartitionAssignor自定义partition分配策略,但是kafka已经提供了三种分配策略可以直接使用。
partition分配完后,每个consumer知道了自己消费的topic partition,通过metadata请求可以获取相应partition的leader副本所在的broker信息,然后就可以向broker poll消息了。但是consumer从哪个offset开始poll消息?所以consumer在第一次向broker发送FetchRequest poll消息之前需要向Group Coordinator发送OffsetFetchRequest获取消费消息的起始位置。Group Coordinator会通过key {topic}-${partition}查询 __consumer_offsets topic中是否有offset的有效记录,如果存在,则将consumer所属consumer group最近已提交的offset返回给consumer。如果没有(可能是该partition是第一次分配给该consumer group消费,也可能是该partition长时间没有被该consumer group消费),则根据consumer配置参数autooffsetreset值确定consumer消费的其实offset。如果autooffsetreset值为latest,表示从partition的末尾开始消费,如果值为earliest,则从partition的起始位置开始消费。当然,consumer也可以随时通过KafkaConsumerseek()方法人工设置消费的起始offset。
kafka broker在收到FetchRequest请求后,会使用请求中topic partition的offset查一个skiplist表(该表的节点key值是该partition每个LogSegment中第一条消息的offset值)确定消息所属的LogSegment,然后继续查LogSegment的稀疏索引表(存储在index文件中),确定offset对应的消息在LogSegment文件中的位置。为了提升消息消费的效率,consumer通过参数fetchminbytes和maxpartitionfetchbytes告诉broker每次拉取的消息总的最小值和每个partition的最大值(consumer一次会拉取多个partition的消息)。当kafka中消息较少时,为了让broker及时将消息返回给consumer,consumer通过参数fetchmaxwaitms告诉broker即使消息大小没有达到fetchminbytes值,在收到请求后最多等待fetchmaxwaitms时间后,也将当前消息返回给consumer。fetchminbytes默认值为1MB,待fetchmaxwaitms默认值为500ms。
为了提升消息的传输效率,kafka采用零拷贝技术让内核通过DMA把磁盘中的消息读出来直接发送到网络上。因为kafka写入消息时将消息写入内存中就返回了,如果consumer跟上了producer的写入速度,拉取消息时不需要读磁盘,直接从内存获取消息发送出去就可以了。
为了避免发生再平衡后,consumer重复拉取消息,consumer需要将已经消费完的消息的offset提交给group coordinator。这样发生再平衡后,consumer可以从上次已提交offset出继续拉取消息。
kafka提供了多种offset提交方式
partition offset提交和管理对kafka消息系统效率来说非常关键,它直接影响了再平衡后consumer是否会重复拉取消息以及重复拉取消息的数量。如果offset提交的比较频繁,会增加consumer和kafka broker的消息处理负载,降低消息处理效率;如果offset提交的间隔比较大,再平衡后重复拉取的消息就会比较多。还有比较重要的一点是,kafka只是简单的记录每次提交的offset值,把最后一次提交的offset值作为最新的已提交offset值,作为再平衡后消息的起始offset,而什么时候提交offset,每次提交的offset值具体是多少,kafka几乎不关心(这个offset对应的消息应该存储在kafka中,否则是无效的offset),所以应用程序可以先提交3000,然后提交2000,再平衡后从2000处开始消费,决定权完全在consumer这边。
kafka中的topic partition与consumer group中的consumer的消费关系其实是一种配对关系,当配对双方发生了变化时,kafka会进行再平衡,也就是重新确定这种配对关系,以提升系统效率、高可用性和伸缩性。当然,再平衡也会带来一些负面效果,比如在再平衡期间,consumer不能消费kafka消息,相当于这段时间内系统是不可用的。再平衡后,往往会出现消息的重复拉取和消费的现象。
触发再平衡的条件包括:
需要注意的是,kafka集群broker的增减或者topic partition leader重新选主这类集群状态的变化并不会触发在平衡
有两种情况与日常应用开发比较关系比较密切:
consumer在调用subscribe()方法时,支持传入一个ConsumerRebalanceListener监听器,ConsumerRebalanceListener提供了两个方法,onPartitionRevoked()方法在consumer停止消费之后,再平衡开始之前被执行。可以发现,这个地方是提交offset的好时机。onPartitonAssigned()方法则会在重新进行partition分配好了之后,但是新的consumer还未消费之前被执行。
我们在提到kafka时,首先想到的是它的吞吐量非常大,这也是很多人选择kafka作为消息传输组件的重要原因。
以下是保证kafka吞吐量大的一些设计考虑:
但是kafka是不是总是这么快?我们同时需要看到kafka为了追求快舍弃了一些特性:
所以,kafka在消息独立、允许少量消息丢失或重复、不关心消息顺序的场景下可以保证非常高的吞吐量,但是在需要考虑消息事务、严格保证消息顺序等场景下producer和consumer端需要进行复杂的考虑和处理,可能会比较大的降低kafka的吞吐量,例如对可靠性和保序要求比较高的控制类消息需要非常谨慎的权衡是否适合使用kafka。
我们通过producer向kafka集群发送消息,总是期望消息能被consumer成功消费到。最不能忍的是producer收到了kafka集群消息写入的正常响应,但是consumer仍然没有消费到消息。
kafka提供了一些机制来保证消息的可靠传递,但是有一些因素需要仔细权衡考虑,这些因素往往会影响kafka的吞吐量,需要在可靠性与吞吐量之间求得平衡:
kafka只保证partition消息顺序,不保证topic级别的顺序,而且保证的是partition写入顺序与读取顺序一致,不是业务端到端的保序。
如果对保序要求比较高,topic需要只设置一个partition。这时可以把参数maxinflightrequestsperconnection设置为1,而retries设置为大于1的数。这样即使发生了可恢复型错误,仍然能保证消息顺序,但是如果发生不可恢复错误,应用层进行重试的话,就无法保序了。也可以采用同步发送的方式,但是这样也极大的降低了吞吐量。如果消息携带了表示顺序的字段,可以在接收端对消息进行重新排序以保证最终的有序。
kafka producer将消息发送给broker后,消息日志会被存储在broker的磁盘上,采用顺序写入的方式。顺序写可以加快磁盘访问速度,并且可以将将多个小型的逻辑写合并成一次大型的物理磁盘写入,官方数据显示顺序写比随机写入快6000倍以上。另外, *** 作系统使用内存对磁盘进行缓存即pagecache,pagecache完全由 *** 作系统管理,这也使得写数据变得即简洁也快速。
配置中可以调整过期时间,超过改时间的消息日志将移除,默认值为7天;也可配置文件大小阈值,达到该阈值后,从最旧消息开始删除。配置项为:
从文件到套接字的常见数据传输路径有4步:
1) *** 作系统从磁盘读取数据到内核空间的 pagecache
2)应用程序读取内核空间的数据到用户空间的缓冲区
3)应用程序将数据(用户空间的缓冲区)写回内核空间到套接字缓冲区(内核空间)
4) *** 作系统将数据从套接字缓冲区(内核空间)复制到通过网络发送的 NIC 缓冲区
kafka使用 producer ,broker 和 consumer 都共享的标准化的二进制消息格式,这样数据块不用修改就能在他们之间传递。kafka采用Linux 中系统调用sendfile的方式,直接将数据从 pagecache 转移到 socket 网络连接中。这种零拷贝方式使得kafka数据传输更加高效。
以前面文章中安装的kafka为例: Mac 安装kafka
kafka本地文件存储目录可以在配置文件serverproperties中设置,参数及默认值为:
进入该目录,可以看到kafka保存的cosumer offset和topic消息:
其中__consumer_offsets开头的为消费的offset信息,test1开头的即为之前创建的topic “test1”,该topic有三个分区,分区编号从0开始,分别是test1-0、test1-1、test1-2。
进入test1-0,查看包含文件如下:
可以看到kafka消息按partition存储的,每个partition一个目录。partition下消息分段(segment)存储,默认每段最大1G,通过参数logsegmentbytes可配置。segment包含索引文件index、消息文件log,分别存储消息的索引和内容,以index和log结尾,文件命名为当前segment第一个消息offset。index文件在log每隔一定数据量之间建立索引,可以通过参数indexintervalbytes配置。
通过kafka命令查看00000000000000000000index内容如下:
00000000000000000000log内容如下:
其中索引文件中包含两个字段:(offset,position),分别表示消息offset和该消息在log文件的偏移量。如上图中offset=0的消息对应的position=0;对应的就是00000000000000000000log中的第一条消息:
其中payload为具体的消息内容。
另外里面还有一个以"timeindex"结尾的文件,查看其内容:
该日志文件是kafka01011加入的,其中保存的为:(消息时间戳,offset)。时间戳是该segment最后一个消息对应的时间戳(与log文件中最后一条记录时间戳一致),kafka也支持根据时间来读取消息。
由上可知消息是按partition来存储的,partition可以配置n个副本followers。多个partition和其follower在broker上是怎么分配的呢?
partition和broker都进行了排序,下标从0开始;
假设有k个broker,第i个partition被分配到到 i%k 个broker上;
第i%k个broker即为partition i 的leader,负责这个partition的读写 ;
partition的followers也进行排序,从leader的后续broker开始分配,第i个partition的第j个副本broker为 (j+ i%k)%k。
一个有3个broker的kafka集群,包含3个partition,每个partition副本数为1的topic如下图:
总结:
kafka将消息日志采用顺序写入的方式存放在broker磁盘中;数据传输通过系统调用sendfile零拷贝方式;消息日志分段存放,可配置清除时间或大小阈值;每段包含消息索引、消息内容两个文件,通过索引实现快速查找;按照/topic/partition的目录结构分开存储,且均匀分布到集群各broker上。
参考:
>
kafka 组成 :consumer/producter/broker
kafka和zk关系:zk存放kafka元信息,比如 topic - partition - replication- brokerIp,数据结构如树状,一层层的结构,叶子节点是brokerIp
思考:老版本的kafka,消费的偏移量也是放在zk的,新版本已经放在kafka的broker中,为什么要迁移?kafka如何记录消息偏移?消息偏移的信息存放在哪儿?
1、目前kafka支持自动提交偏移,比如设置配置 autocommitenable、autocommitintervalms;以及手动设置偏移
2、消息偏移信息存放在kafka内部的一个主题中 __consumer_offsets ,默认的分区数和副本配置是 offsettopicnumpartition、offsettopicreplicationfactor,默认值分别是50和3;
3、__consumer_offsets记录三类消息:
1、group创建,记录元信息
2、消费者消费偏移
消息的结构是key-value 格式,key 值是:<consumer group id + topic + partition number> ,value 是 partition的偏移,以及一些其他后续处理的信息,如果时间戳
3、墓碑消息,当group下的所有的consumer下线 并且 group的消费偏移都已经删除,会发送墓碑消息,会将group下的所有的消费偏移信息全部删除
4、无论是自动提交偏移还是手动提交偏移都会有大量的记录产生,长时间会占满磁盘,kafka是如何处理的?
kafka采用Compact机制,多条同名的key,只保留最新的偏移信息;因为key同名代表某个消费分组在某个topic下的partition的偏移,保留一份即可
1、kafka 分区副本分为leader/follower 类似与mysql的主从,但是不同的是kafka leader承接所有的读写请求,follower注意任务是数据冗余和选主,不接受consumer的读写。kafka为什么要这样设计?
> 分区实现负载;如果kafka想实现负载,可以使用多分区机制,比如kafka集群有5台机器,每台机器一个broker,主题W并发高想充分利用5台服务器,可以采用 5个partition,3个replication,producer采用默认的发送机制即round robin,那么5个分区所在的5台服务器的负载是均衡的
> 优势:read my write 读自己的写,在同一个broker中进行读写,不像mysql存在数据延迟情况; 单掉读,不会出现数据读取不一致的情况,比如follower A 和 B 同步的进度不一致,consumer开始从followerA读取,因为宕机重启后从B读取,数据不一致。
2、副本leader宕机,如何在consistent 和 avaliable 做选择?
通过ISR(In Sync Replication)保存的是同步leader的副本组,进入ISR的标准是根据落后leader的时间(这点不像mysql故障转移是根据落后的位置),落后的时间根据 replicalagmaxms设置,如果在范围内,则ISA扩容加入新成员,否则缩容,减少成员;在ISR的follwer在leader宕机时可以参与选主,保证可用性,但是如果ISR没有follwer成员,说明所有的副本都落后leader的规定时间差,那么会导致不可用,这是保证CAP的C;
那么如何牺牲Consistent而确保Avaliable呢?
可以通过参数uncleanleaderelectionenable=true,表示落后leader的follwer可以参与选主,保证Avaliable但是缺点是会导致数据丢失,所以要在C/A中权衡。
1、生产者 压缩,但是没有 zero copy,因为要校验,过滤掉不满足要求的消息
2、生产者顺序写磁盘 ; 采用 page cache,保证Linxu安全
3、消费者顺序读,采用 page cache,实现zero copy
4、为什么page cache可以提高吞吐,它是什么,有什么机制?
- 背景:kafka只针对已提交的消息做可靠性保证,已提交的消息在broker和producer都有自己的定义
0、topic定义partition的副本数 replicationfactor > 1,数据冗余,保证partition高可用
1、producer如何保证数据不丢失:
producer 开启重试策略,比如: 配置 retries属性 > 0
producer acks=all, 生产者定义已提交的语意,所有的replication都成功写入才算成功
2、broker 如何避免数据不丢失:
- broker 开启 uncleanleaderelectionenable =false ,禁止数据落后的replication 竞选leader,避免数据丢失
- brokder 开启 mininsyncreplicas >1 ,表示消息同步几份副本才算已提交,注意 producer acks=all定义的已提交泛语意(比如之前3个replication 宕机2个,那么提交1个就算已提交),而miniinsyncreplicas = number(一般配置大于1即可) 定义的是已提交的下限。 在有replication downtime场景时作用凸显。尽量不要把mininsyncreplicas的值配置成 replicationfactor 副本数,因为一旦有副本downtime,broker将不可用,一般采用 replicationfactor = mininsyncreplicas +1
3、消费者测如何避免数据不丢失:
- 先消费再手动commit; 避免先commit后消费
1、at most once,至多一次,如果发送失败,会丢失消息
2、at lease once,至少一次,如果出现网络抖动,发送成功了,但是因为网络抖动的原因一直没有收到响应消息,再次发送,会产生多条消息
3、exactly once
exactly once 实现
1、生产者配置enableidempotent,实现幂等姓性生产者,实现消息只发送一次,局限是只针对单个partition / 单次会话(实现原理是,生产者会给消息增加唯一字段,broker根据唯一字段去重)
2、如何实现多个partition/多次会话的exactly once?
采用事物性的producter,大概需要三个步骤:
1、enableidemponent 开启幂等性producer;
2、为producer配置transactionid;
3、开启事物,实现多paritition消息要么全部成功,要么全部失败。
需要注意的是:消费者默认的消费隔离级别是: RU,需要配置isolation level 为RC
五、如何避免消息重复带来的影响?
消息幂等性,比如采用幂等性生产者、再比如消费者根据消息唯一id做消费记录,在消费消息之前做判断(消费重复性解决了,但是性能下降了,需要做具体的权衡)
1、什么情况会出现rebalence?
新增/减少consumer
新增/减少topic,比如正则表达式订阅主题,新增的主题满足正则表达式
新增/减少分区,比如修改topic ,增加/减少分区
2、rebalence的有什么优势和劣势?
优势:消费者消费分区负载均衡
劣势:rebalence时所有的消费者会被占停,如果集群的consumer group下的consumer数过多,rebalence时间会很长;现在的rebalence与partition会重新打乱,会重现发起connnect,这个缺点后续的kafka肯定会优化掉
3、如何避免rebalance
避免coordinate对consumer对下线误判, 合理对设置 sessiontimeoutms (如果consumer在指定的会话时间没有发送心跳包,coordinator会认为consumer下线,引起rebalance); 解决思路: 设置 sessiontimeoutms 是 heartheatintervalms 的倍数,一般数 10:1
coodinator认为消费者消费能力不足引起的 rebalance,例如:consumer在 maxpollintervalms 取出N条数据,再下一个间隔时间仍然没有处理完毕。解决思路,根据consumer实际的情况设置 maxpollintervalms 参数
4、细聊rebalance的过程
背景:kafka的consumer group在coodinator有四种状态: empty、rebalance、rebalanceComplete、stable;
当consumer group 下没有consuner(比如consumer全部下线)时处于empty状态,如果empty状态超过一定当阀值,consumer group之前当位移信息会变成expire offset,coodinator会定期清理expire offset,所以当group当consumer全部下线超过一定的时间,消费偏移丢失,会出现重头开始消费情况!!!!当触发rebalance当条件时,group 会处于rebalance状态,rebalance完成进入rebalanceComplete状态
rebalance过程:触发rebalance条件时broker会像组内所有当成员响应 rebalance 心跳信息,接着所有成员会进入下面两个环节 JoinGroup 、GroupSync
JoinGroup:组的所有成员会向coodinator上报自己订阅的主题信息,并将第一个上报的consumer当作consumer leader(后面会由它来做分配),coordinator收集所有的成员要订阅的主题后,会把消息通知给consumer leader,做完分配后,发送给coodinatoer
GroupSync: 组成员将自己要订阅的信息同步至coodinator后进入GroupSync阶段,consumer会定时向coordinator发送GroupSync心跳信息包,coodinator将consumer leader分配的信息同步至组内成员,JoinGroup结束
总结:从上面可以看到,如果consumer数较少时,rebalance还是很快的,Stop The Wrold时间很短,但是当consumer数很多时,consumer不工作时间长,rebalance弊端就凸显出来了。
思考:针对某个topic,给consumer group 配置多少consumer合适?
1、假如topic 有20个partition,在consumer group下有30个consumer的时候,有效的consumer会是多少?
2、假如某个topic,partition的并发很高,积压很多消息,消费者的能力小于生产者,该如何处理?
不过要注意一些注意事项,对于多个partition和多个consumer
1 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数
2 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀
最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目
3 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同
4 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化
5 High-level接口中获取不到数据的时候是会block的
简单版,
简单的坑,如果测试流程是,先produce一些数据,然后再用consumer读的话,记得加上第一句设置
因为初始的offset默认是非法的,然后这个设置的意思是,当offset非法时,如何修正offset,默认是largest,即最新,所以不加这个配置,你是读不到你之前produce的数据的,而且这个时候你再加上smallest配置也没用了,因为此时offset是合法的,不会再被修正了,需要手工或用工具改重置offset
Properties props = new Properties();
propsput("autooffsetreset", "smallest"); //必须要加,如果要读旧数据
propsput("zookeeperconnect", "localhost:2181");
propsput("groupid", "pv");
propsput("zookeepersessiontimeoutms", "400");
propsput("zookeepersynctimems", "200");
propsput("autocommitintervalms", "1000");
ConsumerConfig conf = new ConsumerConfig(props);
ConsumerConnector consumer = kafkaconsumerConsumercreateJavaConsumerConnector(conf);
String topic = "page_visits";
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMapput(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumercreateMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMapget(topic);
KafkaStream<byte[], byte[]> stream = streamsget(0);
ConsumerIterator<byte[], byte[]> it = streamiterator();
while (ithasNext()){
Systemoutprintln("message: " + new String(itnext()message()));
}
if (consumer != null) consumershutdown(); //其实执行不到,因为上面的hasNext会block
在用high-level的consumer时,两个给力的工具,
1 bin/kafka-run-classsh kafkatoolsConsumerOffsetChecker --group pv
可以看到当前group offset的状况,比如这里看pv的状况,3个partition
Group Topic Pid Offset logSize Lag Owner
pv page_visits 0 21 21 0 none
pv page_visits 1 19 19 0 none
pv page_visits 2 20 20 0 none
关键就是offset,logSize和Lag
这里以前读完了,所以offset=logSize,并且Lag=0
2 bin/kafka-run-classsh kafkatoolsUpdateOffsetsInZK earliest config/consumerproperties page_visits
3个参数,
[earliest | latest],表示将offset置到哪里
consumerproperties ,这里是配置文件的路径
topic,topic名,这里是page_visits
我们对上面的pv group执行完这个 *** 作后,再去check group offset状况,结果如下,
Group Topic Pid Offset logSize Lag Owner
pv page_visits 0 0 21 21 none
pv page_visits 1 0 19 19 none
pv page_visits 2 0 20 20 none
可以看到offset已经被清0,Lag=logSize
底下给出原文中多线程consumer的完整代码
import kafkaconsumerConsumerConfig;
import kafkaconsumerKafkaStream;
import kafkajavaapiconsumerConsumerConnector;
import javautilHashMap;
import javautilList;
import javautilMap;
import javautilProperties;
import javautilconcurrentExecutorService;
import javautilconcurrentExecutors;
public class ConsumerGroupExample {
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;
public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
consumer = kafkaconsumerConsumercreateJavaConsumerConnector( // 创建Connector,注意下面对conf的配置
createConsumerConfig(a_zookeeper, a_groupId));
thistopic = a_topic;
}
public void shutdown() {
if (consumer != null) consumershutdown();
if (executor != null) executorshutdown();
}
public void run(int a_numThreads) { // 创建并发的consumers
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMapput(topic, new Integer(a_numThreads)); // 描述读取哪个topic,需要几个线程读
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumercreateMessageStreams(topicCountMap); // 创建Streams
List<KafkaStream<byte[], byte[]>> streams = consumerMapget(topic); // 每个线程对应于一个KafkaStream
// now launch all the threads
//
executor = ExecutorsnewFixedThreadPool(a_numThreads);
// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executorsubmit(new ConsumerTest(stream, threadNumber)); // 启动consumer thread
threadNumber++;
}
}
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
propsput("zookeeperconnect", a_zookeeper);
propsput("groupid", a_groupId);
propsput("zookeepersessiontimeoutms", "400");
propsput("zookeepersynctimems", "200");
propsput("autocommitintervalms", "1000");
return new ConsumerConfig(props);
}
public static void main(String[] args) {
String zooKeeper = args[0];
String groupId = args[1];
String topic = args[2];
int threads = IntegerparseInt(args[3]);
ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
examplerun(threads);
try {
Threadsleep(10000);
} catch (InterruptedException ie) {
}
exampleshutdown();
}
}
SimpleConsumer
另一种是SimpleConsumer,名字起的,以为是简单的接口,其实是low-level consumer,更复杂的接口
参考,
什么时候用这个接口
Read a message multiple times
Consume only a subset of the partitions in a topic in a process
Manage transactions to make sure a message is processed once and only once
当然用这个接口是有代价的,即partition,broker,offset对你不再透明,需要自己去管理这些,并且还要handle broker leader的切换,很麻烦
所以不是一定要用,最好别用
You must keep track of the offsets in your application to know where you left off consuming
You must figure out which Broker is the lead Broker for a topic and partition
You must handle Broker leader changes
使用SimpleConsumer的步骤:
Find an active Broker and find out which Broker is the leader for your topic and partition
Determine who the replica Brokers are for your topic and partition
Build the request defining what data you are interested in
Fetch the data
Identify and recover from leader changes
首先,你必须知道读哪个topic的哪个partition
然后,找到负责该partition的broker leader,从而找到存有该partition副本的那个broker
再者,自己去写request并fetch数据
最终,还要注意需要识别和处理broker leader的改变
一、kafka的存储机制
1、segment
2、读取数据
二、可靠性保证
1、AR
2、生产者可靠性级别
3、leader选举
4、kafka可靠性的保证
一、kafka的存储机制
kafka通过topic来分主题存放数据,主题内有分区,分区可以有多个副本,分区的内部还细分为若干个segment。所谓的分区其实就是在kafka对应存储目录下创建的文件夹,文件夹的名字是主题名加上分区编号,编号从0开始。
1、segment
所谓的segment其实就是在分区对应的文件夹下产生的文件。一个分区会被划分成大小相等的若干segment,这样一方面保证了分区的数据被划分到多个文件中保证不会产生体积过大的文件;另一方面可以基于这些segment文件进行 历史 数据的删除,提高效率。一个segment又由一个log和一个index文件组成。 1.log log文件为数据文件用来存放数据分段数据。
2.index index为索引文件保存对对应的log文件的索引信息。在index文件中,保存了对对应log文件的索引信息,通过查找index文件可以获知每个存储在当前segment中的offset在log文件中的开始位置,而每条日志有其固定格式,保存了包括offset编号、日志长度、key的长度等相关信息,通过这个固定格式中的数据可以确定出当前offset的结束位置,从而对数据进行读取。
3.命名规则 这两个文件的命名规则为:partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值,数值大小为64位,20位数字字符长度,没有数字用0填充。
2、读取数据
开始读取指定分区中某个offset对应的数据时,先根据offset和当前分区的所有segment的名称做比较,确定出数据在哪个segment中,再查找该segment的索引文件,确定当前offset在数据文件中的开始位置,最后从该位置开始读取数据文件,在根据数据格式判断结果,获取完整数据。
二、可靠性保证
1、AR
在Kafka中维护了一个AR列表,包括所有的分区的副本。AR又分为ISR和OSR。AR = ISR + OSR。AR、ISR、OSR、LEO、HW这些信息都被保存在Zookeeper中。
1.ISR
ISR中的副本都要同步leader中的数据,只有都同步完成了数据才认为是成功提交了,成功提交之后才能供外界访问。在这个同步的过程中,数据即使已经写入也不能被外界访问,这个过程是通过LEO-HW机制来实现的。
2.OSR
OSR内的副本是否同步了leader的数据,不影响数据的提交,OSR内的follower尽力的去同步leader,可能数据版本会落后。最开始所有的副本都在ISR中,在kafka工作的过程中,如果某个副本同步速度慢于replicalagtimemaxms指定的阈值,则被踢出ISR存入OSR,如果后续速度恢复可以回到ISR中。
3.LEO
LogEndOffset:分区的最新的数据的offset,当数据写入leader后,LEO就立即执行该最新数据。相当于最新数据标识位。
4.HW
HighWatermark:只有写入的数据被同步到所有的ISR中的副本后,数据才认为已提交,HW更新到该位置,HW之前的数据才可以被消费者访问,保证没有同步完成的数据不会被消费者访问到。相当于所有副本同步数据标识位。在leader宕机后,只能从ISR列表中选取新的leader,无论ISR中哪个副本被选为新的leader,它都知道HW之前的数据,可以保证在切换了leader后,消费者可以继续看到HW之前已经提交的数据。所以LEO代表已经写入的最新数据位置,而HW表示已经同步完成的数据,只有HW之前的数据才能被外界访问。
5.HW截断机制
如果leader宕机,选出了新的leader,而新的leader并不能保证已经完全同步了之前leader的所有数据,只能保证HW之前的数据是同步过的,此时所有的follower都要将数据截断到HW的位置,再和新的leader同步数据,来保证数据一致。当宕机的leader恢复,发现新的leader中的数据和自己持有的数据不一致,此时宕机的leader会将自己的数据截断到宕机之前的hw位置,然后同步新leader的数据。宕机的leader活过来也像follower一样同步数据,来保证数据的一致性。
2、生产者可靠性级别
通过以上的讲解,已经可以保证kafka集群内部的可靠性,但是在生产者向kafka集群发送时,数据经过网络传输,也是不可靠的,可能因为网络延迟、闪断等原因造成数据的丢失。kafka为生产者提供了如下的三种可靠性级别,通过不同策略保证不同的可靠性保障。其实此策略配置的就是leader将成功接收消息信息响应给客户端的时机。通过requestrequiredacks参数配置:1:生产者发送数据给leader,leader收到数据后发送成功信息,生产者收到后认为发送数据成功,如果一直收不到成功消息,则生产者认为发送数据失败会自动重发数据。当leader宕机时,可能丢失数据。0:生产者不停向leader发送数据,而不需要leader反馈成功消息。这种模式效率最高,可靠性最低。可能在发送过程中丢失数据,也可能在leader宕机时丢失数据。-1:生产者发送数据给leader,leader收到数据后要等到ISR列表中的所有副本都同步数据完成后,才向生产者发送成功消息,如果一只收不到成功消息,则认为发送数据失败会自动重发数据。这种模式下可靠性很高,但是当ISR列表中只剩下leader时,当leader宕机让然有可能丢数据。此时可以配置mininsyncreplicas指定要求观察ISR中至少要有指定数量的副本,默认该值为1,需要改为大于等于2的值这样当生产者发送数据给leader但是发现ISR中只有leader自己时,会收到异常表明数据写入失败,此时无法写入数据,保证了数据绝对不丢。虽然不丢但是可能会产生冗余数据,例如生产者发送数据给leader,leader同步数据给ISR中的follower,同步到一半leader宕机,此时选出新的leader,可能具有部分此次提交的数据,而生产者收到失败消息重发数据,新的leader接受数据则数据重复了。
3、leader选举
当leader宕机时会选择ISR中的一个follower成为新的leader,如果ISR中的所有副本都宕机,怎么办?有如下配置可以解决此问题:uncleanleaderelectionenable=false策略1:必须等待ISR列表中的副本活过来才选择其成为leader继续工作。uncleanleaderelectionenable=true策略2:选择任何一个活过来的副本,成为leader继续工作,此follower可能不在ISR中。策略1,可靠性有保证,但是可用性低,只有最后挂了leader活过来kafka才能恢复。策略2,可用性高,可靠性没有保证,任何一个副本活过来就可以继续工作,但是有可能存在数据不一致的情况。
4、kafka可靠性的保证
At most once:消息可能会丢,但绝不会重复传输。At least once:消息绝不会丢,但可能会重复传输。Exactly once:每条消息肯定会被传输一次且仅传输一次。kafka最多保证At least once,可以保证不丢,但是可能会重复,为了解决重复需要引入唯一标识和去重机制,kafka提供了GUID实现了唯一标识,但是并没有提供自带的去重机制,需要开发人员基于业务规则自己去重。
offset即消费消息的偏移值,记录了kafka每个consumergroup的下一个需要读取消费位置,保障其消息的消费可靠性。
kafka0811以前,offset保存在zk中,存放在/consumers节点下。但是由于频繁访问zk,zk需要一个一个节点更新offset,不能批量或分组更新,导致offset更新成了瓶颈。后续两个过渡版本增加了参数“offsetsstorage”,该参数可配置为“zookeeper”或“kafka”分别表示offset的保持位置在zk或是broker,默认保留在zk,09版本以后offset就默认保存在broker下。若配置的“kafka”,当设置了“dualcommitenabled”参数时,offset仍然可以提交到zk。
zk中保存offset结构为:
注意:由于kafka对客户端client向下兼容,低版本的client仍然能够通过链接zk消费数据,并提交offset数据,即使broker版本高于09,提交的offset仍然保存在zk;此时仍然存在offset更新瓶颈问题,所以建议尽量使用高版本client,通过链接broker方式消费数据。
例如:kafka broker版本260,consumer版本0821:
构建consumer:
启动消费者消费全部10条历史消息,查看zk下/consumer节点的消费者信息:
可以看到group “test_group1”对topic “test1”的3个partition消费情况,offset分别为6,2,2。
这里kafka只记录了每个group的消费情况,没有对某一个consumer做单独记录。早期版本/ids节点记录consumer id信息,owner节点记录各个partition所属consumer信息
如上所述,新版本中offset由broker维护,offset信息由一个特殊的topic “ __consumer_offsets”来保存,offset以消息形式发送到该topic并保存在broker中。这样consumer提交offset时,只需连接到broker,不用访问zk,避免了zk节点更新瓶颈。
broker消息保存目录在配置文件serverproperties中:
该目录下默认包含50个以__consumer_offsets开头的目录,用于存放offset:
offset的存放位置决定于groupid的hash值,其获取方式:
其中numPartitions由offsetstopicnumpartitions参数决定,默认值即50。以groupid “test-group”为例,计数其存储位置为:__consumer_offsets-12,当其消费全部10条数据后,使用命令查看该目录下消息记录:kafka-console-consumer --bootstrap-server localhost:9092 --topic __consumer_offsets --partition 12 --from-beginning --formatter 'kafkacoordinatorgroupGroupMetadataManager$OffsetsMessageFormatter'
该数据结构为以groupid-topic-partition作为key,value为OffsetAndMetadata,其中包含了offset信息。可以看到group“test-group”在topic“test1”的三个partition下offset值分别为6,2,2。同保存在zk数据一样,offset只记录groupid的消费情况,对于具体consumer是透明的。
那么offset具体被发送给哪个broker保存呢?
由上文可知,offset的存储分区是通过groupid的hash值取得的,那么offset发送的broker就是该分区的leader broker,这也符合kafka普通消息的发生逻辑。所以,每个group的offset都将发生到一个broker,broker中存在一个offset manager 实例负责接收处理offset提交请求,并返回提交 *** 作结果。
参考:
>
以上就是关于kafka consumer offset机制全部的内容,包括:kafka consumer offset机制、kafka原理分析、kafka消息的管理等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)