kafka 配置文件参数详解

kafka 配置文件参数详解,第1张

kafka的配置分为 broker、producter、consumer三个不同的配置

一 BROKER 的全局配置

最为核心的三个配置 brokerid、logdir、zookeeperconnect 。

------------------------------------------- 系统 相关 -------------------------------------------

##每一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变brokerid的话不会影响consumers

brokerid =1

##kafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2

logdirs = /tmp/kafka-logs

##提供给客户端响应的端口

port =6667

##消息体的最大大小,单位是字节

messagemaxbytes =1000000

## broker 处理消息的最大线程数,一般情况下不需要去修改

numnetworkthreads =3

## broker处理磁盘IO 的线程数 ,数值应该大于你的硬盘数

numiothreads =8

## 一些后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改

backgroundthreads =4

## 等待IO线程处理的请求队列最大数,若是等待IO的请求超过这个数值,那么会停止接受外部消息,算是一种自我保护机制

queuedmaxrequests =500

##broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置

hostname

## 打广告的地址,若是设置的话,会提供给producers, consumers,其他broker连接,具体如何使用还未深究

advertisedhostname

## 广告地址端口,必须不同于port中的设置

advertisedport

## socket的发送缓冲区,socket的调优参数SO_SNDBUFF

socketsendbufferbytes =1001024

## socket的接受缓冲区,socket的调优参数SO_RCVBUFF

socketreceivebufferbytes =1001024

## socket请求的最大数值,防止serverOOM,messagemaxbytes必然要小于socketrequestmaxbytes,会被topic创建时的指定参数覆盖

socketrequestmaxbytes =10010241024

------------------------------------------- LOG 相关 -------------------------------------------

## topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖

logsegmentbytes =102410241024

## 这个参数会在日志segment没有达到logsegmentbytes设置的大小,也会强制新建一个segment 会被 topic创建时的指定参数覆盖

logrollhours =247

## 日志清理策略 选择有:delete和compact 主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖

logcleanuppolicy = delete

## 数据存储的最大时间 超过这个时间 会根据logcleanuppolicy设置的策略处理数据,也就是消费端能够多久去消费数据

## logretentionbytes和logretentionminutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖

logretentionminutes=7days

指定日志每隔多久检查看是否可以被删除,默认1分钟

logcleanupintervalmins=1

## topic每个分区的最大文件大小,一个topic的大小限制 = 分区数logretentionbytes 。-1没有大小限制

## logretentionbytes和logretentionminutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖

logretentionbytes=-1

## 文件大小检查的周期时间,是否处罚 logcleanuppolicy中设置的策略

logretentioncheckintervalms=5minutes

## 是否开启日志压缩

logcleanerenable=false

## 日志压缩运行的线程数

logcleanerthreads =1

## 日志压缩时候处理的最大大小

logcleaneriomaxbytespersecond=None

## 日志压缩去重时候的缓存空间 ,在空间允许的情况下,越大越好

logcleanerdedupebuffersize=50010241024

## 日志清理时候用到的IO块大小 一般不需要修改

logcleaneriobuffersize=5121024

## 日志清理中hash表的扩大因子 一般不需要修改

logcleaneriobufferloadfactor =09

## 检查是否处罚日志清理的间隔

logcleanerbackoffms =15000

## 日志清理的频率控制,越大意味着更高效的清理,同时会存在一些空间上的浪费,会被topic创建时的指定参数覆盖

logcleanermincleanableratio=05

## 对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同logretentionminutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。会被topic创建时的指定参数覆盖

logcleanerdeleteretentionms =1day

## 对于segment日志的索引文件大小限制,会被topic创建时的指定参数覆盖

logindexsizemaxbytes =1010241024

## 当执行一个fetch *** 作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数

logindexintervalbytes =4096

## log文件"sync"到磁盘之前累积的消息条数

## 因为磁盘IO *** 作是一个慢 *** 作,但又是一个"数据可靠性"的必要手段

## 所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡

## 如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞)

## 如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟

## 物理server故障,将会导致没有fsync的消息丢失

logflushintervalmessages=None

## 检查是否需要固化到硬盘的时间间隔

logflushschedulerintervalms =3000

## 仅仅通过interval来控制消息的磁盘写入时机,是不足的

## 此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔

## 达到阀值,也将触发

logflushintervalms = None

## 文件在索引中清除后保留的时间 一般不需要去修改

logdeletedelayms =60000

## 控制上次固化硬盘的时间点,以便于数据恢复 一般不需要去修改

logflushoffsetcheckpointintervalms =60000

------------------------------------------- TOPIC 相关 -------------------------------------------

