Kafka数据消费

Kafka数据消费,第1张

消费者负责从订阅的主题上拉取消息,消费组是逻辑概念。一个消费者只属于一个消费组,一个消费组包一个或多个消费者。当消息发布到主题后,会被投递到每个消费组,但每个消费组中只有一个消费者能消费给消息。

消费者如何知道该消费哪个分区?当消费组内消费者个数发生变化时,分区分配是如何变化的呢?

按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配, 以保证分区尽可能均匀地分配给所有的消费者。对于 每一个主题 该策略会将消费组内所有的消费者按照名称的字典序排序然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。

假设n=分区数/消费者数量,m=分区数%消费者数量,那么前m个消费者每个分配n+1分区,后面的每个消费者分配n个分区。

如图所示主题中共有7个分区,此时消费组内只有一个消费者C0,C0订阅7个分区。

随着消费组内消费者不断加入,分区逐渐从C0分配到C1~C6,当最后一个消费者C7加入后,此时总共有8个消费者但是只有7个分区,因此C7由于分配不到分区而无法消费任何消息。

消费者并非越多越好,消费者数量应小于等于分区数量,否则会造成资源的浪费

缺点:

当一个消费组订阅两个分别包含四个分区的主题时,分区分配结果如下,比较均匀。

但当两个主题各有3个分区时,则会出现如下分区不均的问题。类似情况扩大的话,可能出现消费者过载问题。

将消费组内所有消费者及消费者订阅的所有主题的分区按照字典序排序,然后通过轮询方式将分区依次分配给每个消费者。如果消费组内消费者的订阅信息都是相同的,那么分区分配会比较均匀。如一个消费组两个消费者,分别订阅两个都有3的分区的主题,如图。

但是当消费组内消费者的订阅信息不同时,则会出现分配不均问题。如图,假设消费组内有三个消费者,主题1/2/3分别有1/2/3个分区,C0订阅主题1,C1订阅主题1和2,C2订阅主题1/2/3,分区结果将会如下图所示。

后来引入的策略,主要目的:

假设三个消费者,订阅了4个主题,每个主题有两个分区,那么初始分区分配结果如下:

乍一看,跟RoundRobin分配策略结果相同,但此时如果C1下线,那么消费组会执行再均衡 *** 作,重新分配消息分区。如果是RoundRobin策略,分配结果如下:

而如果是Sticky分配策略,则结果如下:

StickyAssignor保留了上一次对C0和C2的分配结果,将C1的分区分配给C0和C2使其均衡。

如果发生分区重分配,那么对于同一个分区而 ,有可能之前的消费者和新指派的消费者不是同一个,之前消费者进行到一半的处理还要在新指派的消费者中再次复现一遍,造成重复消费。StickyAssignor分配策略如同其名称中的"sticky"一 样,让分配策略具备的“黏性”,尽可能地让前后两次分配相同,进而减少系统资源的损耗及其他异常情况的发生。

再来看下,消费者订阅信息不相同的情况,拿RoundRobinAssignor中的实例来说。

假设消费组内有三个消费者,主题1/2/3分别有1/2/3个分区,C0订阅主题1,C1订阅主题1和2,C2订阅主题1/2/3,RoundRobinAssignor分区结果将会如下图所示。

而采用StickyAssignor时,分区分配结果如下:

若此时C0下线,RoundRobinAssignor重分配的结果如下:

而StickyAssignor重分配结果如下:

综上:

StickyAssignor分配策略的优点就是可以使分区重分配具备 “黏性”,减少不必要的分区移动(一个分区剥离之前的消费者 ,转而分配给另一个新的消费者)。

Kafka中的消息消费是基于拉模式。

Kafka每次拉取一组消息,每条消息的格式如下:

在每次拉取方法时,它返回的是还没有被消费过的消息集。要实现这个功能,就需要知道上次消费时的消费位移,消费者在消费完消息后要进行消费位移提交动作,且消费位移要进行持久化,消费位移保存在__consumer_offsets主题中。

当前拉取消息的最大offset为x,消费者消费完成提交位移的是offset其实为x+1,表示下次拉取消息的起始位置。

自动提交

默认采用自动提交,默认每隔5s会将拉取到的每个分区的最大的消息位移进行提交。真正的提交动作是在拉取消息的逻辑完成,每次拉取消息前会判断是否可以进行位移提交,如果可以则提交上一次的位移。这里会有两个问题,如下图所示。

重复消费:当前拉取消息x+2,x+7,当前消费到X+5,在提交消费位移前,消费者宕机;新的消费者还是会从X+2开始拉取消息, 因此导致重复消费。

