一次golang sarama kafka内存占用大的排查经历

一次golang sarama kafka内存占用大的排查经历,第1张

环境:

现象:golang微服务内存占用超过1G,查看日志发现大量kafka相关错误日志,继而查看kafka集群,其中一个kafka节点容器挂掉了。
疑问 为什么kafka集群只有一个broker挂了,客户端就大量报错呢

通过beego admin页面获取 mem-1memprof

可以看到调用栈为 withRecover > backgroundMetadataUpdataer > refreshMeaatdata > RefreshMetada > tryRefreshMetadata >

sarama-cluster: NewClient

为什么kafka集群只有一个broker,但是NewClient确失败了?
在kafka容器里查看topic, 发现Replicas和Isr只有一个,找到kafka官方配置说明,自动生成的topic需要配置defaultreplicationfactor这个参数,才会生成3副本。

Kafka 消息是以主题为单位进行归类,各个主题之间是彼此独立的,互不影响。
每个主题⼜可以分为⼀个或多个分区
每个分区各⾃存在⼀个记录消息数据的日志文件。

图中,创建了⼀个 tp_demo_01 主题,其存在6个 Parition,对应的每个Parition下存在⼀个 [Topic-Parition] 命名的消息⽇志⽂件。在理想情况下,数据流量分摊到各个 Parition 中,实现了负载均衡的效果。在分区日志文件中,你会发现很多类型的⽂件,比如: index、timestamp、log、snapshot 等。
其中,文件名⼀致的⽂件集合就称为 LogSement。

当满⾜如下⼏个条件中的其中之⼀,就会触发文件的切分:

偏移量索引文件用于记录消息偏移量与物理地址之间的映射关系。时间戳索引文件则根据时间戳查找对应的偏移量。
文件:
查看⼀个topic分区目录下的内容,发现有log、index和timeindex三个⽂件:

创建主题:

创建消息⽂件:

将⽂本消息⽣产到主题中:

查看存储⽂件:

如果想查看这些文件,可以使⽤kafka提供的shell来完成,几个关键信息如下:
(1)offset是逐渐增加的整数,每个offset对应⼀个消息的偏移量。
(2)position:消息批字节数,用于计算物理地址。
(3)CreateTime:时间戳。
(4)magic:2代表这个消息类型是V2,如果是0则代表是V0类型,1代表V1类型。
(5)compresscodec:None说明没有指定压缩类型,kafka目前提供了4种可选择,0-None、1-GZIP、2-snappy、3-lz4。
(6)crc:对所有字段进行校验后的crc值。

在偏移量索引文件中,索引数据都是顺序记录 offset ,但时间戳索引文件中每个追加的索引时间戳必须大于之前追加的索引项,否则不予追加。在 Kafka 01100 以后,消息元数据中存在若⼲的时间戳信息。如果 broker 端参数logmessagetimestamptype 设置为 LogAppendTIme ,那么时间戳必定能保持单调增⻓。反之如果是CreateTime 则⽆法保证顺序。
注意:timestamp文件中的 offset 与 index ⽂件中的 relativeOffset 不是⼀⼀对应的。因为数据的写⼊是各自追加。
思考:如何查看偏移量为23的消息?
Kafka 中存在⼀个 ConcurrentSkipListMap 来保存在每个日志分段,通过跳跃表方式,定位到在00000000000000000000index ,通过二分法在偏移量索引文件中找到不⼤于 23 的最⼤索引项,即 offset 20 那栏,然后从⽇志分段⽂件中的物理位置为320 开始顺序查找偏移量为 23 的消息。

在偏移量索引文件中,索引数据都是顺序记录 offset ,但时间戳索引⽂件中每个追加的索引时间戳必须大于之前追加的索引项,否则不予追加。在 Kafka 01100 以后,消息信息中存在若⼲的时间戳信息。
如果 broker 端参数logmessagetimestamptype 设置为 LogAppendTIme ,那么时间戳必定能保持单调增长。反之如果是CreateTime 则无法保证顺序。
通过时间戳方式进行查找消息,需要通过查找时间戳索引和偏移量索引两个文件。
时间戳索引索引格式:前⼋个字节表示时间戳,后四个字节表示偏移量。

思考:查找时间戳为 1557554753430 开始的消息?

Kafka 提供两种⽇志清理策略:
⽇志删除:按照⼀定的删除策略,将不满⾜条件的数据进⾏数据删除
⽇志压缩:针对每个消息的 Key 进⾏整合,对于有相同 Key 的不同 Value 值,只保留最后⼀个版本。
Kafka 提供 logcleanuppolicy 参数进⾏相应配置,默认值: delete ,还可以选择 compact 。
主题级别的配置项是 cleanuppolicy 。