## 是否允许自动创建topic ,若是false,就需要通过命令创建topic

autocreatetopicsenable =true

## 一个topic ,默认分区的replication个数 ,不得大于集群中broker的个数

defaultreplicationfactor =1

## 每个topic的分区个数,若是在topic创建时候没有指定的话 会被topic创建时的指定参数覆盖

numpartitions =1

实例 --replication-factor3--partitions1--topic replicated-topic :名称replicated-topic有一个分区,分区被复制到三个broker上。

----------------------------------复制(Leader、replicas) 相关 ----------------------------------

## partition leader与replicas之间通讯时,socket的超时时间

controllersockettimeoutms =30000

## partition leader与replicas数据同步时,消息的队列尺寸

controllermessagequeuesize=10

## replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas列入ISR(in-sync replicas),并认为它是死的,不会再加入管理中

replicalagtimemaxms =10000

## 如果follower落后与leader太多,将会认为此follower[或者说partition relicas]已经失效

## 通常,在follower与leader通讯时,因为网络延迟或者链接断开,总会导致replicas中消息同步滞后

## 如果消息之后太多,leader将认为此follower网络延迟较大或者消息吞吐能力有限,将会把此replicas迁移

## 到其他follower中

## 在broker数量较少,或者网络不足的环境中,建议提高此值

replicalagmaxmessages =4000

##follower与leader之间的socket超时时间

replicasockettimeoutms=301000

## leader复制时候的socket缓存大小

replicasocketreceivebufferbytes=641024

## replicas每次获取数据的最大大小

replicafetchmaxbytes =10241024

## replicas同leader之间通信的最大等待时间,失败了会重试

replicafetchwaitmaxms =500

## fetch的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会阻塞,直到满足条件

replicafetchminbytes =1

## leader 进行复制的线程数,增大这个数值会增加follower的IO

numreplicafetchers=1

## 每个replica检查是否将最高水位进行固化的频率

replicahighwatermarkcheckpointintervalms =5000

## 是否允许控制器关闭broker ,若是设置为true,会关闭所有在这个broker上的leader,并转移到其他broker

controlledshutdownenable =false

## 控制器关闭的尝试次数

controlledshutdownmaxretries =3

## 每次关闭尝试的时间间隔

controlledshutdownretrybackoffms =5000

## 是否自动平衡broker之间的分配策略

autoleaderrebalanceenable =false

## leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡

leaderimbalanceperbrokerpercentage =10

## 检查leader是否不平衡的时间间隔

leaderimbalancecheckintervalseconds =300

## 客户端保留offset信息的最大空间大小

offsetmetadatamaxbytes

----------------------------------ZooKeeper 相关----------------------------------

##zookeeper集群的地址,可以是多个,多个之间用逗号分割 hostname1:port1,hostname2:port2,hostname3:port3

zookeeperconnect = localhost:2181

## ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大

zookeepersessiontimeoutms=6000

## ZooKeeper的连接超时时间

zookeeperconnectiontimeoutms =6000

## ZooKeeper集群中leader和follower之间的同步实际那

zookeepersynctimems =2000

配置的修改

其中一部分配置是可以被每个topic自身的配置所代替,例如

新增配置

bin/kafka-topicssh --zookeeper localhost:2181--create --topic my-topic --partitions1--replication-factor1--config maxmessagebytes=64000--config flushmessages=1

修改配置

bin/kafka-topicssh --zookeeper localhost:2181--alter --topic my-topic --config maxmessagebytes=128000

删除配置 :

bin/kafka-topicssh --zookeeper localhost:2181--alter --topic my-topic --deleteConfig maxmessagebytes

二 CONSUMER 配置

最为核心的配置是groupid、zookeeperconnect

## Consumer归属的组ID,broker是根据groupid来判断是队列模式还是发布订阅模式,非常重要

 groupid

## 消费者的ID,若是没有设置的话,会自增

 consumerid

## 一个用于跟踪调查的ID ,最好同groupid相同

 clientid = group id value

## 对于zookeeper集群的指定,可以是多个 hostname1:port1,hostname2:port2,hostname3:port3 必须和broker使用同样的zk配置

 zookeeperconnect=localhost:2182

## zookeeper的心跳超时时间,查过这个时间就认为是dead消费者

 zookeepersessiontimeoutms =6000

## zookeeper的等待连接时间

 zookeeperconnectiontimeoutms =6000

## zookeeper的follower同leader的同步时间

 zookeepersynctimems =2000

## 当zookeeper中没有初始的offset时候的处理方式 。smallest :重置为最小值 largest:重置为最大值 anythingelse:抛出异常

 autooffsetreset = largest