消息丢失:当前拉取消息x+2,x+7,当前消费X+5,到下次拉取的时候,消费位移已经提交为X+8,若此时消费者宕机,新的消费者会从X+8处开始消费,导致X+5 ~ X+7的消息没有被消费,导致消息的丢失。

手动提交

同步提交和异步提交。

同步提交默认提交本次拉取分区消息的最大偏移量,如本次拉取X+2,X+7的消息,同步提交默认提交X+8的位置;当时同步提交也可指定提交的偏移量,比如消费一条提交1次,因为提交本身为同步 *** 作,所以会耗费一定的性能。

同步提交也会导致重复消费的问题,如消费完成后,提交前消费者宕机。

异步提交消费者线程不会被阻塞,使性能得到增强,但异步提交失败重试可能会导致提交位移被覆盖的问题,如本次异步提交offset=X失败,下次异步提交offset=X+y成功;此时前一次提交重试再次提交offset=x,如果业务上没有重试校验,会导致offset被覆盖,最终导致重复消费。

当新的消费组建立、消费者订阅新的主题或之前提交的位移信息因为过期被删除等,此时查不到纪录的消费位移。Kafka可配置从最新或从最早处开始消费。

Kafka还支持从特定位移处开始消费,可以实现回溯消费,Kafka内部提供了Seek()方法,来重置消费位移。

当需要回溯指定时间后的消息时,可先用offsetsForTimes方法查到指定时间后第一条消息的位移,然后再用seek重置位移。

分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以既方便又安全地删除或添加消费者。

Kfaka提供了组协调器(GroupCoordinator)和消费者协调器(ConsumerCoordinator),前者负责管理消费组,后者负责与前者交互,两者最重要的职责就是负责再均衡的 *** 作。

举例说明,当消费者加入消费组时,消费者、消费组和组协调器之间一般会经历以下几个阶段。

第一阶段(FIND COORDINATOR)

消费者需要确定它所属的消费组对应的GroupCoordinator所在的broker并创建与该broker 相互通信的网络连接。

消费者会向集群中的某个节点发送FindCoordinatorRequest请求来查找对应的组协调器。

Kafka根据请求中的coordinator_key(也就是groupld )的哈希值计算__consumer_offsets中的分区编号,如下图所示。找到对应的分区之后,在寻找此分区leader副本所在的broker节点,该节点即为当前消费组所在的组协调器节点。

消费组最终的分区分配方案及组内消费者所提交的消费位移信息都会发送给该broker节点。该broker节点既扮演GroupCoordinato的角色又扮演保存分区分配方案和组内消费者位移的角色,这样可以省去很多不必要的中间轮转所带来的开销。

第二阶段(JOIN GROUP)

在成功找到消费组所对应的GroupCoordinator之后就进入加入消费组的阶段,在此阶段的 消费者会向GroupCoordinator发送JoinGroupRequest请求,并处理响应。

组协调器内部主要做了以下几件事:

选举消费组的leader

如果当前组内没有leader,那么第一个加入消费组的则为leader。如果leader挂掉,组协调器会从内部维护的HashMap(消费者信息,key为member_id)中选择第一个key作为新的leader。

选举分区分配策略

前面说的每个消费者可能会上报多个分区分配策略,选举过程如下:

第三阶段(SYNC GROUP)

leader消费者根据在第二阶段中得到的分区分配策略来实施分区分配,然后将分配结果同步到组协调器。各个消费者会向组协调器发送SyncGroupRequest请求来同步分配方案。

请求结构如图,leader发送的请求才会有group_assignment。

其中包含了各个消费者对应的具体分配方案,member_id表示消费者的唯一标识,而 member_assignment是与消费者对应的分配方案,如图

消费者收到具体的分区分配方案后,会开启心跳任务,定期向组协调器发送心跳请求确定彼此在线。

第四阶段(HEARTBEAT)

在正式消费之前,消费者还需要确定拉取消息的起始位置。假设之前已经将最后的消费位移提交成功,那么消费者会请求获取上次提交的消费位移并从此处继续消费。

心跳线程是一个独立的线程,可以在轮询消息的空档发送。如果消费者停发送心跳的时间足够长,组协调器会认为这个消费者已经死亡,则触发一次再均衡行为。

发送消息的主要步骤

格式:每个消息是一个 ProducerRecord 对象, 必须指定 所属的 Topic和Value , 还可以指定Partition及Key

1:序列化 ProducerRecord

2:分区: 如指定Partition,不做任何事情;否则,Partitioner 根据key得到Partition 。生产者向哪个Partition发送

3:消息添加到相应 bach中 ,独立线程将batch 发到Broker上

4:broker收到消息响应 。 成功回RecordMetaData对象 ,包含了Topic信息、Patition信息、消息在Partition中的Offset信息; 失败返回错误