基于时间
⽇志删除任务会根据 logretentionhours/logretentionminutes/logretentionms 设定⽇志保留的
时间节点。如果超过该设定值,就需要进⾏删除。默认是 7 天, logretentionms 优先级最⾼。
Kafka 依据⽇志分段中最⼤的时间戳进⾏定位。
⾸先要查询该⽇志分段所对应的时间戳索引⽂件,查找时间戳索引⽂件中最后⼀条索引项,若最后⼀条索引项的时间戳字段值⼤于 0,则取该值,否则取最近修改时间。
为什么不直接选最近修改时间呢?
因为日志文件可以有意⽆意的被修改,并不能真实的反应日志分段的最⼤时间信息。
删除过程

⽇志压缩是Kafka的⼀种机制,可以提供较为细粒度的记录保留,⽽不是基于粗粒度的基于时间的保留。
对于具有相同的Key,⽽数据不同,只保留最后⼀条数据,前⾯的数据在合适的情况下删除。

⽇志压缩特性,就实时计算来说,可以在异常容灾⽅⾯有很好的应⽤途径。⽐如,我们在Spark、Flink中做实时
计算时,需要⻓期在内存⾥⾯维护⼀些数据,这些数据可能是通过聚合了⼀天或者⼀周的⽇志得到的,这些数据⼀旦
由于异常因素(内存、⽹络、磁盘等)崩溃了,从头开始计算需要很⻓的时间。⼀个⽐较有效可⾏的⽅式就是定时将
内存⾥的数据备份到外部存储介质中,当崩溃出现时,再从外部存储介质中恢复并继续计算。
使⽤⽇志压缩来替代这些外部存储有哪些优势及好处呢?这⾥为⼤家列举并总结了⼏点:
Kafka即是数据源⼜是存储⼯具,可以简化技术栈,降低维护成本
使⽤外部存储介质的话,需要将存储的Key记录下来,恢复的时候再使⽤这些Key将数据取回,实现起来有⼀定的⼯程难度和复杂度。使⽤Kafka的⽇志压缩特性,只需要把数据写进Kafka,等异常出现恢复任务时再读
回到内存就可以了
Kafka对于磁盘的读写做了⼤量的优化⼯作,⽐如磁盘顺序读写。相对于外部存储介质没有索引查询等⼯作
量的负担,可以实现⾼性能。同时,Kafka的⽇志压缩机制可以充分利⽤廉价的磁盘,不⽤依赖昂贵的内存
来处理,在性能相似的情况下,实现⾮常⾼的性价⽐(这个观点仅仅针对于异常处理和容灾的场景来说)

主题的 cleanuppolicy 需要设置为compact。
Kafka的后台线程会定时将Topic遍历两次:

Kafka最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大特性就是可以实时处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低时延的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务水平和最关键指标之一。

基本工作流程如上图所示,其中:

我们看上面的架构图中,producer就是生产者,是数据的入口。注意看图中的红色箭头,Producer在写入数据的时候 永远的找leader ,不会直接将数据写入follower!那leader怎么找呢?写入的流程又是什么样的呢?我们看下图:

发送的流程就在图中已经说明了,就不单独在文字列出来了!需要注意的一点是,消息写入leader后,follower是主动的去leader进行同步的!producer采用push模式将数据发布到broker,每条消息追加到分区中,顺序写入磁盘,所以保证 同一分区 内的数据是有序的!写入示意图如下:

上面说到数据会写入到不同的分区,那kafka为什么要做分区呢?相信大家应该也能猜到,分区的主要目的是:

熟悉负载均衡的朋友应该知道,当我们向某个服务器发送请求的时候,服务端可能会对请求做一个负载,将流量分发到不同的服务器,那在kafka中,如果某个topic有多个partition,producer又怎么知道该将数据发往哪个partition呢?kafka中有几个原则:

保证消息不丢失是一个消息队列中间件的基本保证,那producer在向kafka写入消息的时候,怎么保证消息不丢失呢?其实上面的写入流程图中有描述出来,那就是通过ACK应答机制!在生产者向队列写入数据的时候可以设置参数来确定是否确认kafka接收到数据,这个参数可设置的值为 0 1 all

最后要注意的是,如果往不存在的topic写数据,能不能写入成功呢?kafka会自动创建topic,分区和副本的数量根据默认配置都是1。

Producer将数据写入kafka后,集群就需要对数据进行保存了!kafka将数据保存在磁盘,可能在我们的一般的认知里,写入磁盘是比较耗时的 *** 作,不适合这种高并发的组件。Kafka初始会单独开辟一块磁盘空间,顺序写入数据(效率比随机写入高)。