## socket的超时时间,实际的超时时间是:maxfetchwait + sockettimeoutms

 sockettimeoutms=301000

## socket的接受缓存空间大小

 socketreceivebufferbytes=641024

##从每个分区获取的消息大小限制

 fetchmessagemaxbytes =10241024

## 是否在消费消息后将offset同步到zookeeper,当Consumer失败后就能从zookeeper获取最新的offset

 autocommitenable =true

## 自动提交的时间间隔

 autocommitintervalms =601000

## 用来处理消费消息的块,每个块可以等同于fetchmessagemaxbytes中数值

 queuedmaxmessagechunks =10

## 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新

## 的consumer上,如果一个consumer获得了某个partition的消费权限,那么它将会向zk注册

##"Partition Owner registry"节点信息,但是有可能此时旧的consumer尚没有释放此节点,

## 此值用于控制,注册节点的重试次数

 rebalancemaxretries =4

## 每次再平衡的时间间隔

 rebalancebackoffms =2000

## 每次重新选举leader的时间

 refreshleaderbackoffms

## server发送到消费端的最小数据,若是不满足这个数值则会等待,知道满足数值要求

 fetchminbytes =1

## 若是不满足最小大小(fetchminbytes)的话,等待消费端请求的最长等待时间

 fetchwaitmaxms =100

## 指定时间内没有消息到达就抛出异常,一般不需要改

 consumertimeoutms = -1

三 PRODUCER 的配置

比较核心的配置:metadatabrokerlist、requestrequiredacks、producertype、serializerclass

## 消费者获取消息元信息(topics, partitions and replicas)的地址,配置格式是:host1:port1,host2:port2,也可以在外面设置一个vip

 metadatabrokerlist

##消息的确认模式

 ##0:不保证消息的到达确认,只管发送,低延迟但是会出现消息的丢失,在某个server失败的情况下,有点像TCP

 ##1:发送消息,并会等待leader 收到确认后,一定的可靠性

 ## -1:发送消息,等待leader收到确认,并进行复制 *** 作后,才返回,最高的可靠性

 requestrequiredacks =0

## 消息发送的最长等待时间

 requesttimeoutms =10000

## socket的缓存大小

 sendbufferbytes=1001024

## key的序列化方式,若是没有设置,同serializerclass

 keyserializerclass

## 分区的策略,默认是取模

 partitionerclass=kafkaproducerDefaultPartitioner

## 消息的压缩模式,默认是none,可以有gzip和snappy

 compressioncodec = none

## 可以针对默写特定的topic进行压缩

 compressedtopics=null

## 消息发送失败后的重试次数

 messagesendmaxretries =3

## 每次失败后的间隔时间

 retrybackoffms =100

## 生产者定时更新topic元信息的时间间隔 ,若是设置为0,那么会在每个消息发送后都去更新数据

 topicmetadatarefreshintervalms =6001000

## 用户随意指定,但是不能重复,主要用于跟踪记录消息

 clientid=""

------------------------------------------- 消息模式 相关 -------------------------------------------

 ## 生产者的类型 async:异步执行消息的发送 sync:同步执行消息的发送

 producertype=sync

## 异步模式下,那么就会在设置的时间缓存消息,并一次性发送

 queuebufferingmaxms =5000

## 异步的模式下 最长等待的消息数

 queuebufferingmaxmessages =10000

## 异步模式下,进入队列的等待时间 若是设置为0,那么要么进入队列,要么直接抛弃

 queueenqueuetimeoutms = -1

## 异步模式下,每次发送的最大消息数,前提是触发了queuebufferingmaxmessages或是queuebufferingmaxms的限制

 batchnummessages=200

## 消息体的系列化处理类 ,转化为字节流进行传输

 serializerclass= kafkaserializerDefaultEncoder

在上一篇文章当中,也算是比较详细且通俗的聊了聊Flink是如何通过checkpoint机制来完成数据精准一次性的实现的。并且也在上一章的结尾表示,要在接下来聊一聊Flink与它的铁哥们Kafaka之间,是如何实现数据的精准一次性消费的。

本次的聊法,还是要通过以kafka(source)->Flink,Flink(source)->Kafka来分别展开讨论。

kafka是一个具有数据保存、数据回放能力的消息队列,说白了就是kafka中的每一个数据,都有一个专门的标记作为标识。而在Flink消费kafka传入的数据的时候,source任务就能够将这个偏移量以算子状态的角色进行保存,写入到设定好的检查点中。这样一旦发生故障,Flink中的FlinkKafkaProduce连接器就i能够按照自己保存的偏移量,自己去Kafka中重新拉取数据,也正是通过这种方式,就能够确保Kafka到Flink之间的精准一次性。