有序场景:不建议retries  0。可maxinflightrequestsperconnection  1, 影响生产者吞吐量,但保证有序          ps: 同partition消息有序

三个 必选 的属性:

(1) bootstrapservers ,broker地址清单

(2) keyserializer: 实现orgapachekafkacommonserializationSerializer接口的类,key序列化成字节数组。注意: 必须被设置,即使没指定key

(3)valueserializer, value序列化成字节数组

同步发送消息

异步发送消息

(1)acks: 指定多少partition副本收到消息,生产者才会认为写成功

        0,不需等待服务器的响应,吞吐量高,如broker没有收到,生产者不知道

        1,leader partition收到消息,一个即成功

        all,所有partition都收到,才成功,leader和follower共同应答

(2)buffermemory, 生产者内 缓存区域大小

(3)compressiontype ,默认不压缩,设置成snappy、gzip或lz4对发送给broker压缩

(4)retries, 重发消息的次数

(5)batchsize, 发送同一partition消息会先存储在batch中,该参数指定一个batch内存大小,单位byte。不一定填满才发送

(6)lingerms ,批次时间,batch被填满或者lingerms达到上限,就把batch中的消息发送出去

(7)maxinflightrequestsperconnection, 生产者在收到服务器响应之前可以发送的消息个数

创建ProducerRecord时,必须 指定序列化器 ,推荐序列化框架Avro、Thrift、ProtoBuf等

用 Avro 之前,先定义schema(通常用 JSON 写)

(1)创建一个类代表客户,作为消息的value

(2)定义schema

(3)生成Avro对象发送到Kafka

ProducerRecord包含Topic、value,key默认null,ey的两个作用:1)附加信息    2)被写到Topic的哪个partition

key  null ,默认partitioner, RoundRobin均衡分布

key不空,hash进行散列 ,不改变partition数量(永远不加),key和partition映射不变。

自定义paritioner 需实现Partitioner接口

每个kafka broker中配置文件serverproperties默认必须配置的属性如下:

#唯一标识在集群中的ID,要求是正数。
brokerid=0
#服务端口,默认9092
port=9092
#监听地址
hostname=debugo01

# 处理网络请求的最大线程数
numnetworkthreads=2
# 处理磁盘I/O的线程数
numiothreads=8
# 一些后台线程数
backgroundthreads = 4
# 等待IO线程处理的请求队列最大数
queuedmaxrequests = 500

# socket的发送缓冲区(SO_SNDBUF)
socketsendbufferbytes=1048576
# socket的接收缓冲区 (SO_RCVBUF)
socketreceivebufferbytes=1048576
# socket请求的最大字节数。为了防止内存溢出,messagemaxbytes必然要小于
socketrequestmaxbytes = 104857600

# 每个topic的分区个数,更多的partition会产生更多的segment file
numpartitions=2
# 是否允许自动创建topic ,若是false,就需要通过命令创建topic
autocreatetopicsenable =true
# 一个topic ,默认分区的replication个数 ,不能大于集群中broker的个数。
defaultreplicationfactor =1
# 消息体的最大大小,单位是字节
messagemaxbytes = 1000000

# Zookeeper quorum设置。如果有多个使用逗号分割
zookeeperconnect=debugo01:2181,debugo02,debugo03
# 连接zk的超时时间
zookeeperconnectiontimeoutms=1000000
# ZooKeeper集群中leader和follower之间的同步实际
zookeepersynctimems = 2000

#日志存放目录,多个目录使用逗号分割
logdirs=/var/log/kafka

# 日志清理策略(delete|compact)
logcleanuppolicy = delete
# 日志保存时间 (hours|minutes),默认为7天(168小时)。超过这个时间会根据policy处理数据。bytes和minutes无论哪个先达到都会触发。
logretentionhours=168
# 日志数据存储的最大字节数。超过这个时间会根据policy处理数据。
#logretentionbytes=1073741824

# 控制日志segment文件的大小,超出该大小则追加到一个新的日志segment文件中(-1表示没有限制)
logsegmentbytes=536870912
# 当达到下面时间,会强制新建一个segment
logrollhours = 247
# 日志片段文件的检查周期,查看它们是否达到了删除策略的设置(logretentionhours或logretentionbytes)
logretentioncheckintervalms=60000

# 是否开启压缩
logcleanerenable=false
# 对于压缩的日志保留的最长时间
logcleanerdeleteretentionms = 1 day

# 对于segment日志的索引文件大小限制
logindexsizemaxbytes = 10 1024 1024
#y索引计算的一个缓冲区,一般不需要设置。
logindexintervalbytes = 4096

