Kafka数据消费

Kafka数据消费,第1张

消费者负责从订阅的主题上拉取消息,消费组是逻辑概念。一个消费者只属于一个消费组,一个消费组包一个或多个消费者。当消息发布到主题后,会被投递到每个消费组,但每个消费组中只有一个消费者能消费给消息。

消费者如何知道该消费哪个分区?当消费组内消费者个数发生变化时,分区分配是如何变化的呢?

按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配, 以保证分区尽可能均匀地分配给所有的消费者。对于 每一个主题 该策略会将消费组内所有的消费者按照名称的字典序排序然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。

假设n=分区数/消费者数量,m=分区数%消费者数量,那么前m个消费者每个分配n+1分区,后面的每个消费者分配n个分区。

如图所示主题中共有7个分区,此时消费组内只有一个消费者C0,C0订阅7个分区。

随着消费组内消费者不断加入,分区逐渐从C0分配到C1~C6,当最后一个消费者C7加入后,此时总共有8个消费者但是只有7个分区,因此C7由于分配不到分区而无法消费任何消息。

消费者并非越多越好,消费者数量应小于等于分区数量,否则会造成资源的浪费

缺点:

当一个消费组订阅两个分别包含四个分区的主题时,分区分配结果如下,比较均匀。

但当两个主题各有3个分区时,则会出现如下分区不均的问题。类似情况扩大的话,可能出现消费者过载问题。

将消费组内所有消费者及消费者订阅的所有主题的分区按照字典序排序,然后通过轮询方式将分区依次分配给每个消费者。如果消费组内消费者的订阅信息都是相同的,那么分区分配会比较均匀。如一个消费组两个消费者,分别订阅两个都有3的分区的主题,如图。

但是当消费组内消费者的订阅信息不同时,则会出现分配不均问题。如图,假设消费组内有三个消费者,主题1/2/3分别有1/2/3个分区,C0订阅主题1,C1订阅主题1和2,C2订阅主题1/2/3,分区结果将会如下图所示。

后来引入的策略,主要目的:

假设三个消费者,订阅了4个主题,每个主题有两个分区,那么初始分区分配结果如下:

乍一看,跟RoundRobin分配策略结果相同,但此时如果C1下线,那么消费组会执行再均衡 *** 作,重新分配消息分区。如果是RoundRobin策略,分配结果如下:

而如果是Sticky分配策略,则结果如下:

StickyAssignor保留了上一次对C0和C2的分配结果,将C1的分区分配给C0和C2使其均衡。

如果发生分区重分配,那么对于同一个分区而 ,有可能之前的消费者和新指派的消费者不是同一个,之前消费者进行到一半的处理还要在新指派的消费者中再次复现一遍,造成重复消费。StickyAssignor分配策略如同其名称中的"sticky"一 样,让分配策略具备的“黏性”,尽可能地让前后两次分配相同,进而减少系统资源的损耗及其他异常情况的发生。

再来看下,消费者订阅信息不相同的情况,拿RoundRobinAssignor中的实例来说。

假设消费组内有三个消费者,主题1/2/3分别有1/2/3个分区,C0订阅主题1,C1订阅主题1和2,C2订阅主题1/2/3,RoundRobinAssignor分区结果将会如下图所示。

而采用StickyAssignor时,分区分配结果如下:

若此时C0下线,RoundRobinAssignor重分配的结果如下:

而StickyAssignor重分配结果如下:

综上:

StickyAssignor分配策略的优点就是可以使分区重分配具备 “黏性”,减少不必要的分区移动(一个分区剥离之前的消费者 ,转而分配给另一个新的消费者)。

Kafka中的消息消费是基于拉模式。

Kafka每次拉取一组消息,每条消息的格式如下:

在每次拉取方法时,它返回的是还没有被消费过的消息集。要实现这个功能,就需要知道上次消费时的消费位移,消费者在消费完消息后要进行消费位移提交动作,且消费位移要进行持久化,消费位移保存在__consumer_offsets主题中。

当前拉取消息的最大offset为x,消费者消费完成提交位移的是offset其实为x+1,表示下次拉取消息的起始位置。

自动提交

默认采用自动提交,默认每隔5s会将拉取到的每个分区的最大的消息位移进行提交。真正的提交动作是在拉取消息的逻辑完成,每次拉取消息前会判断是否可以进行位移提交,如果可以则提交上一次的位移。这里会有两个问题,如下图所示。

重复消费:当前拉取消息x+2,x+7,当前消费到X+5,在提交消费位移前,消费者宕机;新的消费者还是会从X+2开始拉取消息, 因此导致重复消费。