前面说过了每个topic都可以分为一个或多个partition,如果你觉得topic比较抽象,那partition就是比较具体的东西了!Partition在服务器上的表现形式就是一个一个的文件夹,每个partition的文件夹下面会有多组segment文件,每组segment文件又包含index文件、log文件、timeindex文件(早期版本中没有)三个文件, log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。

上面说到log文件就实际是存储message的地方,我们在producer往kafka写入的也是一条一条的message,那存储在log中的message是什么样子的呢?消息主要包含消息体、消息大小、offset、压缩类型……等等!我们重点需要知道的是下面三个:

无论消息是否被消费,kafka都会保存所有的消息。那对于旧数据有什么删除策略呢?

需要注意的是,kafka读取特定消息的时间复杂度是O(1),所以这里删除过期的文件并不会提高kafka的性能!

消息存储在log文件后,消费者就可以进行消费了。在讲消息队列通信的两种模式的时候讲到过点对点模式和发布订阅模式。Kafka采用的是点对点的模式,消费者主动的去kafka集群拉取消息,与producer相同的是,消费者在拉取消息的时候也是 找leader 去拉取。

多个消费者可以组成一个消费者组(consumer group),每个消费者组都有一个组id!同一个消费组者的消费者可以消费同一topic下不同分区的数据,但是不会组内多个消费者消费同一分区的数据!!!如下图:

图示是消费者组内的消费者小于partition数量的情况,所以会出现某个消费者消费多个partition数据的情况,消费的速度也就不及只处理一个partition的消费者的处理速度!如果是消费者组的消费者多于partition的数量,那会不会出现多个消费者消费同一个partition的数据呢?上面已经提到过不会出现这种情况!多出来的消费者不消费任何partition的数据。所以在实际的应用中,建议 消费者组的consumer的数量与partition的数量一致

kafka使用文件存储消息(append only log),这就直接决定kafka在性能上严重依赖文件系统的本身特性且无论任何OS下,对文件系统本身的优化是非常艰难的文件缓存/直接内存映射等是常用的手段因为kafka是对日志文件进行append *** 作,因此磁盘检索的开支是较小的;同时为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数对于kafka而言,较高性能的磁盘,将会带来更加直接的性能提升

除磁盘IO之外,我们还需要考虑网络IO,这直接关系到kafka的吞吐量问题kafka并没有提供太多高超的技巧;对于producer端,可以将消息buffer起来,当消息的条数达到一定阀值时,批量发送给broker;对于consumer端也是一样,批量fetch多条消息不过消息量的大小可以通过配置文件来指定对于kafka broker端,似乎有个sendfile系统调用可以潜在的提升网络IO的性能:将文件的数据映射到系统内存中,socket直接读取相应的内存区域即可,而无需进程再次copy和交换(这里涉及到"磁盘IO数据"/"内核内存"/"进程内存"/"网络缓冲区",多者之间的数据copy)

其实对于producer/consumer/broker三者而言,CPU的开支应该都不大,因此启用消息压缩机制是一个良好的策略;压缩需要消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑可以将任何在网络上传输的消息都经过压缩kafka支持gzip/snappy等多种压缩方式

kafka集群中的任何一个broker,都可以向producer提供metadata信息,这些metadata中包含"集群中存活的servers列表"/"partitions leader列表"等信息(请参看zookeeper中的节点信息) 当producer获取到metadata信息之后, producer将会和Topic下所有partition leader保持socket连接;消息由producer直接通过socket发送到broker,中间不会经过任何"路由层"

异步发送,将多条消息暂且在客户端buffer起来,并将他们批量发送到broker;小数据IO太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率;不过这也有一定的隐患,比如当producer失效时,那些尚未发送的消息将会丢失。

其他JMS实现,消息消费的位置是有prodiver保留,以便避免重复发送消息或者将没有消费成功的消息重发等,同时还要控制消息的状态这就要求JMS broker需要太多额外的工作在kafka中,partition中的消息只有一个consumer在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见kafka broker端是相当轻量级的当消息被consumer接收之后,consumer可以在本地保存最后消息的offset,并间歇性的向zookeeper注册offset由此可见,consumer客户端也很轻量级。

kafka中consumer负责维护消息的消费记录,而broker则不关心这些,这种设计不仅提高了consumer端的灵活性,也适度的减轻了broker端设计的复杂度;这是和众多JMS prodiver的区别此外,kafka中消息ACK的设计也和JMS有很大不同,kafka中的消息是批量(通常以消息的条数或者chunk的尺寸为单位)发送给consumer,当消息消费成功后,向zookeeper提交消息的offset,而不会向broker交付ACK或许你已经意识到,这种"宽松"的设计,将会有"丢失"消息/"消息重发"的危险