# 是否自动平衡broker之间的分配策略
autoleaderrebalanceenable = false
# leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡
leaderimbalanceperbrokerpercentage = 10
# 检查leader是否不平衡的时间间隔
leaderimbalancecheckintervalseconds = 300
# 客户端保留offset信息的最大空间大小
offsetmetadatamaxbytes = 1024

# Consumer端核心的配置是groupid、zookeeperconnect
# 决定该Consumer归属的唯一组ID,By setting the same group id multiple processes indicate that they are all part of the same consumer group
groupid
# 消费者的ID,若是没有设置的话,会自增
consumerid
# 一个用于跟踪调查的ID ,最好同groupid相同
clientid = <group_id>

# socket的超时时间,实际的超时时间为maxfetchwait + sockettimeoutms
sockettimeoutms= 30 1000
# socket的接收缓存空间大小
socketreceivebufferbytes=64 1024
#从每个分区fetch的消息大小限制
fetchmessagemaxbytes = 1024 1024

# true时,Consumer会在消费消息后将offset同步到zookeeper,这样当Consumer失败后,新的consumer就能从zookeeper获取最新的offset
autocommitenable = true
# 自动提交的时间间隔
autocommitintervalms = 60 1000

# 用于消费的最大数量的消息块缓冲大小,每个块可以等同于fetchmessagemaxbytes中数值
queuedmaxmessagechunks = 10

# 当有新的consumer加入到group时,将尝试reblance,将partitions的消费端迁移到新的consumer中, 该设置是尝试的次数
rebalancemaxretries = 4
# 每次reblance的时间间隔
rebalancebackoffms = 2000
# 每次重新选举leader的时间
refreshleaderbackoffms

# server发送到消费端的最小数据,若是不满足这个数值则会等待直到满足指定大小。默认为1表示立即接收。
fetchminbytes = 1
# 若是不满足fetchminbytes时,等待消费端请求的最长等待时间
fetchwaitmaxms = 100
# 如果指定时间内没有新消息可用于消费,就抛出异常,默认-1表示不受限
consumertimeoutms = -1

# 消费者获取消息元信息(topics, partitions and replicas)的地址,配置格式是:host1:port1,host2:port2,也可以在外面设置一个vip
metadatabrokerlist

#消息的确认模式
# 0:不保证消息的到达确认,只管发送,低延迟但是会出现消息的丢失,在某个server失败的情况下,有点像TCP
# 1:发送消息,并会等待leader 收到确认后,一定的可靠性
# -1:发送消息,等待leader收到确认,并进行复制 *** 作后,才返回,最高的可靠性
requestrequiredacks = 0

# 异步模式下缓冲数据的最大时间。例如设置为100则会集合100ms内的消息后发送,这样会提高吞吐量,但是会增加消息发送的延时
queuebufferingmaxms = 5000
# 异步模式下缓冲的最大消息数,同上
queuebufferingmaxmessages = 10000
# 异步模式下,消息进入队列的等待时间。若是设置为0,则消息不等待,如果进入不了队列,则直接被抛弃
queueenqueuetimeoutms = -1
# 异步模式下,每次发送的消息数,当queuebufferingmaxmessages或queuebufferingmaxms满足条件之一时producer会触发发送。
batchnummessages=200

Kafka是高吞吐量低延迟的高并发、高性能的消息中间件,在大数据领域有广泛的应用。那他是如何做到这么高的吞吐量和高性能呢?

生产者通过多batch合并一个request 一次性发送broker提高吞吐量
每个Kafka服务端叫做一个broker,负责管理一台机器上的数据;每个topic拆分成多个partition,这样每个partition存储一部分数据并放在不同的broker上。这时候生产者如果生产一条消息,就建立连接然后发送数据,效率肯定不高。

Kafka会在生产者放一个内存缓冲区,当生产消息后,它会把同一个topic同一个partition的消息打包成一个batch。

这样就结束了吗?当然不是,如果生产者这边有多个topic,不同topic发送到同一个broker的数据是否可以合并呢?当然可以
如果客户端那边有2个topic,每个topic有个3个partition,那每个broker就会有2个partition。这时候我们完全可以将发送到同一个broker的batch一次发送。所以kafka将发送到同一个broker的batch打包成一个request发送,进一步提高了通信的效率。

首先kafka在接收到数据后都会写磁盘,但是我们都知道写磁盘性能会很差,所以它并不是直接写磁盘。 *** 作系统本身有一层缓存叫做页缓存,相当于内存,所以kafka是把数据写入到页缓存中,接下来由页缓存决定什么时候写入到磁盘。

普通机械磁盘如果随机写的话,性能会非常差,Kafka采用磁盘顺序写的方式,仅仅将数据追加到文件末尾,这种方式性能基本可以和内存媲美。

