kafka消费者分区分配策略

kafka消费者分区分配策略,第1张

有两种种策略,range strategy策略和roundRobin strategy策略,默认是range strategy策略。
1range strategy策略
先把主题里的分区按照序号排序,然后把消费者按照字母排序,把分区的总数除以消费者总数得到每个消费者消费的分区数,如果除不断就把分区分配给前面的消费者。
排完序的分区将会是0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消费者线程排完序将会是C1-0, C2-0, C2-1。
C1-0 将消费 0, 1, 2, 3 分区
C2-0 将消费 4, 5, 6 分区
C2-1 将消费 7, 8, 9 分区
2roundRobin strategy策略
把主题和分区组成topicAndPartition列表,再把列表按照hashcode排序,轮询分配给消费者。
加入按照 hashCode 排序完的topic-partitions组依次为T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我们的消费者线程排序为C1-0, C1-1, C2-0, C2-1,最后分区分配的结果为:
C1-0 将消费 T1-5, T1-2, T1-6 分区;
C1-1 将消费 T1-3, T1-1, T1-9 分区;
C2-0 将消费 T1-0, T1-4 分区;
C2-1 将消费 T1-8, T1-7 分区;
只能通过partitionassignmentstrategy参数选择 range 或 roundrobin。

Kafka到底是个啥?用来干嘛的?

官方定义如下:

翻译过来,大致的意思就是,这是一个实时数据处理系统,可以横向扩展,并高可靠!

实时数据处理 ,从名字上看,很好理解,就是将数据进行实时处理,在现在流行的微服务开发中,最常用实时数据处理平台有 RabbitMQ、RocketMQ 等消息中间件。

这些中间件,最大的特点主要有两个:

在早期的 web 应用程序开发中,当请求量突然上来了时候,我们会将要处理的数据推送到一个队列通道中,然后另起一个线程来不断轮训拉取队列中的数据,从而加快程序的运行效率。

但是随着请求量不断的增大,并且队列通道的数据一致处于高负载,在这种情况下,应用程序的内存占用率会非常高,稍有不慎,会出现内存不足,造成程序内存溢出,从而导致服务不可用。

随着业务量的不断扩张,在一个应用程序内,使用这种模式已然无法满足需求,因此之后,就诞生了各种消息中间件,例如 ActiveMQ、RabbitMQ、RocketMQ等中间件。

采用这种模型,本质就是将要推送的数据,不在存放在当前应用程序的内存中,而是将数据存放到另一个专门负责数据处理的应用程序中,从而实现服务解耦。

消息中间件 :主要的职责就是保证能接受到消息,并将消息存储到磁盘,即使其他服务都挂了,数据也不会丢失,同时还可以对数据消费情况做好监控工作。

应用程序 :只需要将消息推送到消息中间件,然后启用一个线程来不断从消息中间件中拉取数据,进行消费确认即可!

引入消息中间件之后,整个服务开发会变得更加简单,各负其责。

Kafka 本质其实也是消息中间件的一种,Kafka 出自于 LinkedIn 公司,与 2010 年开源到 github。

LinkedIn 的开发团队,为了解决数据管道问题,起初采用了 ActiveMQ 来进行数据交换,大约是在 2010 年前后,那时的 ActiveMQ 还远远无法满足 LinkedIn 对数据传递系统的要求,经常由于各种缺陷而导致消息阻塞或者服务无法正常访问,为了能够解决这个问题,LinkedIn 决定研发自己的消息传递系统, Kafka 由此诞生

在 LinkedIn 公司,Kafka 可以有效地处理每天数十亿条消息的指标和用户活动跟踪,其强大的处理能力,已经被业界所认可,并成为大数据流水线的首选技术。

先来看一张图, 下面这张图就是 kafka 生产与消费的核心架构模型

如果你看不懂这些概念没关系,我会带着大家一起梳理一遍!

简而言之,kafka 本质就是一个消息系统,与大多数的消息系统一样,主要的特点如下:

与 ActiveMQ、RabbitMQ、RocketMQ 不同的地方在于,它有一个分区 Partition 的概念。

这个分区的意思就是说,如果你创建的 topic 有5个分区,当你一次性向 kafka 中推 1000 条数据时,这 1000 条数据默认会分配到 5 个分区中,其中每个分区存储 200 条数据。

这样做的目的,就是方便消费者从不同的分区拉取数据,假如你启动 5 个线程同时拉取数据,每个线程拉取一个分区,消费速度会非常非常快!

这是 kafka 与其他的消息系统最大的不同!

和其他的中间件一样,kafka 每次发送数据都是向 Leader 分区发送数据,并顺序写入到磁盘,然后 Leader 分区会将数据同步到各个从分区 Follower ,即使主分区挂了,也不会影响服务的正常运行。

那 kafka 是如何将数据写入到对应的分区呢?kafka中有以下几个原则:

与生产者一样,消费者主动的去kafka集群拉取消息时,也是从 Leader 分区去拉取数据。

这里我们需要重点了解一个名词: 消费组

考虑到多个消费者的场景,kafka 在设计的时候,可以由多个消费者组成一个消费组,同一个消费组者的消费者可以消费同一个 topic 下不同分区的数据,同一个分区只会被一个消费组内的某个消费者所消费,防止出现重复消费的问题!

但是不同的组,可以消费同一个分区的数据!

你可以这样理解,一个消费组就是一个客户端,一个客户端可以由很多个消费者组成,以便加快消息的消费能力。

但是,如果一个组下的消费者数量大于分区数量,就会出现很多的消费者闲置。

如果分区数量大于一个组下的消费者数量,会出现一个消费者负责多个分区的消费,会出现消费性能不均衡的情况。

因此,在实际的应用中,建议消费者组的 consumer 的数量与 partition 的数量保持一致!

光说理论可没用,下面我们就以 centos7 为例,介绍一下 kafka 的安装和使用。

kafka 需要 zookeeper 来保存服务实例的元信息,因此在安装 kafka 之前,我们需要先安装 zookeeper。

zookeeper 安装环境依赖于 jdk,因此我们需要事先安装 jdk

下载zookeeper,并解压文件包

创建数据、日志目录

配置zookeeper

重新配置 dataDir 和 dataLogDir 的存储路径

最后,启动 Zookeeper 服务

到官网 >如何决定kafka集群中topic,partition的数量,这是许多kafka用户经常遇到的问题。本文列举阐述几个重要的决定因素,以提供一些参考。
分区多吞吐量更高
一个话题topic的各个分区partiton之间是并行的。在producer和broker方面,写不同的分区是完全并行的。因此一些昂贵的 *** 作比如压缩,可以获得更多的资源,因为有多个进程。在consumer方面,一个分区的数据可以由一个consumer线程在拉去数据。分区多,并行的consumer(同一个消费组)也可以多。因此通常,分区越多吞吐量越高。
基于吞吐量可以获得一个粗略的计算公式。先测量得到在只有一个分区的情况下,Producer的吞吐量(P)和Consumer的吞吐量(C)。那如果总的目标吞吐量是T的话,max(T/P,T/C)就是需要的最小分区数。在单分区的情况下,Producer的吞吐量可以通过一些配置参数,比如bath的大小、副本的数量、压缩格式、ack类型来测得。而Consumer的吞吐量通常取决于应用程序处理每一天消息逻辑。这些都是需要切合实际测量。
随着时间推移数据量的增长可能会需要增加分区。有一点需要注意的是,Producer者发布消息通过key取哈希后映射分发到一个指定的分区,当分区数发生变化后,会带来key和分区映射关系发生变化。可能某些应用程序依赖key和分区映射关系,映射关系变化了,程序就需要做相应的调整。为了避免这种key和分区关系带来的应用程序修改。所以在分区的时候尽量提前考虑,未来一年或两年的对分区数据量的要求。
除了吞吐量,还有一些其他的因素,在定分区的数目时是值得考虑的。在某些情况下,太多的分区也可能会产生负面影响。
分区多需要的打开的文件句柄也多
每个分区都映射到broker上的一个目录,每个log片段都会有两个文件(一个是索引文件,另一个是实际的数据文件)。分区越多所需要的文件句柄也就越多,可以通过配置 *** 作系统的参数增加打开文件句柄数。
分区多增加了不可用风险
kafka支持主备复制,具备更高的可用性和持久性。一个分区(partition)可以有多个副本,这些副本保存在不同的broker上。每个分区的副本中都会有一个作为Leader。当一个broker失败时,Leader在这台broker上的分区都会变得不可用,kafka会自动移除Leader,再其他副本中选一个作为新的Leader。Producer和Consumer都只会与Leader相连。
一般情况下,当一个broker被正常关机时,controller主动地将Leader从正在关机的broker上移除。移动一个Leader只需要几毫秒。然当broker出现异常导致关机时,不可用会与分区数成正比。假设一个boker上有2000个分区,每个分区有2个副本,那这样一个boker大约有1000个Leader,当boker异常宕机,会同时有1000个分区变得不可用。假设恢复一个分区需要5ms,1000个分区就要5s。
分区越多,在broker异常宕机的情况,恢复所需时间会越长,不可用风险会增加。
分区多会增加点到点的延迟
这个延迟需要体现在两个boker间主备数据同步。在默认情况下,两个boker只有一个线程负责数据的复制。
根据经验,每个boker上的分区限制在100br内(b指集群内boker的数量,r指副本数量)。
分区多会增加客户端的内存消耗
kafka082后有个比较好的特色,新的Producer可以允许用户设置一个缓冲区,缓存一定量的数据。当缓冲区数据到达设定量或者到时间,数据会从缓存区删除发往broker。如果分区很多,每个分区都缓存一定量的数据量在缓冲区,很可能会占用大量的内存,甚至超过系统内存。
Consumer也存在同样的问题,会从每个分区拉一批数据回来,分区越多,所需内存也就越大。
根据经验,应该给每个分区分配至少几十KB的内存。
总结
在通常情况下,增加分区可以提供kafka集群的吞吐量。然而,也应该意识到集群的总分区数或是单台服务器上的分区数过多,会增加不可用及延迟的风险。