消息丢失:当前拉取消息x+2,x+7,当前消费X+5,到下次拉取的时候,消费位移已经提交为X+8,若此时消费者宕机,新的消费者会从X+8处开始消费,导致X+5 ~ X+7的消息没有被消费,导致消息的丢失。

手动提交

同步提交和异步提交。

同步提交默认提交本次拉取分区消息的最大偏移量,如本次拉取X+2,X+7的消息,同步提交默认提交X+8的位置;当时同步提交也可指定提交的偏移量,比如消费一条提交1次,因为提交本身为同步 *** 作,所以会耗费一定的性能。

同步提交也会导致重复消费的问题,如消费完成后,提交前消费者宕机。

异步提交消费者线程不会被阻塞,使性能得到增强,但异步提交失败重试可能会导致提交位移被覆盖的问题,如本次异步提交offset=X失败,下次异步提交offset=X+y成功;此时前一次提交重试再次提交offset=x,如果业务上没有重试校验,会导致offset被覆盖,最终导致重复消费。

当新的消费组建立、消费者订阅新的主题或之前提交的位移信息因为过期被删除等,此时查不到纪录的消费位移。Kafka可配置从最新或从最早处开始消费。

Kafka还支持从特定位移处开始消费,可以实现回溯消费,Kafka内部提供了Seek()方法,来重置消费位移。

当需要回溯指定时间后的消息时,可先用offsetsForTimes方法查到指定时间后第一条消息的位移,然后再用seek重置位移。

分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以既方便又安全地删除或添加消费者。

Kfaka提供了组协调器(GroupCoordinator)和消费者协调器(ConsumerCoordinator),前者负责管理消费组,后者负责与前者交互,两者最重要的职责就是负责再均衡的 *** 作。

举例说明,当消费者加入消费组时,消费者、消费组和组协调器之间一般会经历以下几个阶段。

第一阶段(FIND COORDINATOR)

消费者需要确定它所属的消费组对应的GroupCoordinator所在的broker并创建与该broker 相互通信的网络连接。

消费者会向集群中的某个节点发送FindCoordinatorRequest请求来查找对应的组协调器。

Kafka根据请求中的coordinator_key(也就是groupld )的哈希值计算__consumer_offsets中的分区编号,如下图所示。找到对应的分区之后,在寻找此分区leader副本所在的broker节点,该节点即为当前消费组所在的组协调器节点。

消费组最终的分区分配方案及组内消费者所提交的消费位移信息都会发送给该broker节点。该broker节点既扮演GroupCoordinato的角色又扮演保存分区分配方案和组内消费者位移的角色,这样可以省去很多不必要的中间轮转所带来的开销。

第二阶段(JOIN GROUP)

在成功找到消费组所对应的GroupCoordinator之后就进入加入消费组的阶段,在此阶段的 消费者会向GroupCoordinator发送JoinGroupRequest请求,并处理响应。

组协调器内部主要做了以下几件事:

选举消费组的leader

如果当前组内没有leader,那么第一个加入消费组的则为leader。如果leader挂掉,组协调器会从内部维护的HashMap(消费者信息,key为member_id)中选择第一个key作为新的leader。

选举分区分配策略

前面说的每个消费者可能会上报多个分区分配策略,选举过程如下:

第三阶段(SYNC GROUP)

leader消费者根据在第二阶段中得到的分区分配策略来实施分区分配,然后将分配结果同步到组协调器。各个消费者会向组协调器发送SyncGroupRequest请求来同步分配方案。

请求结构如图,leader发送的请求才会有group_assignment。

其中包含了各个消费者对应的具体分配方案,member_id表示消费者的唯一标识,而 member_assignment是与消费者对应的分配方案,如图

消费者收到具体的分区分配方案后,会开启心跳任务,定期向组协调器发送心跳请求确定彼此在线。

第四阶段(HEARTBEAT)

在正式消费之前,消费者还需要确定拉取消息的起始位置。假设之前已经将最后的消费位移提交成功,那么消费者会请求获取上次提交的消费位移并从此处继续消费。

心跳线程是一个独立的线程,可以在轮询消息的空档发送。如果消费者停发送心跳的时间足够长,组协调器会认为这个消费者已经死亡,则触发一次再均衡行为。

1 日志存储格式

最新版本的kafka日志是以批为单位进行日志存储的,所谓的批指的是kafka会将多条日志压缩到同一个batch中,然后以batch为单位进行后续的诸如索引的创建和消息的查询等工作。