上面说的是kafka写入数据的优秀设计,那从磁盘读数据分发又是如何设计的呢?

我们可以想想分发数据一般是如何做的。首先肯定是从磁盘中读出数据到页缓存,然后从页缓存中拷贝到kafka中,然后再从kafka中拷贝到socket中,最后再给网卡。
而零拷贝技术,就是去除上面2步拷贝,直接从页缓存中将数据发送到网卡中,socket中仅仅只是拷贝一个描述符,这个过程大大提升了读取的性能。而且从磁盘读数据时候会先看看页缓存中有没有,如果有的话都不用读磁盘了。

kafka的配置分为 broker、producter、consumer三个不同的配置

一 BROKER 的全局配置

最为核心的三个配置 brokerid、logdir、zookeeperconnect 。
------------------------------------------- 系统 相关 -------------------------------------------

##每一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变brokerid的话不会影响consumers

brokerid =1
##kafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2

logdirs = /tmp/kafka-logs
##提供给客户端响应的端口

port =6667
##消息体的最大大小,单位是字节

messagemaxbytes =1000000
## broker 处理消息的最大线程数,一般情况下不需要去修改

numnetworkthreads =3
## broker处理磁盘IO 的线程数 ,数值应该大于你的硬盘数

numiothreads =8
## 一些后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改

backgroundthreads =4
## 等待IO线程处理的请求队列最大数,若是等待IO的请求超过这个数值,那么会停止接受外部消息,算是一种自我保护机制

queuedmaxrequests =500
##broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置

hostname
## 打广告的地址,若是设置的话,会提供给producers, consumers,其他broker连接,具体如何使用还未深究

advertisedhostname
## 广告地址端口,必须不同于port中的设置

advertisedport
## socket的发送缓冲区,socket的调优参数SO_SNDBUFF

socketsendbufferbytes =1001024
## socket的接受缓冲区,socket的调优参数SO_RCVBUFF

socketreceivebufferbytes =1001024
## socket请求的最大数值,防止serverOOM,messagemaxbytes必然要小于socketrequestmaxbytes,会被topic创建时的指定参数覆盖

socketrequestmaxbytes =10010241024
------------------------------------------- LOG 相关 -------------------------------------------

## topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖

logsegmentbytes =102410241024
## 这个参数会在日志segment没有达到logsegmentbytes设置的大小,也会强制新建一个segment 会被 topic创建时的指定参数覆盖

logrollhours =247
## 日志清理策略 选择有:delete和compact 主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖

logcleanuppolicy = delete
## 数据存储的最大时间 超过这个时间 会根据logcleanuppolicy设置的策略处理数据,也就是消费端能够多久去消费数据

## logretentionbytes和logretentionminutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖

logretentionminutes=7days

指定日志每隔多久检查看是否可以被删除,默认1分钟

logcleanupintervalmins=1
## topic每个分区的最大文件大小,一个topic的大小限制 = 分区数logretentionbytes 。-1没有大小限制

## logretentionbytes和logretentionminutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖

logretentionbytes=-1
## 文件大小检查的周期时间,是否处罚 logcleanuppolicy中设置的策略

logretentioncheckintervalms=5minutes
## 是否开启日志压缩

logcleanerenable=false
## 日志压缩运行的线程数

logcleanerthreads =1
## 日志压缩时候处理的最大大小

logcleaneriomaxbytespersecond=None
## 日志压缩去重时候的缓存空间 ,在空间允许的情况下,越大越好

logcleanerdedupebuffersize=50010241024
## 日志清理时候用到的IO块大小 一般不需要修改

logcleaneriobuffersize=5121024
## 日志清理中hash表的扩大因子 一般不需要修改

logcleaneriobufferloadfactor =09
## 检查是否处罚日志清理的间隔

logcleanerbackoffms =15000
## 日志清理的频率控制,越大意味着更高效的清理,同时会存在一些空间上的浪费,会被topic创建时的指定参数覆盖

logcleanermincleanableratio=05
## 对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同logretentionminutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。会被topic创建时的指定参数覆盖

logcleanerdeleteretentionms =1day
## 对于segment日志的索引文件大小限制,会被topic创建时的指定参数覆盖

logindexsizemaxbytes =1010241024
## 当执行一个fetch *** 作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数

logindexintervalbytes =4096
## log文件"sync"到磁盘之前累积的消息条数

## 因为磁盘IO *** 作是一个慢 *** 作,但又是一个"数据可靠性"的必要手段

## 所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡

## 如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞)

## 如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟

## 物理server故障,将会导致没有fsync的消息丢失

logflushintervalmessages=None
## 检查是否需要固化到硬盘的时间间隔

logflushschedulerintervalms =3000
## 仅仅通过interval来控制消息的磁盘写入时机,是不足的