Kafka提供3种消息传输一致性语义:最多1次,最少1次,恰好1次。

最少1次:可能会重传数据,有可能出现数据被重复处理的情况;

最多1次:可能会出现数据丢失情况;

恰好1次:并不是指真正只传输1次,只不过有一个机制。确保不会出现“数据被重复处理”和“数据丢失”的情况。

at most once: 消费者fetch消息,然后保存offset,然后处理消息;当client保存offset之后,但是在消息处理过程中consumer进程失效(crash),导致部分消息未能继续处理那么此后可能其他consumer会接管,但是因为offset已经提前保存,那么新的consumer将不能fetch到offset之前的消息(尽管它们尚没有被处理),这就是"at most once"

at least once: 消费者fetch消息,然后处理消息,然后保存offset如果消息处理成功之后,但是在保存offset阶段zookeeper异常或者consumer失效,导致保存offset *** 作未能执行成功,这就导致接下来再次fetch时可能获得上次已经处理过的消息,这就是"at least once"

"Kafka Cluster"到消费者的场景中可以采取以下方案来得到“恰好1次”的一致性语义:

最少1次+消费者的输出中额外增加已处理消息最大编号:由于已处理消息最大编号的存在,不会出现重复处理消息的情况。

kafka中,replication策略是基于partition,而不是topic;kafka将每个partition数据复制到多个server上,任何一个partition有一个leader和多个follower(可以没有);备份的个数可以通过broker配置文件来设定。leader处理所有的read-write请求,follower需要和leader保持同步Follower就像一个"consumer",消费消息并保存在本地日志中;leader负责跟踪所有的follower状态,如果follower"落后"太多或者失效,leader将会把它从replicas同步列表中删除当所有的follower都将一条消息保存成功,此消息才被认为是"committed",那么此时consumer才能消费它,这种同步策略,就要求follower和leader之间必须具有良好的网络环境即使只有一个replicas实例存活,仍然可以保证消息的正常发送和接收,只要zookeeper集群存活即可

选择follower时需要兼顾一个问题,就是新leader server上所已经承载的partition leader的个数,如果一个server上有过多的partition leader,意味着此server将承受着更多的IO压力在选举新leader,需要考虑到"负载均衡",partition leader较少的broker将会更有可能成为新的leader

每个log entry格式为"4个字节的数字N表示消息的长度" + "N个字节的消息内容";每个日志都有一个offset来唯一的标记一条消息,offset的值为8个字节的数字,表示此消息在此partition中所处的起始位置每个partition在物理存储层面,有多个log file组成(称为segment)segment file的命名为"最小offset"kafka例如"00000000000kafka";其中"最小offset"表示此segment中起始消息的offset

获取消息时,需要指定offset和最大chunk尺寸,offset用来表示消息的起始位置,chunk size用来表示最大获取消息的总长度(间接的表示消息的条数)根据offset,可以找到此消息所在segment文件,然后根据segment的最小offset取差值,得到它在file中的相对位置,直接读取输出即可

kafka使用zookeeper来存储一些meta信息,并使用了zookeeper watch机制来发现meta信息的变更并作出相应的动作(比如consumer失效,触发负载均衡等)

Broker node registry: 当一个kafka broker启动后,首先会向zookeeper注册自己的节点信息(临时znode),同时当broker和zookeeper断开连接时,此znode也会被删除

Broker Topic Registry: 当一个broker启动时,会向zookeeper注册自己持有的topic和partitions信息,仍然是一个临时znode

Consumer and Consumer group: 每个consumer客户端被创建时,会向zookeeper注册自己的信息;此作用主要是为了"负载均衡"一个group中的多个consumer可以交错的消费一个topic的所有partitions;简而言之,保证此topic的所有partitions都能被此group所消费,且消费时为了性能考虑,让partition相对均衡的分散到每个consumer上

Consumer id Registry: 每个consumer都有一个唯一的ID(host:uuid,可以通过配置文件指定,也可以由系统生成),此id用来标记消费者信息

Consumer offset Tracking: 用来跟踪每个consumer目前所消费的partition中最大的offset此znode为持久节点,可以看出offset跟group_id有关,以表明当group中一个消费者失效,其他consumer可以继续消费

Partition Owner registry: 用来标记partition正在被哪个consumer消费临时znode。此节点表达了"一个partition"只能被group下一个consumer消费,同时当group下某个consumer失效,那么将会触发负载均衡(即:让partitions在多个consumer间均衡消费,接管那些"游离"的partitions)

当consumer启动时,所触发的 *** 作:

A) 首先进行"Consumer id Registry";