对于每个批次而言,其默认大小为4KB,并且保存了整个批次的起始位移和时间戳等元数据信息,而对于每条消息而言,其位移和时间戳等元数据存储的则是相对于整个批次的元数据的增量,通过这种方式,kafka能够减少每条消息中数据占用的磁盘空间。

这里我们首先展示一下每个批次的数据格式:

图中消息批次的每个元数据都有固定的长度大小,而只有最后面的消息个数的是可变的。如下是batch中主要的属性的含义:

起始位移:占用8字节,其存储了当前batch中第一条消息的位移;

长度:占用了4个字节,其存储了整个batch所占用的磁盘空间的大小,通过该字段,kafka在进行消息遍历的时候,可以快速的跳跃到下一个batch进行数据读取;

分区leader版本号:记录了当前消息所在分区的leader的服务器版本,主要用于进行一些数据版本的校验和转换工作;

CRC:对当前整个batch的数据的CRC校验码,主要是用于对数据进行差错校验的;

属性:占用2个字节,这个字段的最低3位记录了当前batch中消息的压缩方式,现在主要有GZIP、LZ4和Snappy三种。第4位记录了时间戳的类型,第5和6位记录了新版本引入的事务类型和控制类型;

最大位移增量:最新的消息的位移相对于第一条消息的唯一增量;

起始时间戳:占用8个字节,记录了batch中第一条消息的时间戳;

最大时间戳:占用8个字节,记录了batch中最新的一条消息的时间戳;

PID、producer epoch和起始序列号:这三个参数主要是为了实现事务和幂等性而使用的,其中PID和producer epoch用于确定当前producer是否合法,而起始序列号则主要用于进行消息的幂等校验;

消息个数:占用4个字节,记录当前batch中所有消息的个数;

通过上面的介绍可以看出,每个batch的头部数据中占用的字节数固定为61个字节,可变部分主要是与具体的消息有关,下面我们来看一下batch中每条消息的格式:

这里的消息的头部数据就与batch的大不相同,可以看到,其大部分数据的长度都是可变的。既然是可变的,这里我们需要强调两个问题:

1、对于数字的存储,kafka采用的是Zig-Zag的存储方式,也即负数并不会使用补码的方式进行编码,而是将其转换为对应的正整数,比如-1映射为1、1映射为2、-2映射为3、2映射为4,关系图如下所示:

通过图可以看出,在对数据反编码的时候,我们只需要将对应的整数转换成其原始值即可;

2、在使用Zig-Zag编码方式的时候,每个字节最大为128,而其中一半要存储正数,一半要存储负数,还有一个0,也就是说每个字节能够表示的最大整数为64,此时如果有大于64的数字,kafka就会使用多个字节进行存储。

而这多个字节的表征方式是通过将每个字节的最大位作为保留位来实现的,如果最高位为1,则表示需要与后续字节共同表征目标数字,如果最高位为0,则表示当前位即可表示目标数字。

kafka使用这种编码方式的优点在于,大部分的数据增量都是非常小的数字,因此使用一个字节即可保存,这比直接使用原始类型的数据要节约大概七倍的内存。

对于上面的每条消息的格式,除了消息key和value相关的字段,其还有属性字段和header,属性字段的主要作用是存储当前消息key和value的压缩方式,而header则供给用户进行添加一些动态的属性,从而实现一些定制化的工作。

通过对kafka消息日志的存储格式我们可以看出,其使用batch的方式将一些公共信息进行提取,从而保证其只需要存储一份,虽然看起来每个batch的头部信息比较多,但其平摊到每条消息上之后使用的字节更少了;

在消息层面,kafka使用了数据增量的方式和Zig-Zag编码方式对数据进行的压缩,从而极大地减少其占用的字节数。总体而言,这种存储方式极大的减少了kafka占用的磁盘空间大小。

2 日志存储方式

在使用kafka时,消息都是推送到某个topic中,然后由producer计算当前消息会发送到哪个partition,在partition中,kafka会为每条消息设置一个偏移量,也就是说,如果要唯一定位一条消息,使用<topic, partition, offset>三元组即可。

基于kafka的架构模式,其会将各个分区平均分配到每个broker中,也就是说每个broker会被分配用来提供一个或多个分区的日志存储服务。在broker服务器上,kafka的日志也是按照partition进行存储的,其会在指定的日志存储目录中为每个topic的partition分别创建一个目录,目录中存储的就是这些分区的日志数据,而目录的名称则会以<topic-patition>的格式进行创建。如下是kafka日志的存储目录示意图:

这里我们需要注意的是,图中对于分区日志的存储,当前broker只会存储分配给其的分区的日志,比如图中的connect-status就只有分区1和分区4的目录,而没有分区2和分区3的目录,这是因为这些分区被分配在了集群的其他节点上。

在每个分区日志目录中,存在有三种类型的日志文件,即后缀分别为log、index和timeindex的文件。其中log文件就是真正存储消息日志的文件,index文件存储的是消息的位移索引数据,而timeindex文件则存储的是时间索引数据。

如下图所示为一个分区的消息日志数据:

从图中可以看出,每种类型的日志文件都是分段的,这里关于分段的规则主要有如下几点需要说明:

在为日志进行分段时,每个文件的文件名都是以该段中第一条消息的位移的偏移量来命名的;

kafka会在每个log文件的大小达到1G的时候关闭该文件,而新开一个文件进行数据的写入。可以看到,图中除了最新的log文件外,其余的log文件的大小都是1G;

对于index文件和timeindex文件,在每个log文件进行分段之后,这两个索引文件也会进行分段,这也就是它们的文件名与log文件一致的原因;

kafka日志的留存时间默认是7天,也就是说,kafka会删除存储时间超过7天的日志,但是对于某些文件,其部分日志存储时间未达到7天,部分达到了7天,此时还是会保留该文件,直至其所有的消息都超过留存时间;

3 索引文件

kafka主要有两种类型的索引文件:位移索引文件和时间戳索引文件。位移索引文件中存储的是消息的位移与该位移所对应的消息的物理地址;时间戳索引文件中则存储的是消息的时间戳与该消息的位移值。

也就是说,如果需要通过时间戳查询消息记录,那么其首先会通过时间戳索引文件查询该时间戳对应的位移值,然后通过位移值在位移索引文件中查询消息具体的物理地址。关于位移索引文件,这里有两点需要说明:

1、由于kafka消息都是以batch的形式进行存储,因而索引文件中索引元素的最小单元是batch,也就是说,通过位移索引文件能够定位到消息所在的batch,而没法定位到消息在batch中的具体位置,查找消息的时候,还需要进一步对batch进行遍历;

2、位移索引文件中记录的位移值并不是消息真正的位移值,而是该位移相对于该位移索引文件的起始位移的偏移量,通过这种方式能够极大的减小位移索引文件的大小。

如下图所示为一个位移索引文件的格式示意图:

如下则是具体的位移索引文件的示例:

关于时间戳索引文件,由于时间戳的变化比位移的变化幅度要大一些,其即使采用了增量的方式存储时间戳索引,但也没法有效地使用Zig-Zag方式对数据进行编码,因而时间戳索引文件是直接存储的消息的时间戳数据,

但是对于时间戳索引文件中存储的位移数据,由于其变化幅度不大,因而其还是使用相对位移的方式进行的存储,并且这种存储方式也可以直接映射到位移索引文件中而无需进行计算。如下图所示为时间戳索引文件的格式图:

如下则是时间戳索引文件的一个存储示例:

可以看到,如果需要通过时间戳来定位消息,就需要首先在时间戳索引文件中定位到具体的位移,然后通过位移在位移索引文件中定位到消息的具体物理地址。

4 日志压缩

所谓的日志压缩功能,其主要是针对这样的场景的,比如对某个用户的邮箱数据进行修改,其总共修改了三次,修改过程如下:

email=john@gmailcom

email=john@yahoocomcn

email=john@163com

在这么进行修改之后,很明显,我们主要需要关心的是最后一次修改,因为其是最终数据记录,但是如果我们按顺序处理上述消息,则需要处理三次消息。

kafka的日志压缩就是为了解决这个问题而存在的,对于使用相同key的消息,其会只保留最新的一条消息的记录,而中间过程的消息都会被kafka cleaner给清理掉。

但是需要注意的是,kafka并不会清理当前处于活跃状态的日志文件中的消息记录。所谓当前处于活跃状态的日志文件,也就是当前正在写入数据的日志文件。如下图所示为一个kafka进行日志压缩的示例图:

图中K1的数据有V1、V3和V4,经过压缩之后只有V4保留了下来,K2的数据则有V2、V6和V10,压缩之后也只有V10保留了下来;同理可推断其他的Key的数据。

另外需要注意的是,kafka开启日志压缩使用的是logcleanuppolicy,其默认值为delete,也即我们正常使用的策略,如果将其设置为compaction,则开启了日志压缩策略,但是需要注意的是,开启了日志压缩策略并不代表kafka会清理历史数据,只有将logcleanerenable设置为true才会定时清理历史数据。