生产者向broker发送消息,消费者接收消息,broker是物理概念,部署几个kafka即几个broker,topic是逻辑概念,往topic里发送消息会发送到设置好的几个partion上,每个partion存储作为不同队列存储不同数据,partion有leader和follower备份机制,消息发送时会轮循发送到不同broker的不同partion中,同一消费者只能消费同一分区,通过offset记录消费位置,消费者组可以访问一个topic的不同partion

启动镜像

启动kafka可以带上参数,这样会自动修改kafka里的配置文件(/opt/kafka_版本/conf/serverproperties),否则不带参数需要自己进入进行手动修改 带参数版启动可参考

其中1721703需要改成自己docker的网桥连接地址

查看已启动容器

查看所有容器

启动未启动的容器

进入kafka容器

创建主题

主题和分区可以理解为:topic是逻辑划分,kafka通过topic进行区分消息,topic的数据会被存储到日志中,如果数据量太大可以引入partion(同时提高读写吞吐量)来分段存储数据。其中replication-factor作用是将任意分区复制到broker上,broker是物理概念,部署了一个kafka可认为broker数为1,我本机只有一个kafka所以这里replication-factor超过1会报错。 综上几个概念可以理解为:集群中有多个broker,创建主题时可以指明topic有多个partitions(消息拆分到不同分区进行存储,一个partion只能被一个消费者消费--partion内部保证接收数据顺序),可以为分区创建多个副本replication,不同副本在不同的broker中(作为备份使用,这里有leader和flower的区分)

查看topic信息

集群部署
可以通过compose集群化部署过es,这里通过创建另一个composeyml文件来部署kafka,配置文件参考 docker-compose集群部署

生产者:

消费者:
方式一:从当前主题的迁移量位置+1开始取数据

方式二:从当前主题第一条消息开始消费

生产者将消息发送broker,broker将消息保存到本地日志中,消息的保存时有序的
单播消息:
当存在一个生产者,一个消费者组的时候,一个消费者组中只有一个消费者会收到消息

多播消息:
当存在一个生产者,多个消费组,不同消费组只有一个消费者收到消息

查看消费组详细信息:

CURRENT-OFFSET:最后被消费的偏移量
LOG-END-OFFSET:消息总量(最后一条消息的偏移量)
LAG :积压了多少条消息

常见问题:
1、如何防止消息丢失
生产者:使用同步消息发送;ack设置为1/all;设置同步分区数>=2
消费者:把自动提交改成手动提交
2、如何防止消息的重复消费
针对网络抖动导致的生产者重试(发送消息),可以设置消费者加锁解决;
3、消息积压
消费者使用多线程异步处理接收数据;创建多个消费者组部署到其他机器上;通过业务架构设计,提升业务层面消费性能。

ps:
缓冲区:kafka默认会创建一个消息缓冲区去存放要发送的消息,大小是32M,每次本地线程会去缓冲区拉16K数据发送到broker,如果不到16K等待10ms也会将数据发送到broker

参考链接:
1、kafka安装教程--推荐
2、kafka配置文件serverproperties参数说明
3、创建主题分区数
4、解决docker容器启动不了的问题
5、通过docker-compose集群部署
6、学习视频

来自《kafka权威指南》第2章
numpartitions参数指定了新创建的主题将包含多少个分区。如果启用了主题自动创建功能(该功能默认是启用的),主题分区的个数就是该参数指定的值。该参数的默认值是 1。要注意,我们可以增加主题分区的个数,但不能减少分区的个数。所以,如果要让一个主题的分区个数少于 numpartitions 指定的值,需要手动创建该主题。
Kafka 集群通过分区对主题进行横向扩展,所以当有新的 broker 加入集群时,可以通过分区个数来实现集群的负载均衡。当然,这并不是说,在存在多个主题的情况下(它们分布在多个 broker 上),为了能让分区分布到所有 broker 上,主题分区的个数必须要大于 broker 的个数。不过,拥有大量消息的主题如果要进行负载分散,就需要大量的分区。

为主题选定分区数量并不是一件可有可无的事情,在进行数量选择时,需要考虑如下几个因素。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/yw/13406280.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-07-30
下一篇 2023-07-30

发表评论

登录后才能评论

评论列表(0条)

保存