B) 然后在"Consumer id Registry"节点下注册一个watch用来监听当前group中其他consumer的"leave"和"join";只要此znode path下节点列表变更,都会触发此group下consumer的负载均衡(比如一个consumer失效,那么其他consumer接管partitions)

C) 在"Broker id registry"节点下,注册一个watch用来监听broker的存活情况;如果broker列表变更,将会触发所有的groups下的consumer重新balance

总结:

Kafka的核心是日志文件,日志文件在集群中的同步是分布式数据系统最基础的要素。

如果leaders永远不会down的话我们就不需要followers了!一旦leader down掉了,需要在followers中选择一个新的leader但是followers本身有可能延时太久或者crash,所以必须选择高质量的follower作为leader必须保证,一旦一个消息被提交了,但是leader down掉了,新选出的leader必须可以提供这条消息。大部分的分布式系统采用了多数投票法则选择新的leader,对于多数投票法则,就是根据所有副本节点的状况动态的选择最适合的作为leaderKafka并不是使用这种方法。

Kafka动态维护了一个同步状态的副本的集合(a set of in-sync replicas),简称ISR,在这个集合中的节点都是和leader保持高度一致的,任何一条消息必须被这个集合中的每个节点读取并追加到日志中了,才回通知外部这个消息已经被提交了。因此这个集合中的任何一个节点随时都可以被选为leaderISR在ZooKeeper中维护。ISR中有f+1个节点,就可以允许在f个节点down掉的情况下不会丢失消息并正常提供服。ISR的成员是动态的,如果一个节点被淘汰了,当它重新达到“同步中”的状态时,他可以重新加入ISR这种leader的选择方式是非常快速的,适合kafka的应用场景。

一个邪恶的想法:如果所有节点都down掉了怎么办?Kafka对于数据不会丢失的保证,是基于至少一个节点是存活的,一旦所有节点都down了,这个就不能保证了。

实际应用中,当所有的副本都down掉时,必须及时作出反应。可以有以下两种选择:

这是一个在可用性和连续性之间的权衡。如果等待ISR中的节点恢复,一旦ISR中的节点起不起来或者数据都是了,那集群就永远恢复不了了。如果等待ISR意外的节点恢复,这个节点的数据就会被作为线上数据,有可能和真实的数据有所出入,因为有些数据它可能还没同步到。Kafka目前选择了第二种策略,在未来的版本中将使这个策略的选择可配置,可以根据场景灵活的选择。

这种窘境不只Kafka会遇到,几乎所有的分布式数据系统都会遇到。

以上仅仅以一个topic一个分区为例子进行了讨论,但实际上一个Kafka将会管理成千上万的topic分区Kafka尽量的使所有分区均匀的分布到集群所有的节点上而不是集中在某些节点上,另外主从关系也尽量均衡这样每个几点都会担任一定比例的分区的leader

优化leader的选择过程也是很重要的,它决定了系统发生故障时的空窗期有多久。Kafka选择一个节点作为“controller”,当发现有节点down掉的时候它负责在游泳分区的所有节点中选择新的leader,这使得Kafka可以批量的高效的管理所有分区节点的主从关系。如果controller down掉了,活着的节点中的一个会备切换为新的controller

对于某个分区来说,保存正分区的"broker"为该分区的"leader",保存备份分区的"broker"为该分区的"follower"。备份分区会完全复制正分区的消息,包括消息的编号等附加属性值。为了保持正分区和备份分区的内容一致,Kafka采取的方案是在保存备份分区的"broker"上开启一个消费者进程进行消费,从而使得正分区的内容与备份分区的内容保持一致。一般情况下,一个分区有一个“正分区”和零到多个“备份分区”。可以配置“正分区+备份分区”的总数量,关于这个配置,不同主题可以有不同的配置值。注意,生产者,消费者只与保存正分区的"leader"进行通信。

Kafka允许topic的分区拥有若干副本,这个数量是可以配置的,你可以为每个topic配置副本的数量。Kafka会自动在每个副本上备份数据,所以当一个节点down掉时数据依然是可用的。

Kafka的副本功能不是必须的,你可以配置只有一个副本,这样其实就相当于只有一份数据。

创建副本的单位是topic的分区,每个分区都有一个leader和零或多个followers所有的读写 *** 作都由leader处理,一般分区的数量都比broker的数量多的多,各分区的leader均匀的分布在brokers中。所有的followers都复制leader的日志,日志中的消息和顺序都和leader中的一致。followers向普通的consumer那样从leader那里拉取消息并保存在自己的日志文件中。

许多分布式的消息系统自动的处理失败的请求,它们对一个节点是否着(alive)”有着清晰的定义。Kafka判断一个节点是否活着有两个条件:

符合以上条件的节点准确的说应该是“同步中的(in sync)”,而不是模糊的说是“活着的”或是“失败的”。Leader会追踪所有“同步中”的节点,一旦一个down掉了,或是卡住了,或是延时太久,leader就会把它移除。至于延时多久算是“太久”,是由参数replicalagmaxmessages决定的,怎样算是卡住了,怎是由参数replicalagtimemaxms决定的。

只有当消息被所有的副本加入到日志中时,才算是“committed”,只有committed的消息才会发送给consumer,这样就不用担心一旦leader down掉了消息会丢失。Producer也可以选择是否等待消息被提交的通知,这个是由参数acks决定的。

Kafka保证只要有一个“同步中”的节点,“committed”的消息就不会丢失。

Kafka is a distributed, partitioned, replicated commit log service It provides the functionality of a messaging system, but with a unique design(Kafka是一个分布式的、可分区的(partitioned)、基于备份的(replicated)和commit-log存储的服务。它提供了类似于messaging system的特性,但是在设计实现上完全不同)。kafka是一种高吞吐量的分布式发布订阅消息系统,它有如下特性:(1)、通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
(2)、高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。
(3)、支持通过kafka服务器和消费机集群来分区消息。
(4)、支持Hadoop并行数据加载。
一、用Kafka里面自带的脚本进行编译
下载好了Kafka源码,里面自带了一个gradlew的脚本,我们可以利用这个编译Kafka源码:

Producer: 生产者,发送消息的一方。生产者负责创建消息,然后将其发送到 Kafka 服务器上。

Consumer: 消费者,接受消息的一方。消费者连接到 Kafka 服务器上并接收消息,进而进行相应的业务逻辑处理。

Consumer Group: 消费者集合,一个消费者组可以包含一个或者多个消费者。使用多分区 + 多消费者的方式,可以极大提高下游系统处理速度。同一消费者组中的消费者不会重复消费消息,不同的消费者组之间不会互相影响,都能收到全部消息。kafka就是通过消费组来实现P2P模式和广播模式的。

Broker: Kafka 服务器。

Topic: Kafka中的消息维度,一个Topic类似一个queue。生产者将消息发送到特定的Topic,消费者通过Topic进行消费。

Partition: 分区,分区是属于Topic逻辑概念下的一个分区,每个分区只属于一个Topic,一个Topic通常有多个分区,每个分区包含的消息是不同的,分区在存储层面可以看做一个可追加的日志文件,消息在被追加到分区日志文件时,会分配一个特定的便宜了(offset)。

Offset: 分区中的消息的唯一标识,用它来保证消息在分区内的顺序性,offset不跨分区,也就是说,Kafka保证消息在分区内的有序性,不保证消息在Topic下的有序性

Replication: 副本,是Kafka保证数据高可用的方式。同一Partition的数据可以在多个Broker(kafka服务器)上存在多个副本,通常只有主副本提供读写服务,当主副本发生故障,Kafka会在Controller的管理下,选择新的副本作为主副本提供读写服务

Follower: 从副本,相对于主副本,从副本只同步主副本数据,不提供读写服务。

Record: 写入kafka中的消息,每个消息包含了key、value和timestamp。

生产者-消费者是一种设计模式,是在生产者和消费者之间添加一个中间件来达到解耦的目的。

Zookeeper是一个成熟的分布式协调服务,它可以为分布式服务提供分布式配置服务、同步服务和命名注册等能力。任何分布式服务都需要一种协调任务的方法,Kafka使用Zookeeper来进行任务协调,也有一些其他技术具有自己的内置任务协调机制。

Kafka将Broker、Topic和Partitin的元数据存储在Zookeeper上。

Kafka使用Zookeeper完成以下功能:

Controller是从Broker中选举出来的,负责分区 Leader 和 Follower 的管理。当某个分区的 leader 副本发生变化,由Controller负责为该分区选举新的 leader 副本。当某个分区的同步副本集合发生变化时,由Controller负责通知所有Broker更新元数据信息。

Controller的选举依赖Zookeeper,成功竞选为控制器的Broker会在Zookeeper中创建一个/controller临时节点。

选举过程: Broker首先尝试读取/controller节点中的brokerid值,如果brokerid值不为-1,表示已经存在Broker当选Controller,否则尝试创建/controller节点,创建成功后将当前brokerid写入/controller节点,作为 activeControllerId

主要职责: controller选举出来作为整个Broker集群的管理者,管理所有集群信息和元数据。

Kafka 的网络通信模型是基于 NIO 的Reactor 多线程模型来设计的。其中包含一个Acceptor线程用于处理连接,多个 Processor 线程 select 和 read socket 请求,一个Processor 由包含多个 Handler 线程处理请求并响应。

