每个kafka broker中配置文件serverproperties默认必须配置的属性如下:
#唯一标识在集群中的ID,要求是正数。
brokerid=0
#服务端口,默认9092
port=9092
#监听地址
hostname=debugo01
# 处理网络请求的最大线程数
numnetworkthreads=2
# 处理磁盘I/O的线程数
numiothreads=8
# 一些后台线程数
backgroundthreads = 4
# 等待IO线程处理的请求队列最大数
queuedmaxrequests = 500
# socket的发送缓冲区(SO_SNDBUF)
socketsendbufferbytes=1048576
# socket的接收缓冲区 (SO_RCVBUF)
socketreceivebufferbytes=1048576
# socket请求的最大字节数。为了防止内存溢出,messagemaxbytes必然要小于
socketrequestmaxbytes = 104857600
# 每个topic的分区个数,更多的partition会产生更多的segment file
numpartitions=2
# 是否允许自动创建topic ,若是false,就需要通过命令创建topic
autocreatetopicsenable =true
# 一个topic ,默认分区的replication个数 ,不能大于集群中broker的个数。
defaultreplicationfactor =1
# 消息体的最大大小,单位是字节
messagemaxbytes = 1000000
# Zookeeper quorum设置。如果有多个使用逗号分割
zookeeperconnect=debugo01:2181,debugo02,debugo03
# 连接zk的超时时间
zookeeperconnectiontimeoutms=1000000
# ZooKeeper集群中leader和follower之间的同步实际
zookeepersynctimems = 2000
#日志存放目录,多个目录使用逗号分割
logdirs=/var/log/kafka
# 日志清理策略(delete|compact)
logcleanuppolicy = delete
# 日志保存时间 (hours|minutes),默认为7天(168小时)。超过这个时间会根据policy处理数据。bytes和minutes无论哪个先达到都会触发。
logretentionhours=168
# 日志数据存储的最大字节数。超过这个时间会根据policy处理数据。
#logretentionbytes=1073741824
# 控制日志segment文件的大小,超出该大小则追加到一个新的日志segment文件中(-1表示没有限制)
logsegmentbytes=536870912
# 当达到下面时间,会强制新建一个segment
logrollhours = 247
# 日志片段文件的检查周期,查看它们是否达到了删除策略的设置(logretentionhours或logretentionbytes)
logretentioncheckintervalms=60000
# 是否开启压缩
logcleanerenable=false
# 对于压缩的日志保留的最长时间
logcleanerdeleteretentionms = 1 day
# 对于segment日志的索引文件大小限制
logindexsizemaxbytes = 10 1024 1024
#y索引计算的一个缓冲区,一般不需要设置。
logindexintervalbytes = 4096
# 是否自动平衡broker之间的分配策略
autoleaderrebalanceenable = false
# leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡
leaderimbalanceperbrokerpercentage = 10
# 检查leader是否不平衡的时间间隔
leaderimbalancecheckintervalseconds = 300
# 客户端保留offset信息的最大空间大小
offsetmetadatamaxbytes = 1024
# Consumer端核心的配置是groupid、zookeeperconnect
# 决定该Consumer归属的唯一组ID,By setting the same group id multiple processes indicate that they are all part of the same consumer group
groupid
# 消费者的ID,若是没有设置的话,会自增
consumerid
# 一个用于跟踪调查的ID ,最好同groupid相同
clientid = <group_id>
# socket的超时时间,实际的超时时间为maxfetchwait + sockettimeoutms
sockettimeoutms= 30 1000
# socket的接收缓存空间大小
socketreceivebufferbytes=64 1024
#从每个分区fetch的消息大小限制
fetchmessagemaxbytes = 1024 1024
# true时,Consumer会在消费消息后将offset同步到zookeeper,这样当Consumer失败后,新的consumer就能从zookeeper获取最新的offset
autocommitenable = true
# 自动提交的时间间隔
autocommitintervalms = 60 1000
# 用于消费的最大数量的消息块缓冲大小,每个块可以等同于fetchmessagemaxbytes中数值
queuedmaxmessagechunks = 10
# 当有新的consumer加入到group时,将尝试reblance,将partitions的消费端迁移到新的consumer中, 该设置是尝试的次数
rebalancemaxretries = 4
# 每次reblance的时间间隔
rebalancebackoffms = 2000
# 每次重新选举leader的时间
refreshleaderbackoffms
# server发送到消费端的最小数据,若是不满足这个数值则会等待直到满足指定大小。默认为1表示立即接收。
fetchminbytes = 1
# 若是不满足fetchminbytes时,等待消费端请求的最长等待时间
fetchwaitmaxms = 100
# 如果指定时间内没有新消息可用于消费,就抛出异常,默认-1表示不受限
consumertimeoutms = -1
# 消费者获取消息元信息(topics, partitions and replicas)的地址,配置格式是:host1:port1,host2:port2,也可以在外面设置一个vip
metadatabrokerlist
#消息的确认模式
# 0:不保证消息的到达确认,只管发送,低延迟但是会出现消息的丢失,在某个server失败的情况下,有点像TCP
# 1:发送消息,并会等待leader 收到确认后,一定的可靠性
# -1:发送消息,等待leader收到确认,并进行复制 *** 作后,才返回,最高的可靠性
requestrequiredacks = 0
# 异步模式下缓冲数据的最大时间。例如设置为100则会集合100ms内的消息后发送,这样会提高吞吐量,但是会增加消息发送的延时
queuebufferingmaxms = 5000
# 异步模式下缓冲的最大消息数,同上
queuebufferingmaxmessages = 10000
# 异步模式下,消息进入队列的等待时间。若是设置为0,则消息不等待,如果进入不了队列,则直接被抛弃
queueenqueuetimeoutms = -1
# 异步模式下,每次发送的消息数,当queuebufferingmaxmessages或queuebufferingmaxms满足条件之一时producer会触发发送。
batchnummessages=200
kafka采用发布订阅模式:一对多。发布订阅模式又分两种:
Kafka为这两种模型提供了单一的消费者抽象模型: 消费者组 (consumer group)。 消费者用一个消费者组名标记自己。 一个发布在Topic上消息被分发给此消费者组中的一个消费者。 假如所有的消费者都在一个组中,那么这就变成了队列模型。 假如所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型。 一个消费者组中消费者订阅同一个Topic,每个消费者接受Topic的一部分分区的消息,从而实现对消费者的横向扩展,对消息进行分流。
注意:当单个消费者无法跟上数据生成的速度,就可以增加更多的消费者分担负载,每个消费者只处理部分partition的消息,从而实现单个应用程序的横向伸缩。但是不要让消费者的数量多于partition的数量,此时多余的消费者会空闲。此外,Kafka还允许多个应用程序从同一个Topic读取所有的消息,此时只要保证每个应用程序有自己的消费者组即可。
消费者组的概念就是:当有多个应用程序都需要从Kafka获取消息时,让每个app对应一个消费者组,从而使每个应用程序都能获取一个或多个Topic的全部消息;在每个消费者组中,往消费者组中添加消费者来伸缩读取能力和处理能力,消费者组中的每个消费者只处理每个Topic的一部分的消息,每个消费者对应一个线程。
在同一个群组中,无法让一个线程运行多个消费者,也无法让多线线程安全地共享一个消费者。按照规则,一个消费者使用一个线程,如果要在同一个消费者组中运行多个消费者,需要让每个消费者运行在自己的线程中。最好把消费者的逻辑封装在自己的对象中,然后使用java的ExecutorService启动多个线程,使每个消费者运行在自己的线程上,可参考 >
消息中间价,首选Kafka,大厂开源,稳定更新,性能优越,顺便介绍kafka的相关知识。
一、kafka是什么?
ApacheKafka是一套开源的消息系统,它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一个分布式,分区化,可复制的提交日志服务。现在,LinkedIn公司有三个同事离职创业,继续开发kafka。
二、关键配置项解读
出于性能和实际集群部署情况,我们还是需要讲解一些重要的配置项。除此之外,如果对某个默认参数存在质疑,在详细了解改参数的作用前,建议采用默认配置。
advertisedhostname
注册到zk供用户使用的主机名。内网环境通常无需配置,而IaaS一般需要配置为公网地址。默认为“hostname”,可以通过javanetInetAddress()接口获取该值。
advertisedport
注册到zk供用户使用的服务端口,通常在IaaS环境需要额外配置。
numpartitions
自动创建topic的默认partition数量。默认是1,为了获得更好的性能,建议修改为更大。最优取值参考后文。
defaultreplicationfactor
自动创建topic的默认副本数量,官方建议修改为2;但通常一个副本就足够了。
mininsyncreplicas
ISR提交生成者请求的最小副本数。
uncleanleaderelectionenable
是否允许不具备ISR资格的replicas选举为leader作为不得已的措施,甚至不惜牺牲部分数据。默认允许。建议允许。数据异常重要的情况例外。
controlledshutdownenable
在kafka收到stop命令或者异常终止时,允许自动同步数据。建议开启。
三、调优考量
配置合适的partitons数量。
这似乎是kafka新手必问得问题。partiton是kafka的并行单元。从procer和broker的视角看,向不同的partition写入是完全并行的;而对于consumer,并发数完全取决于partition的数量,即,如果consumer数量大于partition数量,则必有consumer闲置。所以,我们可以认为kafka的吞吐与partition时线性关系。partition的数量要根据吞吐来推断,假定p代表生产者写入单个partition的最大吞吐,c代表消费者从单个partition消费的最大吞吐,我们的目标吞吐是t,那么partition的数量应该是t/p和t/c中较大的那一个。实际情况中,p的影响因素有批处理的规模,压缩算法,确认机制和副本数等,然而,多次benchmark的结果表明,单个partition的最大写入吞吐在10MB/sec左右;c的影响因素是逻辑算法,需要在不同场景下实测得出。
这个结论似乎太书生气和不实用。我们通常建议partition的数量一定要大于等于消费者的数量来实现最大并发。官方曾测试过1万个partition的情况,所以不需要太担心partition过多的问题。我建议的做法是,如果是3个broker的集群,有5个消费者,那么建议partition的数量是15,也就是broker和consumer数量的最小公倍数。当然,也可以是一个大于消费者的broker数量的倍数,比如6或者9,还请读者自行根据实际环境裁定。
Kafka到底是个啥?用来干嘛的?
官方定义如下:
翻译过来,大致的意思就是,这是一个实时数据处理系统,可以横向扩展,并高可靠!
实时数据处理 ,从名字上看,很好理解,就是将数据进行实时处理,在现在流行的微服务开发中,最常用实时数据处理平台有 RabbitMQ、RocketMQ 等消息中间件。
这些中间件,最大的特点主要有两个:
在早期的 web 应用程序开发中,当请求量突然上来了时候,我们会将要处理的数据推送到一个队列通道中,然后另起一个线程来不断轮训拉取队列中的数据,从而加快程序的运行效率。
但是随着请求量不断的增大,并且队列通道的数据一致处于高负载,在这种情况下,应用程序的内存占用率会非常高,稍有不慎,会出现内存不足,造成程序内存溢出,从而导致服务不可用。
随着业务量的不断扩张,在一个应用程序内,使用这种模式已然无法满足需求,因此之后,就诞生了各种消息中间件,例如 ActiveMQ、RabbitMQ、RocketMQ等中间件。
采用这种模型,本质就是将要推送的数据,不在存放在当前应用程序的内存中,而是将数据存放到另一个专门负责数据处理的应用程序中,从而实现服务解耦。
消息中间件 :主要的职责就是保证能接受到消息,并将消息存储到磁盘,即使其他服务都挂了,数据也不会丢失,同时还可以对数据消费情况做好监控工作。
应用程序 :只需要将消息推送到消息中间件,然后启用一个线程来不断从消息中间件中拉取数据,进行消费确认即可!
引入消息中间件之后,整个服务开发会变得更加简单,各负其责。
Kafka 本质其实也是消息中间件的一种,Kafka 出自于 LinkedIn 公司,与 2010 年开源到 github。
LinkedIn 的开发团队,为了解决数据管道问题,起初采用了 ActiveMQ 来进行数据交换,大约是在 2010 年前后,那时的 ActiveMQ 还远远无法满足 LinkedIn 对数据传递系统的要求,经常由于各种缺陷而导致消息阻塞或者服务无法正常访问,为了能够解决这个问题,LinkedIn 决定研发自己的消息传递系统, Kafka 由此诞生 。
在 LinkedIn 公司,Kafka 可以有效地处理每天数十亿条消息的指标和用户活动跟踪,其强大的处理能力,已经被业界所认可,并成为大数据流水线的首选技术。
先来看一张图, 下面这张图就是 kafka 生产与消费的核心架构模型 !
如果你看不懂这些概念没关系,我会带着大家一起梳理一遍!
简而言之,kafka 本质就是一个消息系统,与大多数的消息系统一样,主要的特点如下:
与 ActiveMQ、RabbitMQ、RocketMQ 不同的地方在于,它有一个分区 Partition 的概念。
这个分区的意思就是说,如果你创建的 topic 有5个分区,当你一次性向 kafka 中推 1000 条数据时,这 1000 条数据默认会分配到 5 个分区中,其中每个分区存储 200 条数据。
这样做的目的,就是方便消费者从不同的分区拉取数据,假如你启动 5 个线程同时拉取数据,每个线程拉取一个分区,消费速度会非常非常快!
这是 kafka 与其他的消息系统最大的不同!
和其他的中间件一样,kafka 每次发送数据都是向 Leader 分区发送数据,并顺序写入到磁盘,然后 Leader 分区会将数据同步到各个从分区 Follower ,即使主分区挂了,也不会影响服务的正常运行。
那 kafka 是如何将数据写入到对应的分区呢?kafka中有以下几个原则:
与生产者一样,消费者主动的去kafka集群拉取消息时,也是从 Leader 分区去拉取数据。
这里我们需要重点了解一个名词: 消费组 !
考虑到多个消费者的场景,kafka 在设计的时候,可以由多个消费者组成一个消费组,同一个消费组者的消费者可以消费同一个 topic 下不同分区的数据,同一个分区只会被一个消费组内的某个消费者所消费,防止出现重复消费的问题!
但是不同的组,可以消费同一个分区的数据!
你可以这样理解,一个消费组就是一个客户端,一个客户端可以由很多个消费者组成,以便加快消息的消费能力。
但是,如果一个组下的消费者数量大于分区数量,就会出现很多的消费者闲置。
如果分区数量大于一个组下的消费者数量,会出现一个消费者负责多个分区的消费,会出现消费性能不均衡的情况。
因此,在实际的应用中,建议消费者组的 consumer 的数量与 partition 的数量保持一致!
光说理论可没用,下面我们就以 centos7 为例,介绍一下 kafka 的安装和使用。
kafka 需要 zookeeper 来保存服务实例的元信息,因此在安装 kafka 之前,我们需要先安装 zookeeper。
zookeeper 安装环境依赖于 jdk,因此我们需要事先安装 jdk
下载zookeeper,并解压文件包
创建数据、日志目录
配置zookeeper
重新配置 dataDir 和 dataLogDir 的存储路径
最后,启动 Zookeeper 服务
到官网 >
以上就是关于Kafka系列-主要参数详解全部的内容,包括:Kafka系列-主要参数详解、kafka——消费者原理解析、ApacheKafka开源消息系统_kafka源码分析等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)