Kafka的分区机制

Kafka的分区机制,第1张

Kafka在⼀定数量的服务器上对主题分区进⾏复制。
当集群中的⼀个broker宕机后系统可以⾃动故障转移到其他可⽤的副本上,不会造成数据丢失。
--replication-factor 3 1leader+2follower

Follower分区像普通的Kafka消费者⼀样,消费来⾃Leader分区的消息,并将其持久化到⾃⼰的⽇志中。
允许Follower对⽇志条⽬拉取进⾏批处理。
同步节点定义:

下图中
分区P1的Leader是0,ISR是0和1
分区P2的Leader是2,ISR是1和2
分区P3的Leader是1,ISR是0,1,2。

⽣产者和消费者的请求都由Leader副本来处理。Follower副本只负责消费Leader副本的数据和Leader保持同步。
对于P1,如果0宕机会发⽣什么?
Leader副本和Follower副本之间的关系并不是固定不变的,在Leader所在的broker发⽣故障的时候,就需要进⾏
分区的Leader副本和Follower副本之间的切换,需要选举Leader副本。
如何选举?
如果某个分区所在的服务器除了问题,不可⽤,kafka会从该分区的其他的副本中选择⼀个作为新的Leader。之后
所有的读写就会转移到这个新的Leader上。现在的问题是应当选择哪个作为新的Leader。
只有那些跟Leader保持同步的Follower才应该被选作新的Leader。
Kafka会在Zookeeper上针对每个Topic维护⼀个称为ISR(in-sync replica,已同步的副本)的集合,该集合中是
⼀些分区的副本。
只有当这些副本都跟Leader中的副本同步了之后,kafka才会认为消息已提交,并反馈给消息的⽣产者。
如果这个集合有增减,kafka会更新zookeeper上的记录。
如果某个分区的Leader不可⽤,Kafka就会从ISR集合中选择⼀个副本作为新的Leader。
显然通过ISR,kafka需要的冗余度较低,可以容忍的失败数⽐较⾼。
假设某个topic有N+1个副本,kafka可以容忍N个服务器不可⽤。
为什么不⽤少数服从多数的⽅法
少数服从多数是⼀种⽐较常⻅的⼀致性算发和Leader选举法。
它的含义是只有超过半数的副本同步了,系统才会认为数据已同步;
选择Leader时也是从超过半数的同步的副本中选择。
这种算法需要较⾼的冗余度,跟Kafka⽐起来,浪费资源。
譬如只允许⼀台机器失败,需要有三个副本;⽽如果只容忍两台机器失败,则需要五个副本。
⽽kafka的ISR集合⽅法,分别只需要两个和三个副本。
如果所有的ISR副本都失败了怎么办?
此时有两种⽅法可选,

向已经部署好的Kafka集群⾥⾯添加机器,我们需要从已经部署好的Kafka节点中复制相应的配置⽂件,然后把⾥
⾯的broker id修改成全局唯⼀的,最后启动这个节点即可将它加⼊到现有Kafka集群中。
问题:新添加的Kafka节点并不会⾃动地分配数据,⽆法分担集群的负载,除⾮我们新建⼀个topic。
需要⼿动将部分分区移到新添加的Kafka节点上,Kafka内部提供了相关的⼯具来重新分布某个topic的分区。
在重新分布topic分区之前,我们先来看看现在topic的各个分区的分布位置:

在node11搭建Kafka:
拷⻉JDK并安装

此处不需要zookeeper,切记!!!

让配置⽣效:
/etc/profile
拷⻉node1上安装的Kafka

修改node11上Kafka的配置:

启动Kafka:

注意观察node11上节点启动的时候的ClusterId,看和zookeeper节点上的ClusterId是否⼀致,如果是,证明node11和node1在同⼀个集群中。
node11启动的Cluster ID:

zookeeper节点上的Cluster ID:

然后使⽤ kafka-reassign-partitionssh ⼯具⽣成reassign plan

Proposed partition reassignment configuration下⾯⽣成的就是将分区重新分布到broker 1上的结果。我们将这些内容保存到名为resultjson⽂件⾥⾯(⽂件名不重要,⽂件格式也不⼀定要以json为结尾,只要保证内容是json即可),然后执⾏这些reassign plan:

执⾏计划:

这样Kafka就在执⾏reassign plan,我们可以校验reassign plan是否执⾏完成:

查看主题的细节:

分区的分布的确和 *** 作之前不⼀样了,broker 1上已经有分区分布上去了。使⽤ kafka-reassign�partitionssh ⼯具⽣成的reassign plan只是⼀个建议,⽅便⼤家⽽已。其实我们⾃⼰完全可以编辑⼀个reassignplan,然后执⾏它,如下:

将上⾯的json数据⽂件保存到my-topics-to-executejson⽂件中,然后也是执⾏它:

等这个reassign plan执⾏完,我们再来看看分区的分布:

我们可以在新建主题的时候,⼿动指定主题各个Leader分区以及Follower分区的分配情况,即什么分区副本在哪
个broker节点上。
随着系统的运⾏,broker的宕机重启,会引发Leader分区和Follower分区的⻆⾊转换,最后可能Leader⼤部分都
集中在少数⼏台broker上,由于Leader负责客户端的读写 *** 作,此时集中Leader分区的少数⼏台服务器的⽹络I/O,
CPU,以及内存都会很紧张。
Leader和Follower的⻆⾊转换会引起Leader副本在集群中分布的不均衡,此时我们需要⼀种⼿段,让Leader的分
布重新恢复到⼀个均衡的状态。
执⾏脚本:

上述脚本执⾏的结果是:创建了主题tp_demo_03,有三个分区,每个分区两个副本,Leader副本在列表中第⼀个指定的brokerId上,Follower副本在随后指定的brokerId上。

然后模拟broker0宕机的情况:

是否有⼀种⽅式,可以让Kafka⾃动帮我们进⾏修改?改为初始的副本分配?
此时,⽤到了Kafka提供的⾃动再均衡脚本: kafka-preferred-replica-electionsh
先看介绍:

该⼯具会让每个分区的Leader副本分配在合适的位置,让Leader分区和Follower分区在服务器之间均衡分配。
如果该脚本仅指定zookeeper地址,则会对集群中所有的主题进⾏ *** 作,⾃动再平衡。
具体 *** 作:

执⾏ *** 作:

查看 *** 作的结果:

恢复到最初的分配情况。
之所以是这样的分配,是因为我们在创建主题的时候:

在逗号分割的每个数值对中排在前⾯的是Leader分区,后⾯的是副本分区。那么所谓的preferred replica,就是排在前⾯的数字就是Leader副本应该在的brokerId。

实际项目中,我们可能由于主题的副本因子设置的问题,需要重新设置副本因子。
或者由于集群的扩展,需要重新设置副本因子。
topic⼀旦使用又不能轻易删除重建,因此动态增加副本因子就成为最终的选择。

说明:kafka 10版本配置⽂件默认没有defaultreplicationfactor=x, 因此如果创建topic时,不指定–replication-factor 想, 默认副本因⼦为1 我们可以在⾃⼰的 serverproperties 中配置上常⽤的副本因⼦,省去⼿动调整。例如设置defaultreplicationfactor=3, 详细内容可参考官⽅⽂档 >

详细安装访问: >producer 是生产者,负责消息生产,上游程序中按照标准的消息格式组装(按照每个消息事件的字段定义)发送到指定的topic。producer生产消息的时候,不会因为consumer处理能力不够,而阻塞producer的生产。consumer会从指定的topic 拉取消息,然后处理消费,并提交offset(消息处理偏移量,消费掉的消息并不会主动删除,而是kafka系统根据保存周期自动消除)。

topic是消费分类存储的队列,可以按照消息类型来分topic存储。

replication是topic复制副本个数,用于解决数据丢失,防止leader topic宕机后,其他副本可以快代替。

broker是缓存代理,Kafka集群中的一台或多台服务器统称broker,用来保存producer发送的消息。Broker没有副本机制,一旦broker宕机,该broker的消息将都不可用。

partition是topic的物理分组,在创建topic的时候,可以指定partition 数量。每个partition是逻辑有序的,保证每个消息都是顺序插入的,而且每个消息的offset在不同partition的是唯一不同的

偏移量。kafka为每条在分区的消息保存一个偏移量offset,这也是消费者在分区的位置。比如一个偏移量是5的消费者,表示已经消费了从0-4偏移量的消息,下一个要消费的消息的偏移量是5。每次消息处理完后,要么主动提交offset,要么自动提交,把offset偏移到下一位,如处理offset=6消息。在kafka配置中,如果enable_auto_commit=True和auto_commit_interval_ms=xx,那表示每xx 毫秒自动提交偏移量

分组。是指在消费同一topic的不同consumer。每个consumer都有唯一的groupId,同一groupId 属于同一个group。不同groupId的consumer相互不影响。对于一个topic,同一个group的consumer数量不能超过 partition数量。比如,Topic A 有 16个partition,某一个group下有2个consumer,那2个consumer分别消费8个partition,而这个group的consumer数量最多不能超过16个。

kafka的配置主要分四类,分别是zookeeper、server、consumer、producer。其他的配置可以忽略。

zk的配置比较简单,也可以默认不改dataDir是zk存储节点配置的目录地址,clientPort是zk启动的端口,默认2181,maxClientCnxns是限制ip的连接此处,设置0表示无连接次数,一般情况根据业务部署情况,配置合理的值。

建议从头阅读:
银行系统中的消息分发利器Kafka(一)
银行系统中的消息分发利器Kafka(二)

6、Partition
上次我们说到,Kafka可以存储数据,而且数据按照Topic进行分类。
这些存储的数据可能会很大,这可能会给Kafka的Broker带来很大的存储压力。
一个好的解决办法就是把这些数据拆成一个或多个Partition:

然后,把这多个Partition分发到不同的服务器上。
Kafka是一个分布式系统,所以对数据文件的Partition进行分布式管理是很方便的。
随之,另外一个问题来了,我们要把数据分成多少个Partition呢?

在每一个Partition 中,第一个消息的Offset就是0,第二个就是1,以此类推。另外,Offset并不是一个全局的ID,它只作用于所属的Partition。所以,在同一个Partition中,不会有相同的Offset。
结合上面的知识,我们可以知道,如果要在Kafka中定位一个消息信息,就是先找到Topic,然后找到Partition,最后找到Offset。

8、Consumer Group
先把前面的场景复习一下。
首先我们有很多节点的数据要收集,于是我们通过Kafka来实现:

然后我们为每一个节点创建一个Producer:

这时你会发现,处理压力跑到Conumser那里了,于是我们就需要一个Consumer Group了。

Kafka的几个重要的概念就介绍完了。后面我会逐步深入的介绍Kafka的一些细节,欢迎关注~


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zz/13461876.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-08-11
下一篇 2023-08-11

发表评论

登录后才能评论

评论列表(0条)

保存