在上一篇文章当中,已经表明了,如果想要让输出端能够进行精准一次性消费,就需要使用到幂等性或者是事务。而事务中的两阶段提交是所有方案里面最好的实现。

其实Flink到Kafak之间也是采用了这种方式,具体的可以看一下ctrl进到FlinkKafkaProduce连接器内部去看一看:

这也就表明了,当数据通过Flink发送给sink端Kafka的时候,是经历了两个阶段的处理的。第一阶段就是Flink向Kafka中插入数据,进入预提交阶段。当JobManager发送的Ckeckpoint保存成功信号过来之后,才会提交事务进行正式的数据发送,也就是让原来不可用的数据可以被使用了。

这个实现过程到目前阶段就很清晰了,它的主体流程无非就是在开启检查点之后,由JobManager向各个阶段的处理逻辑发送有关于检查点的barrier。所有的计算任务接收到之后,就会根据自己当前的状态做一个检查点保存。而当这个barrier来到sink任务的时候,sink就会开启一个事务,然后通过这个事务向外预写数据。直到Jobmanager来告诉它这一次的检查点已经保存完成了,sink就会进行第二次提交,数据也就算是成功写出了。

1必须要保证检查点被打开了,如果检查点没有打开,那么之前说的一切话都是空谈。因为Flink默认检查点是关着的。

2在FlinkKafakProducer连接器的构造函数中要传入参数,这个参数就是用来保证状态一致性的。就是在构造函数的最后一个参数输入如下:

3配置Kafka读取数据的隔离级别

在kafka中有个配置,这个配置用来管理Kafka读取数据的级别。而这个配置默认是能够读取预提交阶段的数据的,所以如果你没改这个配置,那两阶段提交的第一阶段就是白费了。所以需要改一下这个配置,来更换一下隔离级别:

4事务超时时间

这个配置也很有意思,大家试想一下。如果要进行两阶段提交,就要保证sink端支持事务,Kafka是支持事务的,但是像这个组件对于很多机制都有一个超时时间的概念,也就是说如果时间到了这个界限还没完成工作,那就会默认这个工作失败。Kafka中由这个概念,Flink中同样由这个概念。但是flink默认超时时间是1小时,而Kafka默认是15分钟,这就有可能出现检查点保存东西的时间大于15分钟,假如说是16分钟保存完成然后给sink发送检查点保存陈功可以提交事务的信号,但是这个时候Kafka已经认为事务失败,把之前的数据都扔了。那数据不就是丢失了么。所以说Kafka的超时时间要大于Flink的超时时间才好。

截止到目前为止,基本上把有关于状态维护的一些东西都说完了,有状态后端、有检查点。还通过检查点完成可端到端的数据精准一次性消费。但是想到这我又感觉,如果有学习进度比我差一些的,万一没办法很好的理解怎么办。所以在下一篇文章当中我就聊聊Flink中的“状态”到底是个什么东西,都有什么类型,都怎么去用。

小马最近学习了《深入理解kafka 核心设计与实践原理》朱忠华 著 一书,机缘巧合中又看到了这篇文章,觉得整理得很是详细和全面,图文并茂很直观,在此摘录。

精华总结:依靠主题分区来类似分库分表的方式提高性能,用 副本主从 同步+ ISR(偏移量和HW) 来保证消息队列的可靠性,消费者提交 消费位移 来保证消息不丢失和重复消费等,用ZK来处理 服务发现 ,负载均衡,选举,集群管理,消费位移记录(以被推荐记录于kafka主题内)等。

HW之前的消息才能被消费者拉取,理解为都同步备份完了,才算生产者消息提交成功,对消费者可见。这种ISR机制影响了性能但是保证了可靠性,保证消息不丢失。消费位移提交,默认的是自动提交,异常下消息会重复消费会丢失,但可以参数配置手动提交,自行在业务处理完再提交。消费者拉的方式自主获取消费,便于消费者自行控制消费速率。默认分区规则是哈希一致性方式。

相比 Redis消息队列 本身的可靠性就不如,被消费者拉取完就认为消费完了,消息丢失,所以一般需要自行维护ack机制。