在kafka中,其本身也在使用日志压缩策略,主要体现在kafka消息的偏移量存储。在旧版本中,kafka将每个consumer分组当前消费的偏移量信息保存在zookeeper中,但是由于zookeeper是一款分布式协调工具,其对于读 *** 作具有非常高的性能,但是对于写 *** 作性能比较低,而consumer的位移提交动作是非常频繁的,这势必会导致zookeeper成为kafka消息消费的瓶颈。

因而在最新版本中,kafka将分组消费的位移数据存储在了一个特殊的topic中,即__consumer_offsets,由于每个分组group的位移信息都会提交到该topic,因而kafka默认为其设置了非常多的分区,也即50个分区。

另外,consumer在提交位移时,使用的key为groupId+topic+partition,而值则为当前提交的位移,也就是说,对于每一个分组所消费的topic的partition,其都只会保留最新的位移。如果consumer需要读取位移,那么只需要按照上述格式组装key,然后在该topic中读取最新的消息数据即可。

kafka消息在分区中是按序一条一条存储的,假如分区中有10条消息,位移就是0-9,

consumer消费了5条消息,那么offset就是5,指向了下一条要消费的记录,consumer

需要向kafka汇报自己的位移数据,因为consumer是能够消费多个分区的,所以offset

的粒度是分区,consumer需要为分配给他的各分区分别提交offset信息。

从用户的角度来说,位移提交分为自动提交和手动提交,在consumer的角度来说,位移

分为同步提交和异步提交。

kafka内部有个topic叫 ‘__consumer_offsets’,offset提交就是往这个topic发送一条消息,

消息格式是key value形式,key是由 groupId、主题名、分区号组成,消息体是位移值

及用户自定义数据和时间戳等。还有2种特殊的格式,一种是用于保存 Consumer Group 

信息的消息,用于注册group,另一种是 用于删除 Group 过期位移和删除 Group 的消息。

当kafka集群种第一台consumer启动时,便会创建__consumer_offsets主题,默认50个

分区和3个副本。

当提交方式是自动提交时,就算是当前consumer的offset已经不更新,kafka还是会自动

定期的往__consumer_offsets发送位移消息,所以得对位移主题的消息做定期删除,

假如对于同一个key有2条A和B,A早于B发送,那么A就是属于过期消息。

compact有点类似jvm gc的标记-整理,把过期消息删掉,把剩下的消息排列在一起

Kafka 提供了专门的后台线程定期地巡检待Compact 的主题,看看是否存在满足条件的

可删除数据,这个线程叫Log Cleaner,当我们发现位移主题日志过多的时候,可以

检查一下是否是这个线程挂了导致的

enableautocommit 默认即是true,

autocommitintervalms 默认是5秒,表示kafka每5秒自动提交一次位移信息。

自动提交会有消息重复消费的问题,因为他是间隔时间提交一次,假如在间隔期间,

发生了Rebalance ,在Rebalance 之后所有的消费者必须从当前最新的offset开始

继续消费,那么上一次自动提交到Rebalance 的这段时间消费的数据的位移并没有

提交,所以会重复消费,即时我们通过减少 autocommitintervalms 的值来提高提交频率,

那也仅仅是缩小了重复消费的时间窗口,所以我们看看能不能通过手动提交来避免重复消费。

commitSync()是consumer的同步api,手动提交的好处自然是我们可以控制提交的时机

和频率,由于是同步api,是会阻塞至broker返回结果才会结束这个阻塞状态,对于系统

而言,自然不想发生这种不是由于资源的限制导致的阻塞。

commitAsync()是consumer的异步api,commitAsync()不会阻塞,因此不会影响consumer的

tps,但是他的问题在于他无法重试,因为是异步提交,当因为网络或者系统资源阻塞

导致提交失败,那么他重试的时候,在这期间,consumer可能已经消费好多条消息

并且提交了,所以此时的重试提交的offset已然不是最新值了并没有意义,我们可以通过

异步和同步提交相结合,我们使用同步提交来规避因为网络问题或者broker端的gc导致的

这种瞬时的提交失败,进而通过重试机制而提交offset,使用异步提交来规避提交时的阻塞

前面的commitSync()和commitAsync(),都是consumer poll消息,把这些消息消费完,

再提交最新的offset,如果poll的消息很多呢?消费时间较长,假如中间系统宕机,岂不是