## 此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔

## 达到阀值,也将触发

logflushintervalms = None
## 文件在索引中清除后保留的时间 一般不需要去修改

logdeletedelayms =60000
## 控制上次固化硬盘的时间点,以便于数据恢复 一般不需要去修改

logflushoffsetcheckpointintervalms =60000
------------------------------------------- TOPIC 相关 -------------------------------------------

## 是否允许自动创建topic ,若是false,就需要通过命令创建topic

autocreatetopicsenable =true
## 一个topic ,默认分区的replication个数 ,不得大于集群中broker的个数

defaultreplicationfactor =1
## 每个topic的分区个数,若是在topic创建时候没有指定的话 会被topic创建时的指定参数覆盖

numpartitions =1
实例 --replication-factor3--partitions1--topic replicated-topic :名称replicated-topic有一个分区,分区被复制到三个broker上。
----------------------------------复制(Leader、replicas) 相关 ----------------------------------

## partition leader与replicas之间通讯时,socket的超时时间

controllersockettimeoutms =30000
## partition leader与replicas数据同步时,消息的队列尺寸

controllermessagequeuesize=10
## replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas列入ISR(in-sync replicas),并认为它是死的,不会再加入管理中

replicalagtimemaxms =10000
## 如果follower落后与leader太多,将会认为此follower[或者说partition relicas]已经失效

## 通常,在follower与leader通讯时,因为网络延迟或者链接断开,总会导致replicas中消息同步滞后

## 如果消息之后太多,leader将认为此follower网络延迟较大或者消息吞吐能力有限,将会把此replicas迁移

## 到其他follower中

## 在broker数量较少,或者网络不足的环境中,建议提高此值

replicalagmaxmessages =4000
##follower与leader之间的socket超时时间

replicasockettimeoutms=301000
## leader复制时候的socket缓存大小

replicasocketreceivebufferbytes=641024
## replicas每次获取数据的最大大小

replicafetchmaxbytes =10241024
## replicas同leader之间通信的最大等待时间,失败了会重试

replicafetchwaitmaxms =500
## fetch的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会阻塞,直到满足条件

replicafetchminbytes =1
## leader 进行复制的线程数,增大这个数值会增加follower的IO

numreplicafetchers=1
## 每个replica检查是否将最高水位进行固化的频率

replicahighwatermarkcheckpointintervalms =5000
## 是否允许控制器关闭broker ,若是设置为true,会关闭所有在这个broker上的leader,并转移到其他broker

controlledshutdownenable =false
## 控制器关闭的尝试次数

controlledshutdownmaxretries =3
## 每次关闭尝试的时间间隔

controlledshutdownretrybackoffms =5000
## 是否自动平衡broker之间的分配策略

autoleaderrebalanceenable =false
## leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡

leaderimbalanceperbrokerpercentage =10
## 检查leader是否不平衡的时间间隔

leaderimbalancecheckintervalseconds =300
## 客户端保留offset信息的最大空间大小

offsetmetadatamaxbytes
----------------------------------ZooKeeper 相关----------------------------------

##zookeeper集群的地址,可以是多个,多个之间用逗号分割 hostname1:port1,hostname2:port2,hostname3:port3

zookeeperconnect = localhost:2181
## ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大

zookeepersessiontimeoutms=6000
## ZooKeeper的连接超时时间

zookeeperconnectiontimeoutms =6000
## ZooKeeper集群中leader和follower之间的同步实际那

zookeepersynctimems =2000

配置的修改

其中一部分配置是可以被每个topic自身的配置所代替,例如

新增配置

bin/kafka-topicssh --zookeeper localhost:2181--create --topic my-topic --partitions1--replication-factor1--config maxmessagebytes=64000--config flushmessages=1
修改配置

bin/kafka-topicssh --zookeeper localhost:2181--alter --topic my-topic --config maxmessagebytes=128000
删除配置 :

bin/kafka-topicssh --zookeeper localhost:2181--alter --topic my-topic --deleteConfig maxmessagebytes

二 CONSUMER 配置

最为核心的配置是groupid、zookeeperconnect

## Consumer归属的组ID,broker是根据groupid来判断是队列模式还是发布订阅模式,非常重要

 groupid
## 消费者的ID,若是没有设置的话,会自增

 consumerid
## 一个用于跟踪调查的ID ,最好同groupid相同

 clientid = group id value
## 对于zookeeper集群的指定,可以是多个 hostname1:port1,hostname2:port2,hostname3:port3 必须和broker使用同样的zk配置

 zookeeperconnect=localhost:2182
## zookeeper的心跳超时时间,查过这个时间就认为是dead消费者

 zookeepersessiontimeoutms =6000
