Kafka系列-主要参数详解

Kafka系列-主要参数详解,第1张

每个kafka broker中配置文件serverproperties默认必须配置的属性如下:

#唯一标识在集群中的ID,要求是正数。

brokerid=0

#服务端口,默认9092

port=9092

#监听地址

hostname=debugo01

# 处理网络请求的最大线程数

numnetworkthreads=2

# 处理磁盘I/O的线程数

numiothreads=8

# 一些后台线程数

backgroundthreads = 4

# 等待IO线程处理的请求队列最大数

queuedmaxrequests = 500

# socket的发送缓冲区(SO_SNDBUF)

socketsendbufferbytes=1048576

# socket的接收缓冲区 (SO_RCVBUF)

socketreceivebufferbytes=1048576

# socket请求的最大字节数。为了防止内存溢出,messagemaxbytes必然要小于

socketrequestmaxbytes = 104857600

# 每个topic的分区个数,更多的partition会产生更多的segment file

numpartitions=2

# 是否允许自动创建topic ,若是false,就需要通过命令创建topic

autocreatetopicsenable =true

# 一个topic ,默认分区的replication个数 ,不能大于集群中broker的个数。

defaultreplicationfactor =1

# 消息体的最大大小,单位是字节

messagemaxbytes = 1000000

# Zookeeper quorum设置。如果有多个使用逗号分割

zookeeperconnect=debugo01:2181,debugo02,debugo03

# 连接zk的超时时间

zookeeperconnectiontimeoutms=1000000

# ZooKeeper集群中leader和follower之间的同步实际

zookeepersynctimems = 2000

#日志存放目录,多个目录使用逗号分割

logdirs=/var/log/kafka

# 日志清理策略(delete|compact)

logcleanuppolicy = delete

# 日志保存时间 (hours|minutes),默认为7天(168小时)。超过这个时间会根据policy处理数据。bytes和minutes无论哪个先达到都会触发。

logretentionhours=168

# 日志数据存储的最大字节数。超过这个时间会根据policy处理数据。

#logretentionbytes=1073741824

# 控制日志segment文件的大小,超出该大小则追加到一个新的日志segment文件中(-1表示没有限制)

logsegmentbytes=536870912

# 当达到下面时间,会强制新建一个segment

logrollhours = 247

# 日志片段文件的检查周期,查看它们是否达到了删除策略的设置(logretentionhours或logretentionbytes)

logretentioncheckintervalms=60000

# 是否开启压缩

logcleanerenable=false

# 对于压缩的日志保留的最长时间

logcleanerdeleteretentionms = 1 day

# 对于segment日志的索引文件大小限制

logindexsizemaxbytes = 10 1024 1024

#y索引计算的一个缓冲区,一般不需要设置。

logindexintervalbytes = 4096

# 是否自动平衡broker之间的分配策略

autoleaderrebalanceenable = false

# leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡

leaderimbalanceperbrokerpercentage = 10

# 检查leader是否不平衡的时间间隔

leaderimbalancecheckintervalseconds = 300

# 客户端保留offset信息的最大空间大小

offsetmetadatamaxbytes = 1024

# Consumer端核心的配置是groupid、zookeeperconnect

# 决定该Consumer归属的唯一组ID,By setting the same group id multiple processes indicate that they are all part of the same consumer group

groupid

# 消费者的ID,若是没有设置的话,会自增

consumerid

# 一个用于跟踪调查的ID ,最好同groupid相同

clientid = <group_id>

# socket的超时时间,实际的超时时间为maxfetchwait + sockettimeoutms

sockettimeoutms= 30 1000

# socket的接收缓存空间大小

socketreceivebufferbytes=64 1024

#从每个分区fetch的消息大小限制

fetchmessagemaxbytes = 1024 1024

# true时,Consumer会在消费消息后将offset同步到zookeeper,这样当Consumer失败后,新的consumer就能从zookeeper获取最新的offset

autocommitenable = true

# 自动提交的时间间隔