得从头再来一遍,所以kafka提供分段提交的api

commitSync(Map<TopicPartition, OffsetAndMetadata>) 

 commitAsync(Map<TopicPartition, OffsetAndMetadata>)

假设我们poll了一秒钟的数据,有5000条,我们可以通过计数器,累计到100条,

便通过分段提交api向kafka提交一次offset。

当你编写kafka Producer时, 会生成KeyedMessage对象。KeyedMessage<K, V> keyedMessage = new KeyedMessage<>(topicName, key, message)

这里的key值可以为空,在这种情况下, kafka会将这个消息发送到哪个分区上呢?依据Kafka官方的文档, 默认的分区类会随机挑选一个分区:

The third property "partitionerclass" defines what class to use to determine which Partition in the Topic the message is to be sent to This is optional, but for any non-trivial implementation you are going to want to implement a partitioning scheme More about the implementation of this class later If you include a value for the key but haven't defined a partitionerclass Kafka will use the default partitioner If the key is null, then the Producer will assign the message to a random Partition

但是这句话相当的误导人。

(一)消费者和消费者组

1、消费者:订阅并消费kafka消息,从属于消费者组

2、消费者组:一个群组里的消费者订阅的是同一个主题,每个消费者接受主题一部分分区的消息。

注:同一个消费者可以消费不同的partition,但是同一个partition不能被不同消费者消费。

(二)消费者群组和分区再均衡

1、再均衡:分区的消费所有权从一个消费者转移到另一个消费者称为再均衡,为消费者组带来了高可用性和可伸缩性。

注:分区何时重新分配:加入消费者或者消费者崩溃等

2、如何判断消费者崩溃:消费者通过向群组协调器(某broker,不同群组可以有不同的群组协调器)发送心跳(一般在拉取消息或者提交偏移量的时候)表示自己仍旧存活,如果长时间不发送心跳则协调器认为期死亡并进行再均衡。

注:在0101版本中,心跳行为不再和获取消息和提交偏移量绑定在一起,有一个单独的心跳线程。

3、分配分区:消费者加入消费者组是,会像群组协调器发送请求,第一个加入的成为“群主”。群主从协调器那里获取成员列表,并负责给每一个消费者分配分区。完毕之后,将分配结果发送给协调器,协调器再将消息发送给所有的消费者,每个消费者只能看到自己的分配信息。只有群主知道所有的消费信息。

(三)参数配置

1、bootstrapserver:host:port

2、keyserializer:键序列化器

3、valueserializer:值序列化器

注:以上为必须设置的

4、groupid:从属的消费者组

5、fetchminbytes:消费者从服务器获取记录的最小字节数。

6、fetchmaxwaitms:消费者等待消费消息的最大时间

7、maxpartitionfetchbytes:服务器从每个分区返回给消费者的最大字节数(需要比broker的设置maxmessagesize属性配置大,否则有些消息无法消费)

8、sessiontimeoutms:指定该消费者在被认为死亡之前可以与服务器断开连接的时间,默认3秒

9、heartbeatintervalms:制定了poll方法向协调器发送心跳的频率。

注:一般9是8的三分之一

10、autooffsetreset:消费者在读取一个没有偏移量分区或者无效偏移量分区的情况下如何处理(latest:从最新记录开始读取,earliest:从最早的记录开始读取)

11、enableauthcommit:消费者是否自动提交偏移量,默认为true

12、autocommitintervalms:自动提交偏移量的时间间隔

13、partitionassignmentstrategy:分区分配给消费者的策略:

(1)range:会把主题若干个连续分区分配给消费者

(2)roundRobin:会把主题的所有分区逐个分配给消费者

14、clientid:任意字符串,broker用来区分客户端发来的消息

15:maxpollrecords:控制poll方法返回的最大记录数

16:receivebufferbytes/sendbufferbytes:tcp缓冲池读写大小

(四)订阅主题

consumersubscribe(list)

(五)轮训(消费者API的核心)

1、轮训作用: 只要消费者订阅了主题,轮训就会处理所有的细节(群组协调、分区再均衡、发送心跳、获取数据)

(1)获取数据

(2)第一次执行poll时,负责查找协调器,然后加入群组,接受分配的分区

(3)心跳的发送

(4)再均衡也是在轮训期间进行的

2、方法:poll(),消费者缓冲区没有数据时会发生阻塞,可以传一个阻塞时间,避免无限等待。0表示立即返回。

3、关闭:close(),网络连接随之关闭,立即触发再均衡。