Kafka的消息是保存或缓存在磁盘上的,一般认为在磁盘上读写数据是会降低性能的,因为寻址会比较消耗时间,但是实际上,Kafka的特性之一就是高吞吐率。即使是普通的服务器, Kafka也可以轻松支持每秒百万级的写入请求 ,超过了大部分的消息中间件,这种特性也使得Kafka在日志处理等海量数据场景广泛应用。 Kafka速度的秘诀在于 ,它把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络IO损耗,通过mmap提高I/O速度,写入数据的时候由于单个Partion是末尾添加所以速度最优;读取数据的时候配合sendfile直接暴力输出。

一个典型的 Kafka 体系架构包括若干 Producer(消息生产者),若干 broker(作为 Kafka 节点的服务器),若干 Consumer(Group),以及一个 ZooKeeper 集群。Kafka通过 ZooKeeper 管理集群配置、选举 Leader 以及在 consumer group 发生变化时进行 Rebalance(即消费者负载均衡,在下一课介绍)。Producer 使用 push(推)模式将消息发布到 broker,Consumer 使用 pull(拉)模式从 broker 订阅并消费消息。

Kafka 节点的 broker涉及 Topic、Partition 两个重要概念

在 Kafka 架构中,有几个术语:

Producer :生产者,即消息发送者,push 消息到 Kafka 集群中的 broker(就是 server)中;

Broker :Kafka 集群由多个 Kafka 实例(server) 组成,每个实例构成一个 broker,说白了就是服务器;

Topic :producer 向 kafka 集群 push 的消息会被归于某一类别,即Topic,这本质上只是一个逻辑概念,面向的对象是 producer 和 consumer,producer 只需要关注将消息 push 到哪一个 Topic 中,而 consumer 只需要关心自己订阅了哪个 Topic;

Partition :每一个 Topic 又被分为多个 Partitions,即物理分区;出于负载均衡的考虑,同一个 Topic 的 Partitions 分别存储于 Kafka 集群的多个 broker 上;而为了提高可靠性,这些 Partitions 可以由 Kafka 机制中的 replicas 来设置备份的数量;如上面的框架图所示,每个 partition 都存在两个备份;

Consumer :消费者,从 Kafka 集群的 broker 中 pull 消息、消费消息;

Consumer group :high-level consumer API 中,每个 consumer 都属于一个 consumer-group,每条消息只能被 consumer-group 中的一个 Consumer 消费,但可以被多个 consumer-group 消费;

replicas :partition 的副本,保障 partition 的高可用;

leader :replicas 中的一个角色, producer 和 consumer 只跟 leader 交互;

follower :replicas 中的一个角色,从 leader 中复制数据,作为副本,一旦 leader 挂掉,会从它的 followers 中选举出一个新的 leader 继续提供服务;

controller :Kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover;

ZooKeeper :Kafka 通过 ZooKeeper 来存储集群的 meta 信息等,文中将详述。

一个 topic 可以认为是一类消息,每个 topic 将被分成多个 partition,每个 partition 在存储层面是 append log 文件。任何发布到此 partition 的消息都会被追加到log文件的尾部,每条消息在文件中的位置称为 offset(偏移量),offset 为一个 long 型的数字,它唯一标记一条消息。 Kafka 机制中,producer push 来的消息是追加(append)到 partition 中的,这是一种顺序写磁盘的机制,效率远高于随机写内存,如下示意图:

Kafka 中 topic 的每个 partition 有一个预写式的日志文件,虽然 partition 可以继续细分为若干个 segment 文件,但是对于上层应用来说,仍然可以将 partition 看成最小的存储单元(一个有多个 segment 文件拼接的 “巨型” 文件),每个 partition 都由一些列有序的、不可变的消息组成,这些消息被连续的追加到 partition 中。

上图中有两个新名词:HW 和 LEO。这里先介绍下 LEO,LogEndOffset 的缩写,表示每个 partition 的 log 最后一条 Message 的位置。HW 是 HighWatermark 的缩写,是指 consumer 能够看到的此 partition 的位置,这个涉及到多副本的概念,这里先提及一下,下文再详述。

言归正传,为了提高消息的可靠性,Kafka 每个 topic 的 partition 有 N 个副本(replicas),其中 N(大于等于 1)是 topic 的复制因子(replica fator)的个数。Kafka 通过多副本机制实现故障自动转移,当 Kafka 集群中出现 broker 失效时,副本机制可保证服务可用。对于任何一个 partition,它的 N 个 replicas 中,其中一个 replica 为 leader,其他都为 follower,leader 负责处理 partition 的所有读写请求,follower 则负责被动地去复制 leader 上的数据。如下图所示,Kafka 集群中有 4 个 broker,某 topic 有 3 个 partition,且复制因子即副本个数也为 3:

如果 leader 所在的 broker 发生故障或宕机,对应 partition 将因无 leader 而不能处理客户端请求,这时副本的作用就体现出来了:一个新 leader 将从 follower 中被选举出来并继续处理客户端的请求。

上一节中讲到了同步副本队列 ISR(In-Sync Replicas)。虽然副本极大的增强了可用性,但是副本数量对 Kafka 的吞吐率有一定影响。默认情况下 Kafka 的 replica 数量为 1,即每个 partition 都只有唯一的 leader,无 follower,没有容灾能力。为了确保消息的可靠性,生产环境中,通常将其值(由 broker 的参数 offsetstopicreplicationfactor 指定)大小设置为大于 1,比如 3。 所有的副本(replicas)统称为 Assigned Replicas,即 AR。ISR 是 AR 中的一个子集,由 leader 维护 ISR 列表,follower 从 leader 同步数据有一些延迟(由参数 replicalagtimemaxms 设置超时阈值),超过阈值的 follower 将被剔除出 ISR, 存入 OSR(Outof-Sync Replicas)列表,新加入的 follower 也会先存放在 OSR 中。AR=ISR+OSR。

上面一节还涉及到一个概念,即 HW。HW 俗称高水位,HighWatermark 的缩写,取一个 partition 对应的 ISR 中最小的 LEO 作为 HW,consumer 最多只能消费到 HW 所在的位置。另外每个 replica 都有 HW,leader 和 follower 各自负责更新自己的 HW 的状态。对于 leader 新写入的消息,consumer 不能立刻消费,leader 会等待该消息被所有 ISR 中的 replicas 同步后更新 HW,此时消息才能被 consumer 消费。这样就保证了如果 leader 所在的 broker 失效,该消息仍然可以从新选举的 leader 中获取。对于来自内部 broker 的读取请求,没有 HW 的限制。

下图详细的说明了当 producer 生产消息至 broker 后,ISR 以及 HW 和 LEO 的流转过程:

由此可见,Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的 follower 都复制完,这条消息才会被 commit,这种复制方式受限于复制最慢的 follower,会极大的影响吞吐率。而异步复制方式下,follower 异步的从 leader 复制数据,数据只要被 leader 写入 log 就被认为已经 commit,这种情况下如果 follower 都还没有复制完,落后于 leader 时,突然 leader 宕机,则会丢失数据,降低可靠性。而 Kafka 使用 ISR 的策略则在可靠性和吞吐率方面取得了较好的平衡。

Kafka 的 ISR 的管理最终都会反馈到 ZooKeeper 节点上,具体位置为:

/brokers/topics/[topic]/partitions/[partition]/state

目前,有两个地方会对这个 ZooKeeper 的节点进行维护。

Controller 来维护:Kafka 集群中的其中一个 Broker 会被选举为 Controller,主要负责 Partition 管理和副本状态管理,也会执行类似于重分配 partition 之类的管理任务。在符合某些特定条件下,Controller 下的 LeaderSelector 会选举新的 leader,ISR 和新的 leader_epoch 及 controller_epoch 写入 ZooKeeper 的相关节点中。同时发起 LeaderAndIsrRequest 通知所有的 replicas。

leader 来维护:leader 有单独的线程定期检测 ISR 中 follower 是否脱离 ISR,如果发现 ISR 变化,则会将新的 ISR 的信息返回到 ZooKeeper 的相关节点中。

考虑这样一种场景:acks=-1,部分 ISR 副本完成同步,此时leader挂掉,如下图所示:follower1 同步了消息 4、5,follower2 同步了消息 4,与此同时 follower2 被选举为 leader,那么此时 follower1 中的多出的消息 5 该做如何处理呢?

类似于木桶原理,水位取决于最低那块短板。

如上图,某个 topic 的某 partition 有三个副本,分别为 A、B、C。A 作为 leader 肯定是 LEO 最高,B 紧随其后,C 机器由于配置比较低,网络比较差,故而同步最慢。这个时候 A 机器宕机,这时候如果 B 成为 leader,假如没有 HW,在 A 重新恢复之后会做同步(makeFollower) *** 作,在宕机时 log 文件之后直接做追加 *** 作,而假如 B 的 LEO 已经达到了 A 的 LEO,会产生数据不一致的情况,所以使用 HW 来避免这种情况。 A 在做同步 *** 作的时候,先将 log 文件截断到之前自己的 HW 的位置,即 3,之后再从 B 中拉取消息进行同步。

如果失败的 follower 恢复过来,它首先将自己的 log 文件截断到上次 checkpointed 时刻的 HW 的位置,之后再从 leader 中同步消息。leader 挂掉会重新选举,新的 leader 会发送 “指令” 让其余的 follower 截断至自身的 HW 的位置然后再拉取新的消息。