## zookeeper的等待连接时间

 zookeeperconnectiontimeoutms =6000
## zookeeper的follower同leader的同步时间

 zookeepersynctimems =2000
## 当zookeeper中没有初始的offset时候的处理方式 。smallest :重置为最小值 largest:重置为最大值 anythingelse:抛出异常

 autooffsetreset = largest
## socket的超时时间,实际的超时时间是:maxfetchwait + sockettimeoutms

 sockettimeoutms=301000
## socket的接受缓存空间大小

 socketreceivebufferbytes=641024
##从每个分区获取的消息大小限制

 fetchmessagemaxbytes =10241024
## 是否在消费消息后将offset同步到zookeeper,当Consumer失败后就能从zookeeper获取最新的offset

 autocommitenable =true
## 自动提交的时间间隔

 autocommitintervalms =601000
## 用来处理消费消息的块,每个块可以等同于fetchmessagemaxbytes中数值

 queuedmaxmessagechunks =10
## 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新

## 的consumer上,如果一个consumer获得了某个partition的消费权限,那么它将会向zk注册

##"Partition Owner registry"节点信息,但是有可能此时旧的consumer尚没有释放此节点,

## 此值用于控制,注册节点的重试次数

 rebalancemaxretries =4
## 每次再平衡的时间间隔

 rebalancebackoffms =2000
## 每次重新选举leader的时间

 refreshleaderbackoffms
## server发送到消费端的最小数据,若是不满足这个数值则会等待,知道满足数值要求

 fetchminbytes =1
## 若是不满足最小大小(fetchminbytes)的话,等待消费端请求的最长等待时间

 fetchwaitmaxms =100
## 指定时间内没有消息到达就抛出异常,一般不需要改

 consumertimeoutms = -1
三 PRODUCER 的配置

比较核心的配置:metadatabrokerlist、requestrequiredacks、producertype、serializerclass

## 消费者获取消息元信息(topics, partitions and replicas)的地址,配置格式是:host1:port1,host2:port2,也可以在外面设置一个vip

 metadatabrokerlist
##消息的确认模式

 ##0:不保证消息的到达确认,只管发送,低延迟但是会出现消息的丢失,在某个server失败的情况下,有点像TCP

 ##1:发送消息,并会等待leader 收到确认后,一定的可靠性

 ## -1:发送消息,等待leader收到确认,并进行复制 *** 作后,才返回,最高的可靠性

 requestrequiredacks =0
## 消息发送的最长等待时间

 requesttimeoutms =10000
## socket的缓存大小

 sendbufferbytes=1001024
## key的序列化方式,若是没有设置,同serializerclass

 keyserializerclass
## 分区的策略,默认是取模

 partitionerclass=kafkaproducerDefaultPartitioner
## 消息的压缩模式,默认是none,可以有gzip和snappy

 compressioncodec = none
## 可以针对默写特定的topic进行压缩

 compressedtopics=null
## 消息发送失败后的重试次数

 messagesendmaxretries =3
## 每次失败后的间隔时间

 retrybackoffms =100
## 生产者定时更新topic元信息的时间间隔 ,若是设置为0,那么会在每个消息发送后都去更新数据

 topicmetadatarefreshintervalms =6001000
## 用户随意指定,但是不能重复,主要用于跟踪记录消息

 clientid=""
------------------------------------------- 消息模式 相关 -------------------------------------------

 ## 生产者的类型 async:异步执行消息的发送 sync:同步执行消息的发送

 producertype=sync
## 异步模式下,那么就会在设置的时间缓存消息,并一次性发送

 queuebufferingmaxms =5000
## 异步的模式下 最长等待的消息数

 queuebufferingmaxmessages =10000
## 异步模式下,进入队列的等待时间 若是设置为0,那么要么进入队列,要么直接抛弃

 queueenqueuetimeoutms = -1
## 异步模式下,每次发送的最大消息数,前提是触发了queuebufferingmaxmessages或是queuebufferingmaxms的限制

 batchnummessages=200
## 消息体的系列化处理类 ,转化为字节流进行传输

 serializerclass= kafkaserializerDefaultEncoder

作者 | Normcore Tech

译者 | 弯月,责编 | 屠敏

出品 | CSDN(ID:CSDNnews)

以下为译文:

可能有人没有听说过Kafka,这是一个非常复杂的分布式软件,可协调多台计算机之间的数据传输。更具体地说,该软件的功能是“展平”数据,然后快速地将数据从一个地方移动到另一个地方。一般来讲,如果你有很多数据需要快速处理并发送到其他地方,那么就可以考虑一下Kafka。Kafka还可以在一定期限内保留数据,比如设置数据保存2天、3天或7天,如果你的下游流程失败,那么你还可以利用存储在Kafka中的数据重新处理。