4、线程安全:无法让一个线程运行多个消费者,也无法让多个线程公用一个消费者。

(六)提交和偏移量

1、提交:更新分区当前位置的 *** 作

2、如何提交:消费者往一个特殊主题(_consumer_offset)发送消息,消息中包含每个分区中的偏移量。

3、偏移量:分区数据被消费的位置。

4、偏移量作用:当发生再均衡时,消费者可能会分配到不一样的分区,为了继续工作,消费者需要读取到每个分区最后一次提交的偏移量,然后从偏移量的地方继续处理。

5、提交偏移量的方式

(1)自动提交:经过一个时间间隔,提交上一次poll方法返回的偏移量。每次轮训都会检测是否应该提交偏移量。缺陷:可能导致重复消费

(2)手动提交:commitSysn()提交迁移量,最简单也最可靠,提交由poll方法返回的最新偏移量。缺点:忘了提交可能会丢数据,再均衡可能会重复消费

(3)异步提交:同步提交在提交过程中必须阻塞

(4)同步异步提交组合

(5)提交特定的偏移量

(七)再均衡监听器

(八)从特定偏移量读取数据(seek)

1、从分区开始:seekToBegining

2、从分区结束:seekToEnd

3、ConsumerRebalanceListener和seek结合使用

(九)如何退出

1、前言:wakeup方法是唯一安全退出轮训的方法,从poll方法中退出并抛出wakeupException异常。如果没有碰上轮训,则在下一次poll调用时抛出。

2、退出轮训

(1)另一个线程调用consumerwakeup方法

(2)如果循环在主线程里可以在ShutdownHook里面调用该方法

3、退出之前调用close方法:告知协调器自己要离开,出发再均衡,不必等到超时。

(十)独立消费者(assign为自己分配分区)

1、根据分区申请内存块

2、构造batch对象加入Dequeue

3、将key、value记录到buffer中

4、sender线程发送批次

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member This means that the time between subsequent calls to poll() was longer than the configured sessiontimeoutms, which typically implies that the poll loop is spending too much time message processing You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with maxpollrecords

造成的问题:假如consumerproperties配置中maxpollrecords=40 (一次最多拉取40条数据) sessiontimeoutms=30000 (会话时间)

假设kafka此时一次拉取了40条数据,但在处理第31条的时候抛出了如上的异常,就会导致,本次offset不会提交,完了这40条消息都会在接下来的某刻被再次消费,这其中就包含了其实已经消费了的30条数据

原因:the poll loop is spending too much time message processing, the time between subsequent calls to poll() was longer than the configured sessiontimeoutms,好吧其实是一个意思!

意思就是说poll下来数据后,处理这些数据的时间比 sessiontimeoutms配置的时间要长,从而导致the group has already rebalanced

解决办法是最后一句话:You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with maxpollrecords

即要不增大 sessiontimeoutms,要不减小maxpollrecords ,至于具体配置为多少,得看你处理一条消息花费多长时间 x,需要满足 x乘以maxpollrecords < sessiontimeoutms

另一种解决思路:

解决此类重复消费的方式:将能够唯一标识消息的信息存储在其他系统,比如redis,什么能够唯一标识消息呢?就是consumergroup+topic+partition+offset,更准确的应该是consumergroup+" "+topic+" "+partition+"_"+offset组成的key,value可以是处理时间存放在redis中,每次处理kafka消息时先从redis中根据key获取value,如果value为空,则表明该消息是第一次被消费,不为空则表示时已经被消费过的消息;

参考: >

在kafka中,每个日志分段文件都对应了两个索引文件—— 偏移量索引文件和时间戳索引文件 (还有其它的诸如事务日志索引文件就不细表了),主要用来 提高查找消息的效率

偏移量索引文件用来建立消息偏移量(offset)到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置;时间戳索引文件则根据指定的时间戳(timestamp)来查找对应的偏移量信息。

Kafka 中的索引文件以稀疏索引(sparse index)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。

每当写入一定量 (由 broker 端参数 logindexintervalbytes 指定,默认值为 4096,即 4KB) 的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项,增大或减小 logindexintervalbytes 的值,对应地可以缩小或增加索引项的密度。

稀疏索引通过 MappedByteBuffer 将索引文件映射到内存中,以加快索引的查询速度。

偏移量索引文件中的偏移量是单调递增的,查询指定偏移量时,使用二分查找法来快速定位偏移量的位置,如果指定的偏移量不在索引文件中,则会返回小于指定偏移量的最大偏移量。

