在kafka中,每个日志分段文件都对应了两个索引文件—— 偏移量索引文件和时间戳索引文件 (还有其它的诸如事务日志索引文件就不细表了),主要用来 提高查找消息的效率 。
偏移量索引文件用来建立消息偏移量(offset)到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置;时间戳索引文件则根据指定的时间戳(timestamp)来查找对应的偏移量信息。
Kafka 中的索引文件以稀疏索引(sparse index)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。
每当写入一定量 (由 broker 端参数 logindexintervalbytes 指定,默认值为 4096,即 4KB) 的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项,增大或减小 logindexintervalbytes 的值,对应地可以缩小或增加索引项的密度。
稀疏索引通过 MappedByteBuffer 将索引文件映射到内存中,以加快索引的查询速度。
偏移量索引文件中的偏移量是单调递增的,查询指定偏移量时,使用二分查找法来快速定位偏移量的位置,如果指定的偏移量不在索引文件中,则会返回小于指定偏移量的最大偏移量。
时间戳索引文件中的时间戳也保持严格的单调递增,查询指定时间戳时,也根据二分查找法来查找不大于该时间戳的最大偏移量,至于要找到对应的物理文件位置还需要根据偏移量索引文件来进行再次定位。
稀疏索引的方式是在磁盘空间、内存空间、查找时间等多方面之间的一个折中。
以偏移量索引文件来做具体分析。偏移量索引项的格式如下图所示。
每个索引项占用 8 个字节,分为两个部分:
(1) relativeOffset : 相对偏移量,表示消息相对于 baseOffset 的偏移量,占用 4 个字节,当前索引文件的文件名即为 baseOffset 的值。
(2) position : 物理地址,也就是消息在日志分段文件中对应的物理位置,占用 4 个字节。
消息的偏移量(offset)占用 8 个字节,也可以称为绝对偏移量。
索引项中没有直接使用绝对偏移量而改为只占用 4 个字节的相对偏移量(relativeOffset = offset - baseOffset),这样可以减小索引文件占用的空间。
举个例子,一个日志分段的 baseOffset 为 32,那么其文件名就是 00000000000000000032log,offset 为 35 的消息在索引文件中的 relativeOffset 的值为 35-32=3。
如果我们要查找偏移量为 23 的消息,那么应该怎么做呢首先通过二分法在偏移量索引文件中找到不大于 23 的最大索引项,即[22, 656],然后从日志分段文件中的物理位置 656 开始顺序查找偏移量为 23 的消息。
以上是最简单的一种情况。参考上图,如果要查找偏移量为 268 的消息,那么应该怎么办呢
首先肯定是定位到baseOffset为251的日志分段,然后计算相对偏移量relativeOffset = 268 - 251 = 17,之后再在对应的索引文件中找到不大于 17 的索引项,最后根据索引项中的 position 定位到具体的日志分段文件位置开始查找目标消息。
那么又是如何查找 baseOffset 为 251 的日志分段的呢
这里并不是顺序查找,而是用了跳跃表的结构。
Kafka 的每个日志对象中使用了 ConcurrentSkipListMap 来保存各个日志分段,每个日志分段的 baseOffset 作为 key,这样可以根据指定偏移量来快速定位到消息所在的日志分段。
在Kafka中要定位一条消息,那么首先根据 offset 从 ConcurrentSkipListMap 中来查找到到对应(baseOffset)日志分段的索引文件,然后读取偏移量索引索引文件,之后使用二分法在偏移量索引文件中找到不大于 offset - baseOffset z的最大索引项,接着再读取日志分段文件并且从日志分段文件中顺序查找relativeOffset对应的消息。
Kafka中通过offset查询消息内容的整个流程我们可以简化成下图:
Kafka中消息的offset可以类比成InnoDB中的主键,前者是通过offset检索出整条Record的数据,后者是通过主键检索出整条Record的数据。
InnoDB中通过主键查询数据内容的整个流程建议简化成下图(下半部分)。
Kafka中通过时间戳索引文件去检索消息的方式可以类比于InnoDB中的辅助索引的检索方式:
前者是通过timestamp去找offset,后者是通过索引去找主键,后面两者的过程就和上面的陈述相同。
Kafka中当有新的索引文件建立的时候ConcurrentSkipListMap才会更新,而不是每次有数据写入时就会更新,这块的维护量基本可以忽略
B+树中数据有插入、更新、删除的时候都需要更新索引,还会引来“页分裂”等相对耗时的 *** 作。Kafka中的索引文件也是顺序追加文件的 *** 作,和B+树比起来工作量要小很多。
说到底还是应用场景不同所决定的。MySQL中需要频繁地执行CRUD的 *** 作,CRUD是MySQL的主要工作内容,而为了支撑这个 *** 作需要使用维护量大很多的B+树去支撑。
Kafka中的消息一般都是顺序写入磁盘,再到从磁盘顺序读出(不深入探讨page cache等),他的主要工作内容就是:写入+读取,很少有检索查询的 *** 作
换句话说, 检索查询只是Kafka的一个辅助功能,不需要为了这个功能而去花费特别太的代价去维护一个高level的索引。
前面也说过,Kafka中的这种方式是在磁盘空间、内存空间、查找时间等多方面之间的一个折中。
kafka是一个 分布式 的、支持 分区的(partition )、多副本的 (replica ),基于 zookeeper 协调的 分布式消息系统。
从上面的描述中我们可以知道kafka的核心知识点:partition、replica
一个topic可以认为一个一类消息,每个topic将被分成多个partition。
在上图中我们的生产者会决定发送到哪个 Partition:
如果没有 Key 值则进行轮询发送。
如果有 Key 值,对 Key 值进行 Hash,然后对分区数量取余,保证了同一个 Key 值的会被路由到同一个分区。(所有系统的partition都是同一个路数)
在上图我们也可以看到,offset是跟partition走的,每个partition都有自己的offset。
总所周知,topic在物理层面以partition为分组,一个topic可以分成若干个partition,那么topic以及partition又是怎么存储的呢
其实partition还可以细分为logSegment,一个partition物理上由多个logSegment组成,那么这些segment又是什么呢
LogSegment 文件由两部分组成,分别为“index”文件和“log”文件,分别表示为 Segment 索引文件和数据文件。
这两个文件的命令规则为:partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值,数值大小为64位,20位数字字符长度,没有数字用0填充,如下:
如上图,“index”索引文件存储大量的元数据,“log”数据文件存储大量的消息,索引文件中的元数据指向对应数据文件中message的物理偏移地址。其中以“index”索引文件中的元数据[3, 348]为例,在“log”数据文件表示第3个消息,即在全局partition中表示170410+3=170413个消息,该消息的物理偏移地址为348。
那么如何从partition中通过offset查找message呢
以上图为例,读取offset=170418的消息,首先查找segment文件,其中00000000000000000000index为最开始的文件,第二个文件为00000000000000170410index(起始偏移为170410+1=170411),而第三个文件为00000000000000239430index(起始偏移为239430+1=239431),所以这个offset=170418就落到了第二个文件之中。其他后续文件可以依次类推,以其实偏移量命名并排列这些文件,然后根据二分查找法就可以快速定位到具体文件位置。其次根据00000000000000170410index文件中的[8,1325]定位到00000000000000170410log文件中的1325的位置进行读取。
要是读取offset=170418的消息,从00000000000000170410log文件中的1325的位置进行读取,那么怎么知道何时读完本条消息,否则就读到下一条消息的内容了
这个就需要联系到消息的物理结构了,消息都具有固定的物理结构,包括:offset(8 Bytes)、消息体的大小(4 Bytes)、crc32(4 Bytes)、magic(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K Bytes)、payload(N Bytes)等等字段,可以确定一条消息的大小,即读取到哪里截止。
Kafka 的副本机制是多个服务端节点对其他节点的主题分区的日志进行复制。当集群中的某个节点出现故障,访问故障节点的请求会被转移到其他正常节点(这一过程通常叫 Reblance)。
Kafka 每个主题的每个分区都有一个主副本以及 0 个或者多个副本,副本保持和主副本的数据同步,当主副本出故障时就会被替代。
当producer向leader发送数据时,可以通过requestrequiredacks参数来设置数据可靠性的级别:
在kafka系统中,会涉及到多处选举机制,主要有这三方面:
kafka是一个高性能、低延迟的分布式发布-订阅消息系统,适用于在线、离线消息消费,为了防止数据丢失,kafka将消息持久化到磁盘上并在集群内复制
在深入了解kafka之前,先介绍kafka体系架构中的一些组件,包括Topic、Producer、Consumer、Consumer Group、Broker、Partition、Leader、Follower。
Topic
消息被发送到kafak中都有一个类别,这个类别叫做Topic,kafka中的消息都是通过主题进行组织的,一个Topic可以有1个或多个Partition。
Producer
生产者,即是将消息发送到kafka的过程,发送消息是需要指定Topic,并且可以指定Partition。Broker接收到消息后,将消息存放在partition中。
Consumer
消费者,从broker topic中读取消息,可以订阅一个或多个topic。
Consumer Group
消费者组由一个或多个消费者组成,消费者组中的消费者共同消费一个主题的分区,主题中每个分区只能被同一个消费者组中的一个消费者消费。
Broker
kafka集群包括一个或多个节点,每个节点就叫做Broker。
Partition
Topic中的数据可以分割为一个或多个Partition,Partition在底层逻辑是log文件,每个Partition由多个Segment组成,任何发送到kafka的消息最终都是会被追加到log文件的尾部。
Leader
Topic的Partition允许有若干个副本,每个分区都一个leader和多个follower,leader负责所有的读写 *** 作。
Follower
Follower追随Leader,所有的读写请求都是通过Leader路由,请求会广播给所有的Follower,Follower和Leader保持数据同步。如果Leader失效,通过Follower选举一个新的Leader
下面通过一张简单的UML图简要说明组件之间的交互和关联关系
主要关系说明如下
- kafka集群可以有1个或多个Broker
- Broker 可以包含多个副本(每个分区可以包含多个副本,通常每个分区副本数不会多于Broker数量,一个broker中包含很多个分区)
- Topic可以有1个或多个分区
- broker中的每个partition可以有0个或1个副本
- 一个partition有一个leader副本和0个或多个follower副本
- partition的每个副本都必须位于单独的broker上
- 每个partition副本位于一个broker上,并且一个partition不能划分多个broker。
Kafka架构
下面重点介绍Producer、Topic、Consumer的关系,一个简单生产消费的过程例子如下图所示
在这个例子中,一个生产者将消息发送给单个topic
上面这个图中,1个生产者发布消息到1个topic,一个消费者消费1个Topic,如上图中的Producer 1和Consumer 1;一个Topic可以是由多个生产者发布消息,如Topic4;1个消费者可以消费多个Topic,如图中的Consumer 2。
如上图的例子,一个生产者可以给多个Topic发布消息。一个消费者同一时间只能给一个topic发布消息,但是可以使用异步发布消息,生产者可以一次将多个消息发送给多个Topic
生产者负责将每条消息发送到分区,默认分区由消息key通过hash算法确定,如果没有指定消息key,则通过循环轮询来确定分区。但是在实际业务场景中,默认的分区行为并不能满足业务需要,比如需要确保消息的顺序或需要将消息平均分配给消费者等等。因此,生产者在发布消息的时候可以使用自定义分区方式,为消息指定分区key、重写分区算法或手动设置分区等方式将消息发布到特定分区。
kafka内部运作的基本逻辑大概为:每个主题都有1个或多个分区,这些分区分不在1个或多个Broker上,为了提高消息的可靠性不会丢失,可以配置多个副本因子,这样每个分区可以被复制到一个或多个Broker上,每个分区对应一个log文件,只能被一个消费组中的一个消费者消费,用于提高Topic的并发性。因此一般将消费组消费者数量设置为小于或者等于topic的分区数量,如果要增加消费者也相应的增加对应的分区数量。
同一个分区内的消息是由顺序的,每个分区仅能被同一个消费组中的一个消费者顺序消费。但是,当多个消费组订阅了同一个topic,则每个组中的消费者都会收到消息。
下面例子说明多分区情况下,消费者组和消费者消费的几种情况。
分区数和消费者数相同,如下图所示
这种情况,同一个消费组的每个消费者只消费一个分区。
另外一种情况,消费组中的消费者数量多于分区数,如下图所示
消费者数量多于分区数,则某些消费者就处于空闲状态,当有消费者down掉或添加新的分区情况时,空闲消费者将发挥作用。
另外一种情况,消费者数比分区数少,如下图所示
这种情况,导致某些分区需要负责更多的分区,处理更多的消息。
最后,多个消费组消费了同一个topic
topic消息被广播到每个消费组,每个消费组都可以接受同一个消息。这是kafka实现一个Topic广播的方式,一个Topic可以被多个Conumse Group的消费者同时消费;同一个消息只能被一个消费者组中的一个消费者消费。
Kafka 09版本正式使用Java版本的producer替换了原Scala版本的producer。
注:ProducerRecord允许用户在创建消息对象的时候就直接指定要发送的分区,这样producer后续发送该消息时可以直接发送到指定分区,而不用先通过Partitioner计算目标分区了。另外,我们还可以直接指定消息的时间戳——但一定要慎重使用这个功能,因为它有可能会令时间戳索引机制失效。
流程描述:
用户首先构建待发送的消息对象ProducerRecord,然后调用KafkaProducer#send方法进行发送。KafkaProducer接收到消息后首先对其进行序列化,然后结合本地缓存的元数据信息一起发送给partitioner去确定目标分区,最后追加写入到内存中的消息缓冲池(accumulator)。此时KafkaProducer#send方法成功返回。同时,KafkaProducer中还有一个专门的Sender IO线程负责将缓冲池中的消息分批次发送给对应的broker,完成真正的消息发送逻辑。
新版本的producer从设计上来说具有以下几个特点:
总共创建两个线程:执行KafkaPrducer#send逻辑的线程——我们称之为“用户主线程”;执行发送逻辑的IO线程——我们称之为“Sender线程”。
不同于Scala老版本的producer,新版本producer完全异步发送消息,并提供了回调机制(callback)供用户判断消息是否成功发送。
batching机制——“分批发送“机制。每个批次(batch)中包含了若干个PRODUCE请求,因此具有更高的吞吐量。
更加合理的默认分区策略:对于无key消息而言,Scala版本分区策略是一段时间内(默认是10分钟)将消息发往固定的目标分区,这容易造成消息分布的不均匀,而新版本的producer采用轮询的方式均匀地将消息分发到不同的分区。
底层统一使用基于Selector的网络客户端实现,结合Java提供的Future实现完整地提供了更加健壮和优雅的生命周期管理。
关键参数
batchsize 我把它列在了首位,因为该参数对于调优producer至关重要。之前提到过新版producer采用分批发送机制,该参数即控制一个batch的大小。默认是16KB
acks 关乎到消息持久性(durability)的一个参数。高吞吐量和高持久性很多时候是相矛盾的,需要先明确我们的目标是什么? 高吞吐量?高持久性?亦或是中等?因此该参数也有对应的三个取值:0, -1和1
lingerms 减少网络IO,节省带宽之用。原理就是把原本需要多次发送的小batch,通过引入延时的方式合并成大batch发送,减少了网络传输的压力,从而提升吞吐量。当然,也会引入延时
compressiontype producer 所使用的压缩器,目前支持gzip, snappy和lz4。压缩是在用户主线程完成的,通常都需要花费大量的CPU时间,但对于减少网络IO来说确实利器。生产环境中可以结合压力测试进行适当配置
maxinflightrequestsperconnection 关乎消息乱序的一个配置参数。它指定了Sender线程在单个Socket连接上能够发送未应答PRODUCE请求的最大请求数。适当增加此值通常会增大吞吐量,从而整体上提升producer的性能。不过笔者始终觉得其效果不如调节batchsize来得明显,所以请谨慎使用。另外如果开启了重试机制,配置该参数大于1可能造成消息发送的乱序(先发送A,然后发送B,但B却先行被broker接收)
retries 重试机制,对于瞬时失败的消息发送,开启重试后KafkaProducer会尝试再次发送消息。对于有强烈无消息丢失需求的用户来说,开启重试机制是必选项。
当用户调用KafkaProducersend(ProducerRecord, Callback)时Kafka内部流程分析:
这是KafkaProducer#send逻辑的第一步,即为待发送消息进行序列化并计算目标分区,如下图所示:
如上图所示,一条所属topic是"test",消息体是"message"的消息被序列化之后结合KafkaProducer缓存的元数据(比如该topic分区数信息等)共同传给后面的Partitioner实现类进行目标分区的计算。
producer创建时会创建一个默认32MB(由buffermemory参数指定)的accumulator缓冲区,专门保存待发送的消息。除了之前在“关键参数”段落中提到的lingerms和batchsize等参数之外,该数据结构中还包含了一个特别重要的集合信息:消息批次信息(batches)。该集合本质上是一个HashMap,里面分别保存了每个topic分区下的batch队列,即前面说的批次是按照topic分区进行分组的。这样发往不同分区的消息保存在对应分区下的batch队列中。举个简单的例子,假设消息M1, M2被发送到test的0分区但属于不同的batch,M3分送到test的1分区,那么batches中包含的信息就是:{"test-0" -> [batch1, batch2], "test-1" -> [batch3]}。
单个topic分区下的batch队列中保存的是若干个消息批次。每个batch中最重要的3个组件包括:
compressor: 负责执行追加写入 *** 作
batch缓冲区:由batchsize参数控制,消息被真正追加写入到的地方
thunks:保存消息回调逻辑的集合
这一步的目的就是将待发送的消息写入消息缓冲池中,具体流程如下图所示:
这一步执行完毕之后理论上讲KafkaProducersend方法就执行完毕了,用户主线程所做的事情就是等待Sender线程发送消息并执行返回结果了。
此时,该Sender线程登场了。严格来说,Sender线程自KafkaProducer创建后就一直都在运行着 。它的工作流程基本上是这样的:
不断轮询缓冲区寻找 已做好发送准备的分区 ;
将轮询获得的各个batch按照目标分区所在的leader broker进行分组;
将分组后的batch通过底层创建的 Socket连接 发送给各个broker;
等待服务器端发送response回来。
为了说明上的方便,我还是基于图的方式来解释Sender线程的工作原理:
上图中Sender线程会发送PRODUCE请求给对应的broker,broker处理完毕之后发送对应的PRODUCE response。一旦Sender线程接收到response将依次(按照消息发送顺序)调用batch中的回调方法,如下图所示:
refer:
>
kafka的配置分为 broker、producter、consumer三个不同的配置
一 BROKER 的全局配置
最为核心的三个配置 brokerid、logdir、zookeeperconnect 。
------------------------------------------- 系统 相关 -------------------------------------------
brokerid =1
logdirs = /tmp/kafka-logs
port =6667
messagemaxbytes =1000000
numnetworkthreads =3
numiothreads =8
backgroundthreads =4
queuedmaxrequests =500
hostname
advertisedhostname
advertisedport
socketsendbufferbytes =1001024
socketreceivebufferbytes =1001024
socketrequestmaxbytes =100 1024 1024
------------------------------------------- LOG 相关 -------------------------------------------
logsegmentbytes =1024 1024 1024
logrollhours =247
logcleanuppolicy = delete
logretentionminutes=7days
指定日志每隔多久检查看是否可以被删除,默认1分钟
logcleanupintervalmins=1
logretentionbytes=-1
logretentioncheckintervalms=5minutes
logcleanerenable=false
logcleanerthreads =1
logcleaneriomaxbytespersecond=None
logcleanerdedupebuffersize=500 1024 1024
logcleaneriobuffersize=5121024
logcleaneriobufferloadfactor =09
logcleanerbackoffms =15000
logcleanermincleanableratio=05
logcleanerdeleteretentionms =1day
logindexsizemaxbytes =10 1024 1024
logindexintervalbytes =4096
logflushintervalmessages=None
logflushschedulerintervalms =3000
logflushintervalms = None
logdeletedelayms =60000
logflushoffsetcheckpointintervalms =60000
------------------------------------------- TOPIC 相关 -------------------------------------------
autocreatetopicsenable =true
defaultreplicationfactor =1
numpartitions =1
实例 --replication-factor3--partitions1--topic replicated-topic :名称replicated-topic有一个分区,分区被复制到三个broker上。
----------------------------------复制(Leader、replicas) 相关 ----------------------------------
controllersockettimeoutms =30000
controllermessagequeuesize=10
replicalagtimemaxms =10000
replicalagmaxmessages =4000
replicasockettimeoutms=301000
replicasocketreceivebufferbytes=641024
replicafetchmaxbytes =10241024
replicafetchwaitmaxms =500
replicafetchminbytes =1
numreplicafetchers=1
replicahighwatermarkcheckpointintervalms =5000
controlledshutdownenable =false
controlledshutdownmaxretries =3
controlledshutdownretrybackoffms =5000
autoleaderrebalanceenable =false
leaderimbalanceperbrokerpercentage =10
leaderimbalancecheckintervalseconds =300
offsetmetadatamaxbytes
----------------------------------ZooKeeper 相关----------------------------------
zookeeperconnect = localhost:2181
zookeepersessiontimeoutms=6000
zookeeperconnectiontimeoutms =6000
zookeepersynctimems =2000
配置的修改
其中一部分配置是可以被每个topic自身的配置所代替,例如
新增配置
bin/kafka-topicssh --zookeeper localhost:2181--create --topic my-topic --partitions1--replication-factor1--config maxmessagebytes=64000--config flushmessages=1
修改配置
bin/kafka-topicssh --zookeeper localhost:2181--alter --topic my-topic --config maxmessagebytes=128000
删除配置 :
bin/kafka-topicssh --zookeeper localhost:2181--alter --topic my-topic --deleteConfig maxmessagebytes
二 CONSUMER 配置
最为核心的配置是groupid、zookeeperconnect
groupid
consumerid
clientid = group id value
zookeeperconnect=localhost:2182
zookeepersessiontimeoutms =6000
zookeeperconnectiontimeoutms =6000
zookeepersynctimems =2000
autooffsetreset = largest
sockettimeoutms=301000
socketreceivebufferbytes=641024
fetchmessagemaxbytes =10241024
autocommitenable =true
autocommitintervalms =601000
queuedmaxmessagechunks =10
rebalancemaxretries =4
rebalancebackoffms =2000
refreshleaderbackoffms
fetchminbytes =1
fetchwaitmaxms =100
consumertimeoutms = -1
三 PRODUCER 的配置
比较核心的配置:metadatabrokerlist、requestrequiredacks、producertype、serializerclass
metadatabrokerlist
requestrequiredacks =0
requesttimeoutms =10000
sendbufferbytes=1001024
keyserializerclass
partitionerclass=kafkaproducerDefaultPartitioner
compressioncodec = none
compressedtopics=null
messagesendmaxretries =3
retrybackoffms =100
topicmetadatarefreshintervalms =6001000
clientid=""
------------------------------------------- 消息模式 相关 -------------------------------------------
producertype=sync
queuebufferingmaxms =5000
queuebufferingmaxmessages =10000
queueenqueuetimeoutms = -1
batchnummessages=200
serializerclass= kafkaserializerDefaultEncoder
消费者负责从订阅的主题上拉取消息,消费组是逻辑概念。一个消费者只属于一个消费组,一个消费组包一个或多个消费者。当消息发布到主题后,会被投递到每个消费组,但每个消费组中只有一个消费者能消费给消息。
消费者如何知道该消费哪个分区?当消费组内消费者个数发生变化时,分区分配是如何变化的呢?
按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配, 以保证分区尽可能均匀地分配给所有的消费者。对于 每一个主题 该策略会将消费组内所有的消费者按照名称的字典序排序然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。
假设n=分区数/消费者数量,m=分区数%消费者数量,那么前m个消费者每个分配n+1分区,后面的每个消费者分配n个分区。
如图所示主题中共有7个分区,此时消费组内只有一个消费者C0,C0订阅7个分区。
随着消费组内消费者不断加入,分区逐渐从C0分配到C1~C6,当最后一个消费者C7加入后,此时总共有8个消费者但是只有7个分区,因此C7由于分配不到分区而无法消费任何消息。
消费者并非越多越好,消费者数量应小于等于分区数量,否则会造成资源的浪费
缺点:
当一个消费组订阅两个分别包含四个分区的主题时,分区分配结果如下,比较均匀。
但当两个主题各有3个分区时,则会出现如下分区不均的问题。类似情况扩大的话,可能出现消费者过载问题。
将消费组内所有消费者及消费者订阅的所有主题的分区按照字典序排序,然后通过轮询方式将分区依次分配给每个消费者。如果消费组内消费者的订阅信息都是相同的,那么分区分配会比较均匀。如一个消费组两个消费者,分别订阅两个都有3的分区的主题,如图。
但是当消费组内消费者的订阅信息不同时,则会出现分配不均问题。如图,假设消费组内有三个消费者,主题1/2/3分别有1/2/3个分区,C0订阅主题1,C1订阅主题1和2,C2订阅主题1/2/3,分区结果将会如下图所示。
后来引入的策略,主要目的:
假设三个消费者,订阅了4个主题,每个主题有两个分区,那么初始分区分配结果如下:
乍一看,跟RoundRobin分配策略结果相同,但此时如果C1下线,那么消费组会执行再均衡 *** 作,重新分配消息分区。如果是RoundRobin策略,分配结果如下:
而如果是Sticky分配策略,则结果如下:
StickyAssignor保留了上一次对C0和C2的分配结果,将C1的分区分配给C0和C2使其均衡。
如果发生分区重分配,那么对于同一个分区而 ,有可能之前的消费者和新指派的消费者不是同一个,之前消费者进行到一半的处理还要在新指派的消费者中再次复现一遍,造成重复消费。StickyAssignor分配策略如同其名称中的"sticky"一 样,让分配策略具备的“黏性”,尽可能地让前后两次分配相同,进而减少系统资源的损耗及其他异常情况的发生。
再来看下,消费者订阅信息不相同的情况,拿RoundRobinAssignor中的实例来说。
假设消费组内有三个消费者,主题1/2/3分别有1/2/3个分区,C0订阅主题1,C1订阅主题1和2,C2订阅主题1/2/3,RoundRobinAssignor分区结果将会如下图所示。
而采用StickyAssignor时,分区分配结果如下:
若此时C0下线,RoundRobinAssignor重分配的结果如下:
而StickyAssignor重分配结果如下:
综上:
StickyAssignor分配策略的优点就是可以使分区重分配具备 “黏性”,减少不必要的分区移动(一个分区剥离之前的消费者 ,转而分配给另一个新的消费者)。
Kafka中的消息消费是基于拉模式。
Kafka每次拉取一组消息,每条消息的格式如下:
在每次拉取方法时,它返回的是还没有被消费过的消息集。要实现这个功能,就需要知道上次消费时的消费位移,消费者在消费完消息后要进行消费位移提交动作,且消费位移要进行持久化,消费位移保存在__consumer_offsets主题中。
当前拉取消息的最大offset为x,消费者消费完成提交位移的是offset其实为x+1,表示下次拉取消息的起始位置。
自动提交
默认采用自动提交,默认每隔5s会将拉取到的每个分区的最大的消息位移进行提交。真正的提交动作是在拉取消息的逻辑完成,每次拉取消息前会判断是否可以进行位移提交,如果可以则提交上一次的位移。这里会有两个问题,如下图所示。
重复消费:当前拉取消息x+2,x+7,当前消费到X+5,在提交消费位移前,消费者宕机;新的消费者还是会从X+2开始拉取消息, 因此导致重复消费。
消息丢失:当前拉取消息x+2,x+7,当前消费X+5,到下次拉取的时候,消费位移已经提交为X+8,若此时消费者宕机,新的消费者会从X+8处开始消费,导致X+5 ~ X+7的消息没有被消费,导致消息的丢失。
手动提交
同步提交和异步提交。
同步提交默认提交本次拉取分区消息的最大偏移量,如本次拉取X+2,X+7的消息,同步提交默认提交X+8的位置;当时同步提交也可指定提交的偏移量,比如消费一条提交1次,因为提交本身为同步 *** 作,所以会耗费一定的性能。
同步提交也会导致重复消费的问题,如消费完成后,提交前消费者宕机。
异步提交消费者线程不会被阻塞,使性能得到增强,但异步提交失败重试可能会导致提交位移被覆盖的问题,如本次异步提交offset=X失败,下次异步提交offset=X+y成功;此时前一次提交重试再次提交offset=x,如果业务上没有重试校验,会导致offset被覆盖,最终导致重复消费。
当新的消费组建立、消费者订阅新的主题或之前提交的位移信息因为过期被删除等,此时查不到纪录的消费位移。Kafka可配置从最新或从最早处开始消费。
Kafka还支持从特定位移处开始消费,可以实现回溯消费,Kafka内部提供了Seek()方法,来重置消费位移。
当需要回溯指定时间后的消息时,可先用offsetsForTimes方法查到指定时间后第一条消息的位移,然后再用seek重置位移。
分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以既方便又安全地删除或添加消费者。
Kfaka提供了组协调器(GroupCoordinator)和消费者协调器(ConsumerCoordinator),前者负责管理消费组,后者负责与前者交互,两者最重要的职责就是负责再均衡的 *** 作。
举例说明,当消费者加入消费组时,消费者、消费组和组协调器之间一般会经历以下几个阶段。
第一阶段(FIND COORDINATOR)
消费者需要确定它所属的消费组对应的GroupCoordinator所在的broker并创建与该broker 相互通信的网络连接。
消费者会向集群中的某个节点发送FindCoordinatorRequest请求来查找对应的组协调器。
Kafka根据请求中的coordinator_key(也就是groupld )的哈希值计算__consumer_offsets中的分区编号,如下图所示。找到对应的分区之后,在寻找此分区leader副本所在的broker节点,该节点即为当前消费组所在的组协调器节点。
消费组最终的分区分配方案及组内消费者所提交的消费位移信息都会发送给该broker节点。该broker节点既扮演GroupCoordinato的角色又扮演保存分区分配方案和组内消费者位移的角色,这样可以省去很多不必要的中间轮转所带来的开销。
第二阶段(JOIN GROUP)
在成功找到消费组所对应的GroupCoordinator之后就进入加入消费组的阶段,在此阶段的 消费者会向GroupCoordinator发送JoinGroupRequest请求,并处理响应。
组协调器内部主要做了以下几件事:
选举消费组的leader
如果当前组内没有leader,那么第一个加入消费组的则为leader。如果leader挂掉,组协调器会从内部维护的HashMap(消费者信息,key为member_id)中选择第一个key作为新的leader。
选举分区分配策略
前面说的每个消费者可能会上报多个分区分配策略,选举过程如下:
第三阶段(SYNC GROUP)
leader消费者根据在第二阶段中得到的分区分配策略来实施分区分配,然后将分配结果同步到组协调器。各个消费者会向组协调器发送SyncGroupRequest请求来同步分配方案。
请求结构如图,leader发送的请求才会有group_assignment。
其中包含了各个消费者对应的具体分配方案,member_id表示消费者的唯一标识,而 member_assignment是与消费者对应的分配方案,如图
消费者收到具体的分区分配方案后,会开启心跳任务,定期向组协调器发送心跳请求确定彼此在线。
第四阶段(HEARTBEAT)
在正式消费之前,消费者还需要确定拉取消息的起始位置。假设之前已经将最后的消费位移提交成功,那么消费者会请求获取上次提交的消费位移并从此处继续消费。
心跳线程是一个独立的线程,可以在轮询消息的空档发送。如果消费者停发送心跳的时间足够长,组协调器会认为这个消费者已经死亡,则触发一次再均衡行为。
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member This means that the time between subsequent calls to poll() was longer than the configured sessiontimeoutms, which typically implies that the poll loop is spending too much time message processing You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with maxpollrecords
造成的问题:假如consumerproperties配置中maxpollrecords=40 (一次最多拉取40条数据) sessiontimeoutms=30000 (会话时间)
假设kafka此时一次拉取了40条数据,但在处理第31条的时候抛出了如上的异常,就会导致,本次offset不会提交,完了这40条消息都会在接下来的某刻被再次消费,这其中就包含了其实已经消费了的30条数据
原因:the poll loop is spending too much time message processing, the time between subsequent calls to poll() was longer than the configured sessiontimeoutms,好吧其实是一个意思!
意思就是说poll下来数据后,处理这些数据的时间比 sessiontimeoutms配置的时间要长,从而导致the group has already rebalanced
解决办法是最后一句话:You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with maxpollrecords
即要不增大 sessiontimeoutms,要不减小maxpollrecords ,至于具体配置为多少,得看你处理一条消息花费多长时间 x,需要满足 x乘以maxpollrecords < sessiontimeoutms
另一种解决思路:
解决此类重复消费的方式:将能够唯一标识消息的信息存储在其他系统,比如redis,什么能够唯一标识消息呢?就是consumergroup+topic+partition+offset,更准确的应该是consumergroup+" "+topic+" "+partition+"_"+offset组成的key,value可以是处理时间存放在redis中,每次处理kafka消息时先从redis中根据key获取value,如果value为空,则表明该消息是第一次被消费,不为空则表示时已经被消费过的消息;
参考: >
以上就是关于Kafka中的索引机制全部的内容,包括:Kafka中的索引机制、kafka 架构及其原理、kafka架构详解等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)