顺序写:

零拷贝:

PageCache: producer 生成消息到 Broker 时,Broker 会使用 pwrite() 系统调用,按偏移量写入数据。写入时,会先写入 page cache。Consumer 消费消息时,Broker会使用sendfile() 系统调用,零拷贝的将数据从 page cache 传输到 Broker 的 Socket Buffer,通过网络传输。因此当Kafka的生产速率和消费速率相差不大时,就能几乎只靠 page cache 的读写完成整个生产-消费过程,磁盘访问非常少

网络模型: Kafka基于NIO,采用Reactor线程模型,实现了自己的RPC通信。 一个Acceptor线程处理新的连接,多个Processor线程select 和 read socket请求,多个Handler线程处理请求并响应(I/O多路复用)。

批量与压缩: Kafka Producer 向 Broker 发送消息不是一条一条发送,而是按批发送。且roducer、Broker 和 Consumer 使用相同的压缩算法,在 producer 向 Broker 写入数据,Consumer 向 Broker 读取数据时甚至可以不用解压缩,最终在 Consumer Poll 到消息时才解压,这样节省了大量的网络和磁盘开销。

分区并发: Kafka 的 Topic 可以分成多个 Partition,每个 Paritition 类似于一个队列,保证数据有序。同一个 Group 下的不同 Consumer 并发消费 Paritition,分区实际上是调优 Kafka 并行度的最小单元,因此,可以说,每增加一个 Paritition 就增加了一个消费并发。

文件结构:

Kafka 消息是以 Topic 为单位进行归类,各个 Topic 之间是彼此独立的,互不影响。每个 Topic 又可以分为一个或多个分区。每个分区各自存在一个记录消息数据的日志文件。

Kafka 每个分区日志在物理上实际按大小被分成多个 Segment。

index 采用稀疏索引,这样每个 index 文件大小有限,Kafka 采用mmap的方式,直接将 index 文件映射到内存,这样对 index 的 *** 作就不需要 *** 作磁盘 IO。

Kafka 充分利用二分法来查找对应 offset 的消息位置

和其他消息队列相比,Kafka的优势在哪里?

队列模型了解吗?Kafka 的消息模型知道吗?

Kafka 如何保证消息不重复消费?

kafka出现消息重复消费的原因:

解决方案:

参考1: Kafka性能篇:为何Kafka这么"快"?

参考2: Kafka原理篇:图解kakfa架构原理

kafka producer将消息发送给broker后,消息日志会被存储在broker的磁盘上,采用顺序写入的方式。顺序写可以加快磁盘访问速度,并且可以将将多个小型的逻辑写合并成一次大型的物理磁盘写入,官方数据显示顺序写比随机写入快6000倍以上。另外, *** 作系统使用内存对磁盘进行缓存即pagecache,pagecache完全由 *** 作系统管理,这也使得写数据变得即简洁也快速。
配置中可以调整过期时间,超过改时间的消息日志将移除,默认值为7天;也可配置文件大小阈值,达到该阈值后,从最旧消息开始删除。配置项为:

从文件到套接字的常见数据传输路径有4步:
1) *** 作系统从磁盘读取数据到内核空间的 pagecache
2)应用程序读取内核空间的数据到用户空间的缓冲区
3)应用程序将数据(用户空间的缓冲区)写回内核空间到套接字缓冲区(内核空间)
4) *** 作系统将数据从套接字缓冲区(内核空间)复制到通过网络发送的 NIC 缓冲区

kafka使用 producer ,broker 和 consumer 都共享的标准化的二进制消息格式,这样数据块不用修改就能在他们之间传递。kafka采用Linux 中系统调用sendfile的方式,直接将数据从 pagecache 转移到 socket 网络连接中。这种零拷贝方式使得kafka数据传输更加高效。

以前面文章中安装的kafka为例: Mac 安装kafka
kafka本地文件存储目录可以在配置文件serverproperties中设置,参数及默认值为:

进入该目录,可以看到kafka保存的cosumer offset和topic消息:

其中__consumer_offsets开头的为消费的offset信息,test1开头的即为之前创建的topic “test1”,该topic有三个分区,分区编号从0开始,分别是test1-0、test1-1、test1-2。
进入test1-0,查看包含文件如下:

可以看到kafka消息按partition存储的,每个partition一个目录。partition下消息分段(segment)存储,默认每段最大1G,通过参数logsegmentbytes可配置。segment包含索引文件index、消息文件log,分别存储消息的索引和内容,以index和log结尾,文件命名为当前segment第一个消息offset。index文件在log每隔一定数据量之间建立索引,可以通过参数indexintervalbytes配置。
通过kafka命令查看00000000000000000000index内容如下:

00000000000000000000log内容如下:

其中索引文件中包含两个字段:(offset,position),分别表示消息offset和该消息在log文件的偏移量。如上图中offset=0的消息对应的position=0;对应的就是00000000000000000000log中的第一条消息:

其中payload为具体的消息内容。
另外里面还有一个以"timeindex"结尾的文件,查看其内容:

该日志文件是kafka01011加入的,其中保存的为:(消息时间戳,offset)。时间戳是该segment最后一个消息对应的时间戳(与log文件中最后一条记录时间戳一致),kafka也支持根据时间来读取消息。

由上可知消息是按partition来存储的,partition可以配置n个副本followers。多个partition和其follower在broker上是怎么分配的呢?
partition和broker都进行了排序,下标从0开始;
假设有k个broker,第i个partition被分配到到 i%k 个broker上;
第i%k个broker即为partition i 的leader,负责这个partition的读写
partition的followers也进行排序,从leader的后续broker开始分配,第i个partition的第j个副本broker为 (j+ i%k)%k。
一个有3个broker的kafka集群,包含3个partition,每个partition副本数为1的topic如下图:

总结:
kafka将消息日志采用顺序写入的方式存放在broker磁盘中;数据传输通过系统调用sendfile零拷贝方式;消息日志分段存放,可配置清除时间或大小阈值;每段包含消息索引、消息内容两个文件,通过索引实现快速查找;按照/topic/partition的目录结构分开存储,且均匀分布到集群各broker上。

参考:
>1、2Rx8表示内存是双面8颗内存颗粒,R是英语Row(排)的意思,诸如此类还有:2Rx16是双面16颗;1Rx8是单面8颗,一般说来,双面的兼容性好些,单面的超频性能好一点。
2、PC3-10600S说明该内存为DDR3,后面的10600S表明内存频率为1333MB。
3、09-10-f2表示批次前面09-10应该是产品设计日期为2009年10月。
4、M471B5673FHO-CH9有点复杂,M是Memory的缩写实际上就是内存;4代表SODIMM表示是笔记本内存(3代表台式机);71这两个字符表示数据位宽及内存模组类型;B表示这是DDR3的,DDR2的是“T”;56两个字符表示内存数据深度,这是构成内存容量大。

# kafka

springkafkabootstrap-servers=101257041:9092,101257035:9092,101257036:9092

#client-id

springkafkaclient-id=group1
生产者参数

# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。

# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。

# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应

springkafkaproduceracks=1

#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。

springkafkaproducerbatch-size=16384

# 发生错误后,消息重发的次数。

springkafkaproducerretries=3

# 设置生产者内存缓冲区的大小。

springkafkaproducerbuffer-memory=33554432

springkafkaproducerkey-serializer=orgapachekafkacommonserializationStringSerializer

springkafkaproducervalue-serializer=orgapachekafkacommonserializationStringSerializer
消费者参数

# 自动提交的时间间隔

springkafkaconsumerauto-commit-interval=1000

# offset的消费位置

springkafkaconsumerauto-offset-reset=latest

# 是否自动提交

springkafkaconsumerenable-auto-commit=false

# 最大拉取间隔时间

springkafkaconsumermaxpollintervalms =600000

# 会话超时时间

springkafkaconsumersessiontimeoutms =10000

springkafkaconsumerkey-deserializer=orgapachekafkacommonserializationStringDeserializer

springkafkaconsumervalue-deserializer=orgapachekafkacommonserializationStringDeserializer

# 消费组名称

springkafkaconsumergroupId=dmsdecision

# 最大拉取条数

springkafkaconsumermax-poll-records=30

# 心跳时间

springkafkaconsumerheartbeat-interval=3000

# kafka springkafkapropertiesparsefileContainerFactory_concurrency监听线程数未设置时,本参数生效

springkafkalistenerconcurrency=30

#MANUAL 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgmentacknowledge()后提交

#MANUAL_IMMEDIATE 手动调用Acknowledgmentacknowledge()后立即提交

#RECORD 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交

#BATCH 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交

#TIME 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交

#COUNT 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交

#COUNT_TIME TIME或COUNT 有一个条件满足时提交

# ack_mode为COUNT/COUNT_TIME 时配置

springkafkalistenerack-mode=manual_immediate

# ack_mode为COUNT/COUNT_TIME 时配置

springkafkalistenerack-count=

# ack_mode为/COUNT_TIME 时配置

springkafkalistenerack-time=

# poll拉取数据超时时间

springkafkalistenerpoll-timeout=


欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/yw/13329730.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-07-16
下一篇 2023-07-16

发表评论

登录后才能评论

评论列表(0条)

保存