当 ISR 中的个副本的 LEO 不一致时,如果此时 leader 挂掉,选举新的 leader 时并不是按照 LEO 的高低进行选举,而是按照 ISR 中的顺序选举。

在 consumer 对指定消息 partition 的消息进行消费的过程中,需要定时地将 partition 消息的 消费进度 Offset 记录到 ZooKeeper上 ,以便在该 consumer 进行重启或者其它 consumer 重新接管该消息分区的消息消费权后,能够从之前的进度开始继续进行消息消费。Offset 在 ZooKeeper 中由一个专门节点进行记录,其节点路径为:

#节点内容就是Offset的值。/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]

PS:Kafka 已推荐将 consumer 的 Offset 信息保存在 Kafka 内部的 topic 中,即:

__consumer_offsets(/brokers/topics/__consumer_offsets)

并且默认提供了 kafka_consumer_groupssh 脚本供用户查看consumer 信息(命令:sh kafka-consumer-groupssh –bootstrap-server –describe –group )。在当前版本中,offset 存储方式要么存储在本地文件中,要么存储在 broker 端,具体的存储方式取决 offsetstoremethod 的配置,默认是存储在 broker 端。

在基于 Kafka 的分布式消息队列中,ZooKeeper 的作用有:broker 注册、topic 注册、producer 和 consumer 负载均衡、维护 partition 与 consumer 的关系、记录消息消费的进度以及 consumer 注册等。

参考原文:

再谈基于 Kafka 和 ZooKeeper 的分布式消息队列原理

spark streaming从12开始提供了数据的零丢失,想享受这个特性,需要满足如下条件: 1数据输入需要可靠的sources和可靠的receivers 2应用metadata必须通过应用driver checkpoint 3WAL(write ahead log)

需求就是将流量数据(json格式)中某个接口数据抽取一下。如:有个identityUri="yiyang/user/getById/13782" , 这里的13782,是个userId,我们需要将其处理成 identityUri="yiyang/user/getById/{}"

实际上我们生产中是将二者接口使用的。先使用2,如果没有匹配到,在使用1

这里是演示flink kafka的用法,我们简单使用正则处理

注意:kafka消费的方式是: kafkaConsumersetStartFromGroupOffsets();

看下上面的启动日志,有这样的信息:Resetting offset for partition yiyang-0 to offset 22

我们另外启动一个程序,发送消息,并消费两个topic中的数据

看下 ConsumeKafkaTest 中的日志

在看下另外一个服务(消费两个topic数据)的日志:

说明已经成功的把处理好的消息发送到另外一个topic中了

关于数据处理,如果只是简单的增加字段,减少字段,正则替换,也可以使用logstash工具

使用kafkapython读取实时数据小例子 使用kafkapython读取实时数据小例子 from kafka import KafkaConsumer from kafkaclient import KafkaClient imp

不过要注意一些注意事项,对于多个partition和多个consumer

1 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数

2 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀

最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目

3 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同

4 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化

5 High-level接口中获取不到数据的时候是会block的

简单版,

简单的坑,如果测试流程是,先produce一些数据,然后再用consumer读的话,记得加上第一句设置

因为初始的offset默认是非法的,然后这个设置的意思是,当offset非法时,如何修正offset,默认是largest,即最新,所以不加这个配置,你是读不到你之前produce的数据的,而且这个时候你再加上smallest配置也没用了,因为此时offset是合法的,不会再被修正了,需要手工或用工具改重置offset

Properties props = new Properties();

propsput("autooffsetreset", "smallest"); //必须要加,如果要读旧数据

propsput("zookeeperconnect", "localhost:2181");

propsput("groupid", "pv");

propsput("zookeepersessiontimeoutms", "400");

propsput("zookeepersynctimems", "200");

propsput("autocommitintervalms", "1000");

ConsumerConfig conf = new ConsumerConfig(props);

ConsumerConnector consumer = kafkaconsumerConsumercreateJavaConsumerConnector(conf);

String topic = "page_visits";

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

topicCountMapput(topic, new Integer(1));

Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumercreateMessageStreams(topicCountMap);

List<KafkaStream<byte[], byte[]>> streams = consumerMapget(topic);

KafkaStream<byte[], byte[]> stream = streamsget(0);

ConsumerIterator<byte[], byte[]> it = streamiterator();

while (ithasNext()){

Systemoutprintln("message: " + new String(itnext()message()));

}

if (consumer != null) consumershutdown(); //其实执行不到,因为上面的hasNext会block