autocommitintervalms = 60 1000

# 用于消费的最大数量的消息块缓冲大小,每个块可以等同于fetchmessagemaxbytes中数值

queuedmaxmessagechunks = 10

# 当有新的consumer加入到group时,将尝试reblance,将partitions的消费端迁移到新的consumer中, 该设置是尝试的次数

rebalancemaxretries = 4

# 每次reblance的时间间隔

rebalancebackoffms = 2000

# 每次重新选举leader的时间

refreshleaderbackoffms

# server发送到消费端的最小数据,若是不满足这个数值则会等待直到满足指定大小。默认为1表示立即接收。

fetchminbytes = 1

# 若是不满足fetchminbytes时,等待消费端请求的最长等待时间

fetchwaitmaxms = 100

# 如果指定时间内没有新消息可用于消费,就抛出异常,默认-1表示不受限

consumertimeoutms = -1

# 消费者获取消息元信息(topics, partitions and replicas)的地址,配置格式是:host1:port1,host2:port2,也可以在外面设置一个vip

metadatabrokerlist

#消息的确认模式

# 0:不保证消息的到达确认,只管发送,低延迟但是会出现消息的丢失,在某个server失败的情况下,有点像TCP

# 1:发送消息,并会等待leader 收到确认后,一定的可靠性

# -1:发送消息,等待leader收到确认,并进行复制 *** 作后,才返回,最高的可靠性

requestrequiredacks = 0

# 异步模式下缓冲数据的最大时间。例如设置为100则会集合100ms内的消息后发送,这样会提高吞吐量,但是会增加消息发送的延时

queuebufferingmaxms = 5000

# 异步模式下缓冲的最大消息数,同上

queuebufferingmaxmessages = 10000

# 异步模式下,消息进入队列的等待时间。若是设置为0,则消息不等待,如果进入不了队列,则直接被抛弃

queueenqueuetimeoutms = -1

# 异步模式下,每次发送的消息数,当queuebufferingmaxmessages或queuebufferingmaxms满足条件之一时producer会触发发送。

batchnummessages=200

kafka采用发布订阅模式:一对多。发布订阅模式又分两种:

Kafka为这两种模型提供了单一的消费者抽象模型: 消费者组 (consumer group)。 消费者用一个消费者组名标记自己。 一个发布在Topic上消息被分发给此消费者组中的一个消费者。 假如所有的消费者都在一个组中,那么这就变成了队列模型。 假如所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型。 一个消费者组中消费者订阅同一个Topic,每个消费者接受Topic的一部分分区的消息,从而实现对消费者的横向扩展,对消息进行分流。

注意:当单个消费者无法跟上数据生成的速度,就可以增加更多的消费者分担负载,每个消费者只处理部分partition的消息,从而实现单个应用程序的横向伸缩。但是不要让消费者的数量多于partition的数量,此时多余的消费者会空闲。此外,Kafka还允许多个应用程序从同一个Topic读取所有的消息,此时只要保证每个应用程序有自己的消费者组即可。

消费者组的概念就是:当有多个应用程序都需要从Kafka获取消息时,让每个app对应一个消费者组,从而使每个应用程序都能获取一个或多个Topic的全部消息;在每个消费者组中,往消费者组中添加消费者来伸缩读取能力和处理能力,消费者组中的每个消费者只处理每个Topic的一部分的消息,每个消费者对应一个线程。

在同一个群组中,无法让一个线程运行多个消费者,也无法让多线线程安全地共享一个消费者。按照规则,一个消费者使用一个线程,如果要在同一个消费者组中运行多个消费者,需要让每个消费者运行在自己的线程中。最好把消费者的逻辑封装在自己的对象中,然后使用java的ExecutorService启动多个线程,使每个消费者运行在自己的线程上,可参考 >

消息中间价,首选Kafka,大厂开源,稳定更新,性能优越,顺便介绍kafka的相关知识。

一、kafka是什么?