时间戳索引文件中的时间戳也保持严格的单调递增,查询指定时间戳时,也根据二分查找法来查找不大于该时间戳的最大偏移量,至于要找到对应的物理文件位置还需要根据偏移量索引文件来进行再次定位。

稀疏索引的方式是在磁盘空间、内存空间、查找时间等多方面之间的一个折中。

以偏移量索引文件来做具体分析。偏移量索引项的格式如下图所示。

每个索引项占用 8 个字节,分为两个部分:

(1) relativeOffset : 相对偏移量,表示消息相对于 baseOffset 的偏移量,占用 4 个字节,当前索引文件的文件名即为 baseOffset 的值。

(2) position : 物理地址,也就是消息在日志分段文件中对应的物理位置,占用 4 个字节。

消息的偏移量(offset)占用 8 个字节,也可以称为绝对偏移量。

索引项中没有直接使用绝对偏移量而改为只占用 4 个字节的相对偏移量(relativeOffset = offset - baseOffset),这样可以减小索引文件占用的空间。

举个例子,一个日志分段的 baseOffset 为 32,那么其文件名就是 00000000000000000032log,offset 为 35 的消息在索引文件中的 relativeOffset 的值为 35-32=3。

如果我们要查找偏移量为 23 的消息,那么应该怎么做呢首先通过二分法在偏移量索引文件中找到不大于 23 的最大索引项,即[22, 656],然后从日志分段文件中的物理位置 656 开始顺序查找偏移量为 23 的消息。

以上是最简单的一种情况。参考上图,如果要查找偏移量为 268 的消息,那么应该怎么办呢

首先肯定是定位到baseOffset为251的日志分段,然后计算相对偏移量relativeOffset = 268 - 251 = 17,之后再在对应的索引文件中找到不大于 17 的索引项,最后根据索引项中的 position 定位到具体的日志分段文件位置开始查找目标消息。

那么又是如何查找 baseOffset 为 251 的日志分段的呢

这里并不是顺序查找,而是用了跳跃表的结构。

Kafka 的每个日志对象中使用了 ConcurrentSkipListMap 来保存各个日志分段,每个日志分段的 baseOffset 作为 key,这样可以根据指定偏移量来快速定位到消息所在的日志分段。

在Kafka中要定位一条消息,那么首先根据 offset 从 ConcurrentSkipListMap 中来查找到到对应(baseOffset)日志分段的索引文件,然后读取偏移量索引索引文件,之后使用二分法在偏移量索引文件中找到不大于 offset - baseOffset z的最大索引项,接着再读取日志分段文件并且从日志分段文件中顺序查找relativeOffset对应的消息。

Kafka中通过offset查询消息内容的整个流程我们可以简化成下图:

Kafka中消息的offset可以类比成InnoDB中的主键,前者是通过offset检索出整条Record的数据,后者是通过主键检索出整条Record的数据。

InnoDB中通过主键查询数据内容的整个流程建议简化成下图(下半部分)。

Kafka中通过时间戳索引文件去检索消息的方式可以类比于InnoDB中的辅助索引的检索方式:

前者是通过timestamp去找offset,后者是通过索引去找主键,后面两者的过程就和上面的陈述相同。

Kafka中当有新的索引文件建立的时候ConcurrentSkipListMap才会更新,而不是每次有数据写入时就会更新,这块的维护量基本可以忽略

B+树中数据有插入、更新、删除的时候都需要更新索引,还会引来“页分裂”等相对耗时的 *** 作。Kafka中的索引文件也是顺序追加文件的 *** 作,和B+树比起来工作量要小很多。

说到底还是应用场景不同所决定的。MySQL中需要频繁地执行CRUD的 *** 作,CRUD是MySQL的主要工作内容,而为了支撑这个 *** 作需要使用维护量大很多的B+树去支撑。

Kafka中的消息一般都是顺序写入磁盘,再到从磁盘顺序读出(不深入探讨page cache等),他的主要工作内容就是:写入+读取,很少有检索查询的 *** 作

换句话说, 检索查询只是Kafka的一个辅助功能,不需要为了这个功能而去花费特别太的代价去维护一个高level的索引。

前面也说过,Kafka中的这种方式是在磁盘空间、内存空间、查找时间等多方面之间的一个折中。

以上就是关于Kafka数据消费全部的内容,包括:Kafka数据消费、kafka低版本的怎么用java查询给定broker上所有的日志目录信息、kafka consumer offset机制等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/web/9704935.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-01
下一篇 2023-05-01

发表评论

登录后才能评论

评论列表(0条)

保存