在用high-level的consumer时,两个给力的工具,

1 bin/kafka-run-classsh kafkatoolsConsumerOffsetChecker --group pv

可以看到当前group offset的状况,比如这里看pv的状况,3个partition

Group Topic Pid Offset logSize Lag Owner

pv page_visits 0 21 21 0 none

pv page_visits 1 19 19 0 none

pv page_visits 2 20 20 0 none

关键就是offset,logSize和Lag

这里以前读完了,所以offset=logSize,并且Lag=0

2 bin/kafka-run-classsh kafkatoolsUpdateOffsetsInZK earliest config/consumerproperties page_visits

3个参数,

[earliest | latest],表示将offset置到哪里

consumerproperties ,这里是配置文件的路径

topic,topic名,这里是page_visits

我们对上面的pv group执行完这个 *** 作后,再去check group offset状况,结果如下,

Group Topic Pid Offset logSize Lag Owner

pv page_visits 0 0 21 21 none

pv page_visits 1 0 19 19 none

pv page_visits 2 0 20 20 none

可以看到offset已经被清0,Lag=logSize

底下给出原文中多线程consumer的完整代码

import kafkaconsumerConsumerConfig;

import kafkaconsumerKafkaStream;

import kafkajavaapiconsumerConsumerConnector;

import javautilHashMap;

import javautilList;

import javautilMap;

import javautilProperties;

import javautilconcurrentExecutorService;

import javautilconcurrentExecutors;

public class ConsumerGroupExample {

private final ConsumerConnector consumer;

private final String topic;

private ExecutorService executor;

public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {

consumer = kafkaconsumerConsumercreateJavaConsumerConnector( // 创建Connector,注意下面对conf的配置

createConsumerConfig(a_zookeeper, a_groupId));

thistopic = a_topic;

}

public void shutdown() {

if (consumer != null) consumershutdown();

if (executor != null) executorshutdown();

}

public void run(int a_numThreads) { // 创建并发的consumers

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

topicCountMapput(topic, new Integer(a_numThreads)); // 描述读取哪个topic,需要几个线程读

Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumercreateMessageStreams(topicCountMap); // 创建Streams

List<KafkaStream<byte[], byte[]>> streams = consumerMapget(topic); // 每个线程对应于一个KafkaStream

// now launch all the threads

//

executor = ExecutorsnewFixedThreadPool(a_numThreads);

// now create an object to consume the messages

//

int threadNumber = 0;

for (final KafkaStream stream : streams) {

executorsubmit(new ConsumerTest(stream, threadNumber)); // 启动consumer thread

threadNumber++;

}

}

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {

Properties props = new Properties();

propsput("zookeeperconnect", a_zookeeper);

propsput("groupid", a_groupId);

propsput("zookeepersessiontimeoutms", "400");

propsput("zookeepersynctimems", "200");

propsput("autocommitintervalms", "1000");

return new ConsumerConfig(props);

}

public static void main(String[] args) {

String zooKeeper = args[0];

String groupId = args[1];

String topic = args[2];

int threads = IntegerparseInt(args[3]);

ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);

examplerun(threads);

try {

Threadsleep(10000);

} catch (InterruptedException ie) {

}

exampleshutdown();

}

}

SimpleConsumer

另一种是SimpleConsumer,名字起的,以为是简单的接口,其实是low-level consumer,更复杂的接口

参考,

什么时候用这个接口

Read a message multiple times

Consume only a subset of the partitions in a topic in a process

Manage transactions to make sure a message is processed once and only once

当然用这个接口是有代价的,即partition,broker,offset对你不再透明,需要自己去管理这些,并且还要handle broker leader的切换,很麻烦

所以不是一定要用,最好别用

You must keep track of the offsets in your application to know where you left off consuming

You must figure out which Broker is the lead Broker for a topic and partition

You must handle Broker leader changes

使用SimpleConsumer的步骤:

Find an active Broker and find out which Broker is the leader for your topic and partition

Determine who the replica Brokers are for your topic and partition

Build the request defining what data you are interested in

Fetch the data

Identify and recover from leader changes

首先,你必须知道读哪个topic的哪个partition

然后,找到负责该partition的broker leader,从而找到存有该partition副本的那个broker

再者,自己去写request并fetch数据

最终,还要注意需要识别和处理broker leader的改变

以上就是关于kafka 配置文件参数详解全部的内容,包括:kafka 配置文件参数详解、4.一文搞定:Flink与Kafka之间的精准一次性、[转载]kafka入门笔记等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/web/10136226.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-05
下一篇 2023-05-05

发表评论

登录后才能评论

评论列表(0条)

保存