ApacheKafka是一套开源的消息系统,它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一个分布式,分区化,可复制的提交日志服务。现在,LinkedIn公司有三个同事离职创业,继续开发kafka。

二、关键配置项解读

出于性能和实际集群部署情况,我们还是需要讲解一些重要的配置项。除此之外,如果对某个默认参数存在质疑,在详细了解改参数的作用前,建议采用默认配置。

advertisedhostname

注册到zk供用户使用的主机名。内网环境通常无需配置,而IaaS一般需要配置为公网地址。默认为“hostname”,可以通过javanetInetAddress()接口获取该值。

advertisedport

注册到zk供用户使用的服务端口,通常在IaaS环境需要额外配置。

numpartitions

自动创建topic的默认partition数量。默认是1,为了获得更好的性能,建议修改为更大。最优取值参考后文。

defaultreplicationfactor

自动创建topic的默认副本数量,官方建议修改为2;但通常一个副本就足够了。

mininsyncreplicas

ISR提交生成者请求的最小副本数。

uncleanleaderelectionenable

是否允许不具备ISR资格的replicas选举为leader作为不得已的措施,甚至不惜牺牲部分数据。默认允许。建议允许。数据异常重要的情况例外。

controlledshutdownenable

在kafka收到stop命令或者异常终止时,允许自动同步数据。建议开启。

三、调优考量

配置合适的partitons数量。

这似乎是kafka新手必问得问题。partiton是kafka的并行单元。从procer和broker的视角看,向不同的partition写入是完全并行的;而对于consumer,并发数完全取决于partition的数量,即,如果consumer数量大于partition数量,则必有consumer闲置。所以,我们可以认为kafka的吞吐与partition时线性关系。partition的数量要根据吞吐来推断,假定p代表生产者写入单个partition的最大吞吐,c代表消费者从单个partition消费的最大吞吐,我们的目标吞吐是t,那么partition的数量应该是t/p和t/c中较大的那一个。实际情况中,p的影响因素有批处理的规模,压缩算法,确认机制和副本数等,然而,多次benchmark的结果表明,单个partition的最大写入吞吐在10MB/sec左右;c的影响因素是逻辑算法,需要在不同场景下实测得出。

这个结论似乎太书生气和不实用。我们通常建议partition的数量一定要大于等于消费者的数量来实现最大并发。官方曾测试过1万个partition的情况,所以不需要太担心partition过多的问题。我建议的做法是,如果是3个broker的集群,有5个消费者,那么建议partition的数量是15,也就是broker和consumer数量的最小公倍数。当然,也可以是一个大于消费者的broker数量的倍数,比如6或者9,还请读者自行根据实际环境裁定。

Kafka到底是个啥?用来干嘛的?

官方定义如下:

翻译过来,大致的意思就是,这是一个实时数据处理系统,可以横向扩展,并高可靠!

实时数据处理 ,从名字上看,很好理解,就是将数据进行实时处理,在现在流行的微服务开发中,最常用实时数据处理平台有 RabbitMQ、RocketMQ 等消息中间件。

这些中间件,最大的特点主要有两个:

在早期的 web 应用程序开发中,当请求量突然上来了时候,我们会将要处理的数据推送到一个队列通道中,然后另起一个线程来不断轮训拉取队列中的数据,从而加快程序的运行效率。

但是随着请求量不断的增大,并且队列通道的数据一致处于高负载,在这种情况下,应用程序的内存占用率会非常高,稍有不慎,会出现内存不足,造成程序内存溢出,从而导致服务不可用。

随着业务量的不断扩张,在一个应用程序内,使用这种模式已然无法满足需求,因此之后,就诞生了各种消息中间件,例如 ActiveMQ、RabbitMQ、RocketMQ等中间件。

采用这种模型,本质就是将要推送的数据,不在存放在当前应用程序的内存中,而是将数据存放到另一个专门负责数据处理的应用程序中,从而实现服务解耦。

消息中间件 :主要的职责就是保证能接受到消息,并将消息存储到磁盘,即使其他服务都挂了,数据也不会丢失,同时还可以对数据消费情况做好监控工作。

