消费者负责从订阅的主题上拉取消息,消费组是逻辑概念。一个消费者只属于一个消费组,一个消费组包一个或多个消费者。当消息发布到主题后,会被投递到每个消费组,但每个消费组中只有一个消费者能消费给消息。
消费者如何知道该消费哪个分区?当消费组内消费者个数发生变化时,分区分配是如何变化的呢?
按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配, 以保证分区尽可能均匀地分配给所有的消费者。对于 每一个主题 该策略会将消费组内所有的消费者按照名称的字典序排序然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。
假设n=分区数/消费者数量,m=分区数%消费者数量,那么前m个消费者每个分配n+1分区,后面的每个消费者分配n个分区。
如图所示主题中共有7个分区,此时消费组内只有一个消费者C0,C0订阅7个分区。
随着消费组内消费者不断加入,分区逐渐从C0分配到C1~C6,当最后一个消费者C7加入后,此时总共有8个消费者但是只有7个分区,因此C7由于分配不到分区而无法消费任何消息。
消费者并非越多越好,消费者数量应小于等于分区数量,否则会造成资源的浪费
缺点:
当一个消费组订阅两个分别包含四个分区的主题时,分区分配结果如下,比较均匀。
但当两个主题各有3个分区时,则会出现如下分区不均的问题。类似情况扩大的话,可能出现消费者过载问题。
将消费组内所有消费者及消费者订阅的所有主题的分区按照字典序排序,然后通过轮询方式将分区依次分配给每个消费者。如果消费组内消费者的订阅信息都是相同的,那么分区分配会比较均匀。如一个消费组两个消费者,分别订阅两个都有3的分区的主题,如图。
但是当消费组内消费者的订阅信息不同时,则会出现分配不均问题。如图,假设消费组内有三个消费者,主题1/2/3分别有1/2/3个分区,C0订阅主题1,C1订阅主题1和2,C2订阅主题1/2/3,分区结果将会如下图所示。
后来引入的策略,主要目的:
假设三个消费者,订阅了4个主题,每个主题有两个分区,那么初始分区分配结果如下:
乍一看,跟RoundRobin分配策略结果相同,但此时如果C1下线,那么消费组会执行再均衡 *** 作,重新分配消息分区。如果是RoundRobin策略,分配结果如下:
而如果是Sticky分配策略,则结果如下:
StickyAssignor保留了上一次对C0和C2的分配结果,将C1的分区分配给C0和C2使其均衡。
如果发生分区重分配,那么对于同一个分区而 ,有可能之前的消费者和新指派的消费者不是同一个,之前消费者进行到一半的处理还要在新指派的消费者中再次复现一遍,造成重复消费。StickyAssignor分配策略如同其名称中的"sticky"一 样,让分配策略具备的“黏性”,尽可能地让前后两次分配相同,进而减少系统资源的损耗及其他异常情况的发生。
再来看下,消费者订阅信息不相同的情况,拿RoundRobinAssignor中的实例来说。
假设消费组内有三个消费者,主题1/2/3分别有1/2/3个分区,C0订阅主题1,C1订阅主题1和2,C2订阅主题1/2/3,RoundRobinAssignor分区结果将会如下图所示。
而采用StickyAssignor时,分区分配结果如下:
若此时C0下线,RoundRobinAssignor重分配的结果如下:
而StickyAssignor重分配结果如下:
综上:
StickyAssignor分配策略的优点就是可以使分区重分配具备 “黏性”,减少不必要的分区移动(一个分区剥离之前的消费者 ,转而分配给另一个新的消费者)。
Kafka中的消息消费是基于拉模式。
Kafka每次拉取一组消息,每条消息的格式如下:
在每次拉取方法时,它返回的是还没有被消费过的消息集。要实现这个功能,就需要知道上次消费时的消费位移,消费者在消费完消息后要进行消费位移提交动作,且消费位移要进行持久化,消费位移保存在__consumer_offsets主题中。
当前拉取消息的最大offset为x,消费者消费完成提交位移的是offset其实为x+1,表示下次拉取消息的起始位置。
自动提交
默认采用自动提交,默认每隔5s会将拉取到的每个分区的最大的消息位移进行提交。真正的提交动作是在拉取消息的逻辑完成,每次拉取消息前会判断是否可以进行位移提交,如果可以则提交上一次的位移。这里会有两个问题,如下图所示。
重复消费:当前拉取消息x+2,x+7,当前消费到X+5,在提交消费位移前,消费者宕机;新的消费者还是会从X+2开始拉取消息, 因此导致重复消费。
消息丢失:当前拉取消息x+2,x+7,当前消费X+5,到下次拉取的时候,消费位移已经提交为X+8,若此时消费者宕机,新的消费者会从X+8处开始消费,导致X+5 ~ X+7的消息没有被消费,导致消息的丢失。
手动提交
同步提交和异步提交。
同步提交默认提交本次拉取分区消息的最大偏移量,如本次拉取X+2,X+7的消息,同步提交默认提交X+8的位置;当时同步提交也可指定提交的偏移量,比如消费一条提交1次,因为提交本身为同步 *** 作,所以会耗费一定的性能。
同步提交也会导致重复消费的问题,如消费完成后,提交前消费者宕机。
异步提交消费者线程不会被阻塞,使性能得到增强,但异步提交失败重试可能会导致提交位移被覆盖的问题,如本次异步提交offset=X失败,下次异步提交offset=X+y成功;此时前一次提交重试再次提交offset=x,如果业务上没有重试校验,会导致offset被覆盖,最终导致重复消费。
当新的消费组建立、消费者订阅新的主题或之前提交的位移信息因为过期被删除等,此时查不到纪录的消费位移。Kafka可配置从最新或从最早处开始消费。
Kafka还支持从特定位移处开始消费,可以实现回溯消费,Kafka内部提供了Seek()方法,来重置消费位移。
当需要回溯指定时间后的消息时,可先用offsetsForTimes方法查到指定时间后第一条消息的位移,然后再用seek重置位移。
分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以既方便又安全地删除或添加消费者。
Kfaka提供了组协调器(GroupCoordinator)和消费者协调器(ConsumerCoordinator),前者负责管理消费组,后者负责与前者交互,两者最重要的职责就是负责再均衡的 *** 作。
举例说明,当消费者加入消费组时,消费者、消费组和组协调器之间一般会经历以下几个阶段。
第一阶段(FIND COORDINATOR)
消费者需要确定它所属的消费组对应的GroupCoordinator所在的broker并创建与该broker 相互通信的网络连接。
消费者会向集群中的某个节点发送FindCoordinatorRequest请求来查找对应的组协调器。
Kafka根据请求中的coordinator_key(也就是groupld )的哈希值计算__consumer_offsets中的分区编号,如下图所示。找到对应的分区之后,在寻找此分区leader副本所在的broker节点,该节点即为当前消费组所在的组协调器节点。
消费组最终的分区分配方案及组内消费者所提交的消费位移信息都会发送给该broker节点。该broker节点既扮演GroupCoordinato的角色又扮演保存分区分配方案和组内消费者位移的角色,这样可以省去很多不必要的中间轮转所带来的开销。
第二阶段(JOIN GROUP)
在成功找到消费组所对应的GroupCoordinator之后就进入加入消费组的阶段,在此阶段的 消费者会向GroupCoordinator发送JoinGroupRequest请求,并处理响应。
组协调器内部主要做了以下几件事:
选举消费组的leader
如果当前组内没有leader,那么第一个加入消费组的则为leader。如果leader挂掉,组协调器会从内部维护的HashMap(消费者信息,key为member_id)中选择第一个key作为新的leader。
选举分区分配策略
前面说的每个消费者可能会上报多个分区分配策略,选举过程如下:
第三阶段(SYNC GROUP)
leader消费者根据在第二阶段中得到的分区分配策略来实施分区分配,然后将分配结果同步到组协调器。各个消费者会向组协调器发送SyncGroupRequest请求来同步分配方案。
请求结构如图,leader发送的请求才会有group_assignment。
其中包含了各个消费者对应的具体分配方案,member_id表示消费者的唯一标识,而 member_assignment是与消费者对应的分配方案,如图
消费者收到具体的分区分配方案后,会开启心跳任务,定期向组协调器发送心跳请求确定彼此在线。
第四阶段(HEARTBEAT)
在正式消费之前,消费者还需要确定拉取消息的起始位置。假设之前已经将最后的消费位移提交成功,那么消费者会请求获取上次提交的消费位移并从此处继续消费。
心跳线程是一个独立的线程,可以在轮询消息的空档发送。如果消费者停发送心跳的时间足够长,组协调器会认为这个消费者已经死亡,则触发一次再均衡行为。
Kafka最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大特性就是可以实时处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低时延的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务水平和最关键指标之一。
基本工作流程如上图所示,其中:
我们看上面的架构图中,producer就是生产者,是数据的入口。注意看图中的红色箭头,Producer在写入数据的时候 永远的找leader ,不会直接将数据写入follower!那leader怎么找呢?写入的流程又是什么样的呢?我们看下图:
发送的流程就在图中已经说明了,就不单独在文字列出来了!需要注意的一点是,消息写入leader后,follower是主动的去leader进行同步的!producer采用push模式将数据发布到broker,每条消息追加到分区中,顺序写入磁盘,所以保证 同一分区 内的数据是有序的!写入示意图如下:
上面说到数据会写入到不同的分区,那kafka为什么要做分区呢?相信大家应该也能猜到,分区的主要目的是:
熟悉负载均衡的朋友应该知道,当我们向某个服务器发送请求的时候,服务端可能会对请求做一个负载,将流量分发到不同的服务器,那在kafka中,如果某个topic有多个partition,producer又怎么知道该将数据发往哪个partition呢?kafka中有几个原则:
保证消息不丢失是一个消息队列中间件的基本保证,那producer在向kafka写入消息的时候,怎么保证消息不丢失呢?其实上面的写入流程图中有描述出来,那就是通过ACK应答机制!在生产者向队列写入数据的时候可以设置参数来确定是否确认kafka接收到数据,这个参数可设置的值为 0 、 1 、 all 。
最后要注意的是,如果往不存在的topic写数据,能不能写入成功呢?kafka会自动创建topic,分区和副本的数量根据默认配置都是1。
Producer将数据写入kafka后,集群就需要对数据进行保存了!kafka将数据保存在磁盘,可能在我们的一般的认知里,写入磁盘是比较耗时的 *** 作,不适合这种高并发的组件。Kafka初始会单独开辟一块磁盘空间,顺序写入数据(效率比随机写入高)。
前面说过了每个topic都可以分为一个或多个partition,如果你觉得topic比较抽象,那partition就是比较具体的东西了!Partition在服务器上的表现形式就是一个一个的文件夹,每个partition的文件夹下面会有多组segment文件,每组segment文件又包含index文件、log文件、timeindex文件(早期版本中没有)三个文件, log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。
上面说到log文件就实际是存储message的地方,我们在producer往kafka写入的也是一条一条的message,那存储在log中的message是什么样子的呢?消息主要包含消息体、消息大小、offset、压缩类型……等等!我们重点需要知道的是下面三个:
无论消息是否被消费,kafka都会保存所有的消息。那对于旧数据有什么删除策略呢?
需要注意的是,kafka读取特定消息的时间复杂度是O(1),所以这里删除过期的文件并不会提高kafka的性能!
消息存储在log文件后,消费者就可以进行消费了。在讲消息队列通信的两种模式的时候讲到过点对点模式和发布订阅模式。Kafka采用的是点对点的模式,消费者主动的去kafka集群拉取消息,与producer相同的是,消费者在拉取消息的时候也是 找leader 去拉取。
多个消费者可以组成一个消费者组(consumer group),每个消费者组都有一个组id!同一个消费组者的消费者可以消费同一topic下不同分区的数据,但是不会组内多个消费者消费同一分区的数据!!!如下图:
图示是消费者组内的消费者小于partition数量的情况,所以会出现某个消费者消费多个partition数据的情况,消费的速度也就不及只处理一个partition的消费者的处理速度!如果是消费者组的消费者多于partition的数量,那会不会出现多个消费者消费同一个partition的数据呢?上面已经提到过不会出现这种情况!多出来的消费者不消费任何partition的数据。所以在实际的应用中,建议 消费者组的consumer的数量与partition的数量一致 !
kafka使用文件存储消息(append only log),这就直接决定kafka在性能上严重依赖文件系统的本身特性且无论任何OS下,对文件系统本身的优化是非常艰难的文件缓存/直接内存映射等是常用的手段因为kafka是对日志文件进行append *** 作,因此磁盘检索的开支是较小的;同时为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数对于kafka而言,较高性能的磁盘,将会带来更加直接的性能提升
除磁盘IO之外,我们还需要考虑网络IO,这直接关系到kafka的吞吐量问题kafka并没有提供太多高超的技巧;对于producer端,可以将消息buffer起来,当消息的条数达到一定阀值时,批量发送给broker;对于consumer端也是一样,批量fetch多条消息不过消息量的大小可以通过配置文件来指定对于kafka broker端,似乎有个sendfile系统调用可以潜在的提升网络IO的性能:将文件的数据映射到系统内存中,socket直接读取相应的内存区域即可,而无需进程再次copy和交换(这里涉及到"磁盘IO数据"/"内核内存"/"进程内存"/"网络缓冲区",多者之间的数据copy)
其实对于producer/consumer/broker三者而言,CPU的开支应该都不大,因此启用消息压缩机制是一个良好的策略;压缩需要消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑可以将任何在网络上传输的消息都经过压缩kafka支持gzip/snappy等多种压缩方式
kafka集群中的任何一个broker,都可以向producer提供metadata信息,这些metadata中包含"集群中存活的servers列表"/"partitions leader列表"等信息(请参看zookeeper中的节点信息) 当producer获取到metadata信息之后, producer将会和Topic下所有partition leader保持socket连接;消息由producer直接通过socket发送到broker,中间不会经过任何"路由层"
异步发送,将多条消息暂且在客户端buffer起来,并将他们批量发送到broker;小数据IO太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率;不过这也有一定的隐患,比如当producer失效时,那些尚未发送的消息将会丢失。
其他JMS实现,消息消费的位置是有prodiver保留,以便避免重复发送消息或者将没有消费成功的消息重发等,同时还要控制消息的状态这就要求JMS broker需要太多额外的工作在kafka中,partition中的消息只有一个consumer在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见kafka broker端是相当轻量级的当消息被consumer接收之后,consumer可以在本地保存最后消息的offset,并间歇性的向zookeeper注册offset由此可见,consumer客户端也很轻量级。
kafka中consumer负责维护消息的消费记录,而broker则不关心这些,这种设计不仅提高了consumer端的灵活性,也适度的减轻了broker端设计的复杂度;这是和众多JMS prodiver的区别此外,kafka中消息ACK的设计也和JMS有很大不同,kafka中的消息是批量(通常以消息的条数或者chunk的尺寸为单位)发送给consumer,当消息消费成功后,向zookeeper提交消息的offset,而不会向broker交付ACK或许你已经意识到,这种"宽松"的设计,将会有"丢失"消息/"消息重发"的危险
Kafka提供3种消息传输一致性语义:最多1次,最少1次,恰好1次。
最少1次:可能会重传数据,有可能出现数据被重复处理的情况;
最多1次:可能会出现数据丢失情况;
恰好1次:并不是指真正只传输1次,只不过有一个机制。确保不会出现“数据被重复处理”和“数据丢失”的情况。
at most once: 消费者fetch消息,然后保存offset,然后处理消息;当client保存offset之后,但是在消息处理过程中consumer进程失效(crash),导致部分消息未能继续处理那么此后可能其他consumer会接管,但是因为offset已经提前保存,那么新的consumer将不能fetch到offset之前的消息(尽管它们尚没有被处理),这就是"at most once"
at least once: 消费者fetch消息,然后处理消息,然后保存offset如果消息处理成功之后,但是在保存offset阶段zookeeper异常或者consumer失效,导致保存offset *** 作未能执行成功,这就导致接下来再次fetch时可能获得上次已经处理过的消息,这就是"at least once"
"Kafka Cluster"到消费者的场景中可以采取以下方案来得到“恰好1次”的一致性语义:
最少1次+消费者的输出中额外增加已处理消息最大编号:由于已处理消息最大编号的存在,不会出现重复处理消息的情况。
kafka中,replication策略是基于partition,而不是topic;kafka将每个partition数据复制到多个server上,任何一个partition有一个leader和多个follower(可以没有);备份的个数可以通过broker配置文件来设定。leader处理所有的read-write请求,follower需要和leader保持同步Follower就像一个"consumer",消费消息并保存在本地日志中;leader负责跟踪所有的follower状态,如果follower"落后"太多或者失效,leader将会把它从replicas同步列表中删除当所有的follower都将一条消息保存成功,此消息才被认为是"committed",那么此时consumer才能消费它,这种同步策略,就要求follower和leader之间必须具有良好的网络环境即使只有一个replicas实例存活,仍然可以保证消息的正常发送和接收,只要zookeeper集群存活即可
选择follower时需要兼顾一个问题,就是新leader server上所已经承载的partition leader的个数,如果一个server上有过多的partition leader,意味着此server将承受着更多的IO压力在选举新leader,需要考虑到"负载均衡",partition leader较少的broker将会更有可能成为新的leader
每个log entry格式为"4个字节的数字N表示消息的长度" + "N个字节的消息内容";每个日志都有一个offset来唯一的标记一条消息,offset的值为8个字节的数字,表示此消息在此partition中所处的起始位置每个partition在物理存储层面,有多个log file组成(称为segment)segment file的命名为"最小offset"kafka例如"00000000000kafka";其中"最小offset"表示此segment中起始消息的offset
获取消息时,需要指定offset和最大chunk尺寸,offset用来表示消息的起始位置,chunk size用来表示最大获取消息的总长度(间接的表示消息的条数)根据offset,可以找到此消息所在segment文件,然后根据segment的最小offset取差值,得到它在file中的相对位置,直接读取输出即可
kafka使用zookeeper来存储一些meta信息,并使用了zookeeper watch机制来发现meta信息的变更并作出相应的动作(比如consumer失效,触发负载均衡等)
Broker node registry: 当一个kafka broker启动后,首先会向zookeeper注册自己的节点信息(临时znode),同时当broker和zookeeper断开连接时,此znode也会被删除
Broker Topic Registry: 当一个broker启动时,会向zookeeper注册自己持有的topic和partitions信息,仍然是一个临时znode
Consumer and Consumer group: 每个consumer客户端被创建时,会向zookeeper注册自己的信息;此作用主要是为了"负载均衡"一个group中的多个consumer可以交错的消费一个topic的所有partitions;简而言之,保证此topic的所有partitions都能被此group所消费,且消费时为了性能考虑,让partition相对均衡的分散到每个consumer上
Consumer id Registry: 每个consumer都有一个唯一的ID(host:uuid,可以通过配置文件指定,也可以由系统生成),此id用来标记消费者信息
Consumer offset Tracking: 用来跟踪每个consumer目前所消费的partition中最大的offset此znode为持久节点,可以看出offset跟group_id有关,以表明当group中一个消费者失效,其他consumer可以继续消费
Partition Owner registry: 用来标记partition正在被哪个consumer消费临时znode。此节点表达了"一个partition"只能被group下一个consumer消费,同时当group下某个consumer失效,那么将会触发负载均衡(即:让partitions在多个consumer间均衡消费,接管那些"游离"的partitions)
当consumer启动时,所触发的 *** 作:
A) 首先进行"Consumer id Registry";
B) 然后在"Consumer id Registry"节点下注册一个watch用来监听当前group中其他consumer的"leave"和"join";只要此znode path下节点列表变更,都会触发此group下consumer的负载均衡(比如一个consumer失效,那么其他consumer接管partitions)
C) 在"Broker id registry"节点下,注册一个watch用来监听broker的存活情况;如果broker列表变更,将会触发所有的groups下的consumer重新balance
总结:
Kafka的核心是日志文件,日志文件在集群中的同步是分布式数据系统最基础的要素。
如果leaders永远不会down的话我们就不需要followers了!一旦leader down掉了,需要在followers中选择一个新的leader但是followers本身有可能延时太久或者crash,所以必须选择高质量的follower作为leader必须保证,一旦一个消息被提交了,但是leader down掉了,新选出的leader必须可以提供这条消息。大部分的分布式系统采用了多数投票法则选择新的leader,对于多数投票法则,就是根据所有副本节点的状况动态的选择最适合的作为leaderKafka并不是使用这种方法。
Kafka动态维护了一个同步状态的副本的集合(a set of in-sync replicas),简称ISR,在这个集合中的节点都是和leader保持高度一致的,任何一条消息必须被这个集合中的每个节点读取并追加到日志中了,才回通知外部这个消息已经被提交了。因此这个集合中的任何一个节点随时都可以被选为leaderISR在ZooKeeper中维护。ISR中有f+1个节点,就可以允许在f个节点down掉的情况下不会丢失消息并正常提供服。ISR的成员是动态的,如果一个节点被淘汰了,当它重新达到“同步中”的状态时,他可以重新加入ISR这种leader的选择方式是非常快速的,适合kafka的应用场景。
一个邪恶的想法:如果所有节点都down掉了怎么办?Kafka对于数据不会丢失的保证,是基于至少一个节点是存活的,一旦所有节点都down了,这个就不能保证了。
实际应用中,当所有的副本都down掉时,必须及时作出反应。可以有以下两种选择:
这是一个在可用性和连续性之间的权衡。如果等待ISR中的节点恢复,一旦ISR中的节点起不起来或者数据都是了,那集群就永远恢复不了了。如果等待ISR意外的节点恢复,这个节点的数据就会被作为线上数据,有可能和真实的数据有所出入,因为有些数据它可能还没同步到。Kafka目前选择了第二种策略,在未来的版本中将使这个策略的选择可配置,可以根据场景灵活的选择。
这种窘境不只Kafka会遇到,几乎所有的分布式数据系统都会遇到。
以上仅仅以一个topic一个分区为例子进行了讨论,但实际上一个Kafka将会管理成千上万的topic分区Kafka尽量的使所有分区均匀的分布到集群所有的节点上而不是集中在某些节点上,另外主从关系也尽量均衡这样每个几点都会担任一定比例的分区的leader
优化leader的选择过程也是很重要的,它决定了系统发生故障时的空窗期有多久。Kafka选择一个节点作为“controller”,当发现有节点down掉的时候它负责在游泳分区的所有节点中选择新的leader,这使得Kafka可以批量的高效的管理所有分区节点的主从关系。如果controller down掉了,活着的节点中的一个会备切换为新的controller
对于某个分区来说,保存正分区的"broker"为该分区的"leader",保存备份分区的"broker"为该分区的"follower"。备份分区会完全复制正分区的消息,包括消息的编号等附加属性值。为了保持正分区和备份分区的内容一致,Kafka采取的方案是在保存备份分区的"broker"上开启一个消费者进程进行消费,从而使得正分区的内容与备份分区的内容保持一致。一般情况下,一个分区有一个“正分区”和零到多个“备份分区”。可以配置“正分区+备份分区”的总数量,关于这个配置,不同主题可以有不同的配置值。注意,生产者,消费者只与保存正分区的"leader"进行通信。
Kafka允许topic的分区拥有若干副本,这个数量是可以配置的,你可以为每个topic配置副本的数量。Kafka会自动在每个副本上备份数据,所以当一个节点down掉时数据依然是可用的。
Kafka的副本功能不是必须的,你可以配置只有一个副本,这样其实就相当于只有一份数据。
创建副本的单位是topic的分区,每个分区都有一个leader和零或多个followers所有的读写 *** 作都由leader处理,一般分区的数量都比broker的数量多的多,各分区的leader均匀的分布在brokers中。所有的followers都复制leader的日志,日志中的消息和顺序都和leader中的一致。followers向普通的consumer那样从leader那里拉取消息并保存在自己的日志文件中。
许多分布式的消息系统自动的处理失败的请求,它们对一个节点是否着(alive)”有着清晰的定义。Kafka判断一个节点是否活着有两个条件:
符合以上条件的节点准确的说应该是“同步中的(in sync)”,而不是模糊的说是“活着的”或是“失败的”。Leader会追踪所有“同步中”的节点,一旦一个down掉了,或是卡住了,或是延时太久,leader就会把它移除。至于延时多久算是“太久”,是由参数replicalagmaxmessages决定的,怎样算是卡住了,怎是由参数replicalagtimemaxms决定的。
只有当消息被所有的副本加入到日志中时,才算是“committed”,只有committed的消息才会发送给consumer,这样就不用担心一旦leader down掉了消息会丢失。Producer也可以选择是否等待消息被提交的通知,这个是由参数acks决定的。
Kafka保证只要有一个“同步中”的节点,“committed”的消息就不会丢失。
我先写了一个kafka的生产者程序,然后写了一个kafka的消费者程序,一切正常。
生产者程序生成5条数据,消费者能够读取到5条数据。然后我将kafka的消费者程序替换成使用spark的读取kafka的程序,重复多次发现每次都是读取1号分区的数据,而其余的0号和2号2个分区的数据都没有读到。请哪位大侠出手帮助一下。
我使用了三台虚拟机slave122,slave123,slave124作为kafka集群和zk集群;然后生产者和消费者程序以及spark消费者程序都是在myeclipse上完成。
软件版本为:kafka_211-01010,spark-streaming-kafka-0-10_211-210,zookeeper-349
spark消费者程序主要代码如下:
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParamsput("bootstrapservers", "slave124:9092,slave122:9092,slave123:9092");
kafkaParamsput("keydeserializer", "orgapachekafkacommonserializationStringDeserializer");
kafkaParamsput("valuedeserializer","orgapachekafkacommonserializationStringDeserializer");
kafkaParamsput("groupid", "ssgroup");
kafkaParamsput("autooffsetreset", "earliest"); //update mykafka,"earliest" from the beginning,"latest" from the rear of topic
kafkaParamsput("enableautocommit", "true"); //messages successfully polled by the consumer may not yet have resulted in a Spark output operation, resulting in undefined semantics
kafkaParamsput("autocommitintervalms", "5000");
// Create a local StreamingContext with two working thread and batch interval of 2 second
SparkConf conf = new SparkConf();
//conf被set后,返回新的SparkConf实例,所以多个set必须连续,不能拆开。
confsetMaster("local[1]")setAppName("streaming word count")setJars(new String[]{"D:\\Workspaces\\MyEclipse 2015\\MyFirstHadoop\\bin\\MyFirstHadoopjar"});;
try{
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durationsseconds(5));
Collection<String> topics = new HashSet<>(ArraysasList("order"));
JavaInputDStream<ConsumerRecord<String, String>> oJInputStream = KafkaUtilscreateDirectStream(
jssc,
LocationStrategiesPreferConsistent(),
ConsumerStrategies<String, String>Subscribe(topics, kafkaParams)
);
JavaPairDStream<String, String> pairs = oJInputStreammapToPair(
new PairFunction<ConsumerRecord<String, String>, String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
try {
BufferedWriter oBWriter = new BufferedWriter(new FileWriter("D:\\Workspaces\\MyEclipse 2015\\MyFirstHadoop\\bin\\mysparkstream\\MyFirstHadoopout",true));
String strLog = "^^^^^^^^^^^ " + SystemcurrentTimeMillis() / 1000 + " mapToPair:topic:" + recordtopic() + ",key:" + recordkey() + ",value:" + recordvalue() + ",partition id:" + recordpartition() + ",offset:" + recordoffset() + "\n";
Systemoutprintln(strLog);
oBWriterwrite(strLog);
oBWriterclose();
} catch (IOException e) {
// TODO Auto-generated catch block
eprintStackTrace();
}
return new Tuple2<>(recordkey(), recordvalue());
}
}
);
pairsprint();
jsscstart(); //start here in fact
jsscawaitTermination();
jsscclose();
}catch(Exception e){
// TODO Auto-generated catch block
Systemoutprintln("Exception:throw one exception");
eprintStackTrace();
}
在kafka中,每个日志分段文件都对应了两个索引文件—— 偏移量索引文件和时间戳索引文件 (还有其它的诸如事务日志索引文件就不细表了),主要用来 提高查找消息的效率 。
偏移量索引文件用来建立消息偏移量(offset)到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置;时间戳索引文件则根据指定的时间戳(timestamp)来查找对应的偏移量信息。
Kafka 中的索引文件以稀疏索引(sparse index)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。
每当写入一定量 (由 broker 端参数 logindexintervalbytes 指定,默认值为 4096,即 4KB) 的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳索引项,增大或减小 logindexintervalbytes 的值,对应地可以缩小或增加索引项的密度。
稀疏索引通过 MappedByteBuffer 将索引文件映射到内存中,以加快索引的查询速度。
偏移量索引文件中的偏移量是单调递增的,查询指定偏移量时,使用二分查找法来快速定位偏移量的位置,如果指定的偏移量不在索引文件中,则会返回小于指定偏移量的最大偏移量。
时间戳索引文件中的时间戳也保持严格的单调递增,查询指定时间戳时,也根据二分查找法来查找不大于该时间戳的最大偏移量,至于要找到对应的物理文件位置还需要根据偏移量索引文件来进行再次定位。
稀疏索引的方式是在磁盘空间、内存空间、查找时间等多方面之间的一个折中。
以偏移量索引文件来做具体分析。偏移量索引项的格式如下图所示。
每个索引项占用 8 个字节,分为两个部分:
(1) relativeOffset : 相对偏移量,表示消息相对于 baseOffset 的偏移量,占用 4 个字节,当前索引文件的文件名即为 baseOffset 的值。
(2) position : 物理地址,也就是消息在日志分段文件中对应的物理位置,占用 4 个字节。
消息的偏移量(offset)占用 8 个字节,也可以称为绝对偏移量。
索引项中没有直接使用绝对偏移量而改为只占用 4 个字节的相对偏移量(relativeOffset = offset - baseOffset),这样可以减小索引文件占用的空间。
举个例子,一个日志分段的 baseOffset 为 32,那么其文件名就是 00000000000000000032log,offset 为 35 的消息在索引文件中的 relativeOffset 的值为 35-32=3。
如果我们要查找偏移量为 23 的消息,那么应该怎么做呢首先通过二分法在偏移量索引文件中找到不大于 23 的最大索引项,即[22, 656],然后从日志分段文件中的物理位置 656 开始顺序查找偏移量为 23 的消息。
以上是最简单的一种情况。参考上图,如果要查找偏移量为 268 的消息,那么应该怎么办呢
首先肯定是定位到baseOffset为251的日志分段,然后计算相对偏移量relativeOffset = 268 - 251 = 17,之后再在对应的索引文件中找到不大于 17 的索引项,最后根据索引项中的 position 定位到具体的日志分段文件位置开始查找目标消息。
那么又是如何查找 baseOffset 为 251 的日志分段的呢
这里并不是顺序查找,而是用了跳跃表的结构。
Kafka 的每个日志对象中使用了 ConcurrentSkipListMap 来保存各个日志分段,每个日志分段的 baseOffset 作为 key,这样可以根据指定偏移量来快速定位到消息所在的日志分段。
在Kafka中要定位一条消息,那么首先根据 offset 从 ConcurrentSkipListMap 中来查找到到对应(baseOffset)日志分段的索引文件,然后读取偏移量索引索引文件,之后使用二分法在偏移量索引文件中找到不大于 offset - baseOffset z的最大索引项,接着再读取日志分段文件并且从日志分段文件中顺序查找relativeOffset对应的消息。
Kafka中通过offset查询消息内容的整个流程我们可以简化成下图:
Kafka中消息的offset可以类比成InnoDB中的主键,前者是通过offset检索出整条Record的数据,后者是通过主键检索出整条Record的数据。
InnoDB中通过主键查询数据内容的整个流程建议简化成下图(下半部分)。
Kafka中通过时间戳索引文件去检索消息的方式可以类比于InnoDB中的辅助索引的检索方式:
前者是通过timestamp去找offset,后者是通过索引去找主键,后面两者的过程就和上面的陈述相同。
Kafka中当有新的索引文件建立的时候ConcurrentSkipListMap才会更新,而不是每次有数据写入时就会更新,这块的维护量基本可以忽略
B+树中数据有插入、更新、删除的时候都需要更新索引,还会引来“页分裂”等相对耗时的 *** 作。Kafka中的索引文件也是顺序追加文件的 *** 作,和B+树比起来工作量要小很多。
说到底还是应用场景不同所决定的。MySQL中需要频繁地执行CRUD的 *** 作,CRUD是MySQL的主要工作内容,而为了支撑这个 *** 作需要使用维护量大很多的B+树去支撑。
Kafka中的消息一般都是顺序写入磁盘,再到从磁盘顺序读出(不深入探讨page cache等),他的主要工作内容就是:写入+读取,很少有检索查询的 *** 作
换句话说, 检索查询只是Kafka的一个辅助功能,不需要为了这个功能而去花费特别太的代价去维护一个高level的索引。
前面也说过,Kafka中的这种方式是在磁盘空间、内存空间、查找时间等多方面之间的一个折中。
有时需要简单的用shell去检查一个topic下边某一个partition的某个offset的消息。
之前一直用 kafkatoolsConsoleConsumer, 用的心酸。
后来发现一个稍强大的工具。
>
Checkpointing 是 Flink 故障恢复的内部机制。一个 checkpoint 就是 Flink应用程序产生的状态的一个副本。如果 Flink 任务发生故障,它会从 checkpoint 中载入之前的状态来恢复任务,就好像故障没有发生一样。
Checkpoints 是 Flink 容错的基础,并且确保了 Flink 流式应用在失败时的完整性。Checkpoints 可以通过 Flink 设置定时触发。
Flink Kafka consumer 使用 Flink 的 checkpoint 机制来存储 Kafka 每个分区的位点到 state。当 Flink 执行 checkpoint 时,Kafka 的每个分区的位点都被存储到 checkpoint 指定的 filesystem 中。Flink 的 checkpoint 机制确保了所有任务算子的状态是一致的,也就是说这些状态具有相同的数据输入。当所有的任务算子成功存储他们自己的状态后,代表一次 checkpoint 的完成。因此,当任务从故障中恢复时,Flink 保证了exactly-once。
下面将一步一步的演示 Flink 是如何通过 checkpoint 来管理 Kafka 的 offset 的。
下面的例子从两个分区的 Kafka topic 中读取数据,每个分区的数据是 “A”, “B”, “C”, ”D”, “E”。假设每个分区都是从 0 开始读取。
假设 Flink Kafka consumer 从分区 0 开始读取数据 “A”,那么此时第一个 consumer 的位点从 0 变成 1。如下图所示。
此时数据 “A” 到达 Flink Job 中的 Map Task。两个 consumer 继续读取数据 (从分区 0 读取数据 “B” ,从分区 1 读取数据 “A”)。 offsets 分别被更新成 2 和 1。与此同时,假设 Flink 从 source 端开始执行 checkpoint。
到这里,Flink Kafka consumer tasks 已经执行了一次快照,offsets也保存到了 state 中(“offset = 2, 1”) 。此时 source tasks 在 数据 “B” 和 “A” 后面,向下游发送一个 checkpoint barrier。checkpoint barriers 是 Flink 用来对齐每个任务算子的 checkpoint,以确保整个 checkpoint 的一致性。分区 1 的数据 “A” 到达 Flink Map Task, 与此同时分区 0 的 consumer 继续读取下一个消息(message “C”)。
Flink Map Task 收到上游两个 source 的 checkpoint barriers 然后开始执行 checkpoint ,把 state 保存到 filesystem。同时,消费者继续从Kafka分区读取更多事件。
假设 Flink Map Task 是 Flink Job 的最末端,那么当它完成 checkpoint 后,就会立马通知 Flink Job Master。当 job 的所有 task 都确认其 state 已经 “checkpointed”,Job Master将完成这次的整个 checkpoint。 之后,checkpoint 可以用于故障恢复。
如果发生故障(例如,worker 挂掉),则所有任务将重启,并且它们的状态将被重置为最近一次的 checkpoint 的状态。 如下图所示。
source 任务将分别从 offset 2 和 1 开始消费。当任务重启完成, 将会正常运行,就像之前没发生故障一样。
PS: 文中提到的 checkpoint 对齐,我说下我的理解,假设一个 Flink Job 有 Source -> Map -> Sink,其中Sink有多个输入。那么当一次checkpoint的 barrier从source发出时,到sink这里,多个输入需要等待其它的输入的barrier已经到达,经过对齐后,sink才会继续处理消息。这里就是exactly-once和at-least-once的区别。
The End
原文链接: How Apache Flink manages Kafka consumer offsets
以上就是关于Kafka数据消费全部的内容,包括:Kafka数据消费、什么是kafka、请教一个关于使用spark 读取kafka只能读取一个分区数据的问题等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)