许多处理汇总数据的公司(比如Facebook和Twitter等社交网络数据,以及每晚需要处理大量星体运动的天文学家,或需要快速了解车辆周围环境数据的自动驾驶车辆公司等)都在使用Kafka,将任意地方生产的数据(即用户通过键盘输入的数据,通过望远镜读取的数据,通过车辆遥测读取的数据等)移动至下游流程进行处理和分析。

最近,WeWork更为名The We Company,他们在共享工作间领域取得了成功,其官网宣称公司的使命为:

“提升世界的意识。”其核心业务是从房地产出租公司那里租下办公室,然后转租给无法按照传统流程租赁办公室的个人和小公司。

为了“提升世界的意识”,该公司致力于为世界各地的个人和公司的团队打造独特却又不完全相同的办公空间。最近,该公司还开始涉足教育。

最近,因为上市,WeWork曝光了一些财务信息:

从好的方面来看,根据A xi os的数据,2018年WeWork的入住率为90%,且会员总数在不断增加。

有人常常将WeWork视为硅谷地区的公司过高估值的完美例子。作为一家房地产企业,WeWork烧钱的速度非常快,毫无疑问他们必须努力让公众市场投资者相信公司有长远的发展,同时和还要维护其作为 科技 公司的地位。

这家公司再三强调说它不是一家房地产公司(毕竟它在不断烧钱对吧?),那么一家消息中介技术公司究竟能提供什么?WeWork曾宣布,它使用Kafka来实现“内部部署的物联网需求”。这是什么意思?

“我们的产品是物理空间,”WeWork的首席开发负责人David Fano说,他在会议期间穿着一件印有“bldgs = data”字样的T恤。

每个办公室都有10个环境传感器——小巧的壁挂式绿色盒子,这些传感器可跟踪室内温度、湿度、空气质量、气压和环境光线水平。还有20个白色的壁挂式信标,呈三角形分布在公共空间(开放式办公区和会议室),用于测量WeWork成员的室内位置(数据是匿名的)。顶部四分之一的传感器通过计算机视觉观察成员的活动。

换句话说,WeWork会跟踪WeWork的多个物理事件并记录所有这些数据。但是他们真的有必要这样做吗?记录Keith Harring壁画周围开放区域的环境温度能给他们带来怎样的竞争优势?更重要的是,他们能否将这些信息用到重要的项目中?

对于公司而言,重要的是要了解办公室的“单位组合” ——私人办公室、会议空间和开放式办公桌——的比例,我们可以利用这些信息对下一个办公间作出调整。

我觉得这家新闻报道机构需要建立一种思考技术的心理模型。Ben Thompson为Stratechery提供了出色的服务,他建立了聚合理论(>acks=0,生产者发送过来数据就不管了,可靠性差,效率高;

acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;

acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;

在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,

对可靠性要求比较高的场景。

至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

最多一次(At Most Once)= ACK级别设置为0

总结:

At Least Once可以保证数据不丢失,但是不能保证数据不重复;

At Most Once可以保证数据不重复,但是不能保证数据不丢失。

精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。

Kafka 011版本以后,引入了一项重大特性:幂等性和事务。

幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。

经上所述,如果保障数据不丢,不重复需要保证一下条件:

精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2)

开启参数 enableidempotence 默认为 true,false 关闭。

如何保障kafka内数据有序呢?

kafka在1x及以后版本保证数据单分区有序,条件如下:

(2)开启幂等性

maxinflightrequestsperconnection需要设置小于等于5。

(1)未开启幂等性

maxinflightrequestsperconnection需要设置为1。

原因说明:因为在kafka1x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,

故无论如何,都可以保证最近5个request的数据都是有序的

源端集群
kafka01:101110
kafka02:101111
kafka03:101112
目标集群
kafka11:101113
kafka12:101114
kafka13:101115
公司新业务需要从其他部门取到Kafka中的数据到我们的Kafka集群,这里使用Kafka自带的kafka-mirror-maker工具进行数据的同步,数据流向为源端数据到目标集群,具体配置看下面配置,这里只提供基础的配置,生产中使用请去官网根据文档配置自己需要的个性化配置。
在目标端集群配置$KAFKA_HOME/config/consumerproperties
groupid可以自己定义

在目标端集群配置$KAFKA_HOME/config/producerproperties

在目标端及源端各自新建test02的topic

在目标端启动同步进程(如要后台启动请加nohup)

===测试===
在源端启动生产者进程

在目标端启动消费者进程,在消费端能看到生产者发送的信息即可


欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/dianzi/10444952.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-08
下一篇 2023-05-08

发表评论

登录后才能评论

评论列表(0条)

保存