应用程序 :只需要将消息推送到消息中间件,然后启用一个线程来不断从消息中间件中拉取数据,进行消费确认即可!

引入消息中间件之后,整个服务开发会变得更加简单,各负其责。

Kafka 本质其实也是消息中间件的一种,Kafka 出自于 LinkedIn 公司,与 2010 年开源到 github。

LinkedIn 的开发团队,为了解决数据管道问题,起初采用了 ActiveMQ 来进行数据交换,大约是在 2010 年前后,那时的 ActiveMQ 还远远无法满足 LinkedIn 对数据传递系统的要求,经常由于各种缺陷而导致消息阻塞或者服务无法正常访问,为了能够解决这个问题,LinkedIn 决定研发自己的消息传递系统, Kafka 由此诞生

在 LinkedIn 公司,Kafka 可以有效地处理每天数十亿条消息的指标和用户活动跟踪,其强大的处理能力,已经被业界所认可,并成为大数据流水线的首选技术。

先来看一张图, 下面这张图就是 kafka 生产与消费的核心架构模型

如果你看不懂这些概念没关系,我会带着大家一起梳理一遍!

简而言之,kafka 本质就是一个消息系统,与大多数的消息系统一样,主要的特点如下:

与 ActiveMQ、RabbitMQ、RocketMQ 不同的地方在于,它有一个分区 Partition 的概念。

这个分区的意思就是说,如果你创建的 topic 有5个分区,当你一次性向 kafka 中推 1000 条数据时,这 1000 条数据默认会分配到 5 个分区中,其中每个分区存储 200 条数据。

这样做的目的,就是方便消费者从不同的分区拉取数据,假如你启动 5 个线程同时拉取数据,每个线程拉取一个分区,消费速度会非常非常快!

这是 kafka 与其他的消息系统最大的不同!

和其他的中间件一样,kafka 每次发送数据都是向 Leader 分区发送数据,并顺序写入到磁盘,然后 Leader 分区会将数据同步到各个从分区 Follower ,即使主分区挂了,也不会影响服务的正常运行。

那 kafka 是如何将数据写入到对应的分区呢?kafka中有以下几个原则:

与生产者一样,消费者主动的去kafka集群拉取消息时,也是从 Leader 分区去拉取数据。

这里我们需要重点了解一个名词: 消费组

考虑到多个消费者的场景,kafka 在设计的时候,可以由多个消费者组成一个消费组,同一个消费组者的消费者可以消费同一个 topic 下不同分区的数据,同一个分区只会被一个消费组内的某个消费者所消费,防止出现重复消费的问题!

但是不同的组,可以消费同一个分区的数据!

你可以这样理解,一个消费组就是一个客户端,一个客户端可以由很多个消费者组成,以便加快消息的消费能力。

但是,如果一个组下的消费者数量大于分区数量,就会出现很多的消费者闲置。

如果分区数量大于一个组下的消费者数量,会出现一个消费者负责多个分区的消费,会出现消费性能不均衡的情况。

因此,在实际的应用中,建议消费者组的 consumer 的数量与 partition 的数量保持一致!

光说理论可没用,下面我们就以 centos7 为例,介绍一下 kafka 的安装和使用。

kafka 需要 zookeeper 来保存服务实例的元信息,因此在安装 kafka 之前,我们需要先安装 zookeeper。

zookeeper 安装环境依赖于 jdk,因此我们需要事先安装 jdk

下载zookeeper,并解压文件包

创建数据、日志目录

配置zookeeper

重新配置 dataDir 和 dataLogDir 的存储路径

最后,启动 Zookeeper 服务

到官网 >

以上就是关于Kafka系列-主要参数详解全部的内容,包括:Kafka系列-主要参数详解、kafka——消费者原理解析、ApacheKafka开源消息系统_kafka源码分析等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/web/10132703.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-05
下一篇 2023-05-05

发表评论

登录后才能评论

评论列表(0条)

保存