kafka 配置文件参数详解

kafka 配置文件参数详解,第1张

kafka的配置分为 broker、producter、consumer三个不同的配置

一 BROKER 的全局配置

最为核心的三个配置 brokerid、logdir、zookeeperconnect 。

------------------------------------------- 系统 相关 -------------------------------------------

##每一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变brokerid的话不会影响consumers

brokerid =1

##kafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2

logdirs = /tmp/kafka-logs

##提供给客户端响应的端口

port =6667

##消息体的最大大小,单位是字节

messagemaxbytes =1000000

## broker 处理消息的最大线程数,一般情况下不需要去修改

numnetworkthreads =3

## broker处理磁盘IO 的线程数 ,数值应该大于你的硬盘数

numiothreads =8

## 一些后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改

backgroundthreads =4

## 等待IO线程处理的请求队列最大数,若是等待IO的请求超过这个数值,那么会停止接受外部消息,算是一种自我保护机制

queuedmaxrequests =500

##broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置

hostname

## 打广告的地址,若是设置的话,会提供给producers, consumers,其他broker连接,具体如何使用还未深究

advertisedhostname

## 广告地址端口,必须不同于port中的设置

advertisedport

## socket的发送缓冲区,socket的调优参数SO_SNDBUFF

socketsendbufferbytes =1001024

## socket的接受缓冲区,socket的调优参数SO_RCVBUFF

socketreceivebufferbytes =1001024

## socket请求的最大数值,防止serverOOM,messagemaxbytes必然要小于socketrequestmaxbytes,会被topic创建时的指定参数覆盖

socketrequestmaxbytes =10010241024

------------------------------------------- LOG 相关 -------------------------------------------

## topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖

logsegmentbytes =102410241024

## 这个参数会在日志segment没有达到logsegmentbytes设置的大小,也会强制新建一个segment 会被 topic创建时的指定参数覆盖

logrollhours =247

## 日志清理策略 选择有:delete和compact 主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖

logcleanuppolicy = delete

## 数据存储的最大时间 超过这个时间 会根据logcleanuppolicy设置的策略处理数据,也就是消费端能够多久去消费数据

## logretentionbytes和logretentionminutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖

logretentionminutes=7days

指定日志每隔多久检查看是否可以被删除,默认1分钟

logcleanupintervalmins=1

## topic每个分区的最大文件大小,一个topic的大小限制 = 分区数logretentionbytes 。-1没有大小限制

## logretentionbytes和logretentionminutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖

logretentionbytes=-1

## 文件大小检查的周期时间,是否处罚 logcleanuppolicy中设置的策略

logretentioncheckintervalms=5minutes

## 是否开启日志压缩

logcleanerenable=false

## 日志压缩运行的线程数

logcleanerthreads =1

## 日志压缩时候处理的最大大小

logcleaneriomaxbytespersecond=None

## 日志压缩去重时候的缓存空间 ,在空间允许的情况下,越大越好

logcleanerdedupebuffersize=50010241024

## 日志清理时候用到的IO块大小 一般不需要修改

logcleaneriobuffersize=5121024

## 日志清理中hash表的扩大因子 一般不需要修改

logcleaneriobufferloadfactor =09

## 检查是否处罚日志清理的间隔

logcleanerbackoffms =15000

## 日志清理的频率控制,越大意味着更高效的清理,同时会存在一些空间上的浪费,会被topic创建时的指定参数覆盖

logcleanermincleanableratio=05

## 对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同logretentionminutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。会被topic创建时的指定参数覆盖

logcleanerdeleteretentionms =1day

## 对于segment日志的索引文件大小限制,会被topic创建时的指定参数覆盖

logindexsizemaxbytes =1010241024

## 当执行一个fetch *** 作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数

logindexintervalbytes =4096

## log文件"sync"到磁盘之前累积的消息条数

## 因为磁盘IO *** 作是一个慢 *** 作,但又是一个"数据可靠性"的必要手段

## 所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡

## 如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞)

## 如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟

## 物理server故障,将会导致没有fsync的消息丢失

logflushintervalmessages=None

## 检查是否需要固化到硬盘的时间间隔

logflushschedulerintervalms =3000

## 仅仅通过interval来控制消息的磁盘写入时机,是不足的

## 此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔

## 达到阀值,也将触发

logflushintervalms = None

## 文件在索引中清除后保留的时间 一般不需要去修改

logdeletedelayms =60000

## 控制上次固化硬盘的时间点,以便于数据恢复 一般不需要去修改

logflushoffsetcheckpointintervalms =60000

------------------------------------------- TOPIC 相关 -------------------------------------------

## 是否允许自动创建topic ,若是false,就需要通过命令创建topic

autocreatetopicsenable =true

## 一个topic ,默认分区的replication个数 ,不得大于集群中broker的个数

defaultreplicationfactor =1

## 每个topic的分区个数,若是在topic创建时候没有指定的话 会被topic创建时的指定参数覆盖

numpartitions =1

实例 --replication-factor3--partitions1--topic replicated-topic :名称replicated-topic有一个分区,分区被复制到三个broker上。

----------------------------------复制(Leader、replicas) 相关 ----------------------------------

## partition leader与replicas之间通讯时,socket的超时时间

controllersockettimeoutms =30000

## partition leader与replicas数据同步时,消息的队列尺寸

controllermessagequeuesize=10

## replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas列入ISR(in-sync replicas),并认为它是死的,不会再加入管理中

replicalagtimemaxms =10000

## 如果follower落后与leader太多,将会认为此follower[或者说partition relicas]已经失效

## 通常,在follower与leader通讯时,因为网络延迟或者链接断开,总会导致replicas中消息同步滞后

## 如果消息之后太多,leader将认为此follower网络延迟较大或者消息吞吐能力有限,将会把此replicas迁移

## 到其他follower中

## 在broker数量较少,或者网络不足的环境中,建议提高此值

replicalagmaxmessages =4000

##follower与leader之间的socket超时时间

replicasockettimeoutms=301000

## leader复制时候的socket缓存大小

replicasocketreceivebufferbytes=641024

## replicas每次获取数据的最大大小

replicafetchmaxbytes =10241024

## replicas同leader之间通信的最大等待时间,失败了会重试

replicafetchwaitmaxms =500

## fetch的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会阻塞,直到满足条件

replicafetchminbytes =1

## leader 进行复制的线程数,增大这个数值会增加follower的IO

numreplicafetchers=1

## 每个replica检查是否将最高水位进行固化的频率

replicahighwatermarkcheckpointintervalms =5000

## 是否允许控制器关闭broker ,若是设置为true,会关闭所有在这个broker上的leader,并转移到其他broker

controlledshutdownenable =false

## 控制器关闭的尝试次数

controlledshutdownmaxretries =3

## 每次关闭尝试的时间间隔

controlledshutdownretrybackoffms =5000

## 是否自动平衡broker之间的分配策略

autoleaderrebalanceenable =false

## leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡

leaderimbalanceperbrokerpercentage =10

## 检查leader是否不平衡的时间间隔

leaderimbalancecheckintervalseconds =300

## 客户端保留offset信息的最大空间大小

offsetmetadatamaxbytes

----------------------------------ZooKeeper 相关----------------------------------

##zookeeper集群的地址,可以是多个,多个之间用逗号分割 hostname1:port1,hostname2:port2,hostname3:port3

zookeeperconnect = localhost:2181

## ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大

zookeepersessiontimeoutms=6000

## ZooKeeper的连接超时时间

zookeeperconnectiontimeoutms =6000

## ZooKeeper集群中leader和follower之间的同步实际那

zookeepersynctimems =2000

配置的修改

其中一部分配置是可以被每个topic自身的配置所代替,例如

新增配置

bin/kafka-topicssh --zookeeper localhost:2181--create --topic my-topic --partitions1--replication-factor1--config maxmessagebytes=64000--config flushmessages=1

修改配置

bin/kafka-topicssh --zookeeper localhost:2181--alter --topic my-topic --config maxmessagebytes=128000

删除配置 :

bin/kafka-topicssh --zookeeper localhost:2181--alter --topic my-topic --deleteConfig maxmessagebytes

二 CONSUMER 配置

最为核心的配置是groupid、zookeeperconnect

## Consumer归属的组ID,broker是根据groupid来判断是队列模式还是发布订阅模式,非常重要

 groupid

## 消费者的ID,若是没有设置的话,会自增

 consumerid

## 一个用于跟踪调查的ID ,最好同groupid相同

 clientid = group id value

## 对于zookeeper集群的指定,可以是多个 hostname1:port1,hostname2:port2,hostname3:port3 必须和broker使用同样的zk配置

 zookeeperconnect=localhost:2182

## zookeeper的心跳超时时间,查过这个时间就认为是dead消费者

 zookeepersessiontimeoutms =6000

## zookeeper的等待连接时间

 zookeeperconnectiontimeoutms =6000

## zookeeper的follower同leader的同步时间

 zookeepersynctimems =2000

## 当zookeeper中没有初始的offset时候的处理方式 。smallest :重置为最小值 largest:重置为最大值 anythingelse:抛出异常

 autooffsetreset = largest

## socket的超时时间,实际的超时时间是:maxfetchwait + sockettimeoutms

 sockettimeoutms=301000

## socket的接受缓存空间大小

 socketreceivebufferbytes=641024

##从每个分区获取的消息大小限制

 fetchmessagemaxbytes =10241024

## 是否在消费消息后将offset同步到zookeeper,当Consumer失败后就能从zookeeper获取最新的offset

 autocommitenable =true

## 自动提交的时间间隔

 autocommitintervalms =601000

## 用来处理消费消息的块,每个块可以等同于fetchmessagemaxbytes中数值

 queuedmaxmessagechunks =10

## 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新

## 的consumer上,如果一个consumer获得了某个partition的消费权限,那么它将会向zk注册

##"Partition Owner registry"节点信息,但是有可能此时旧的consumer尚没有释放此节点,

## 此值用于控制,注册节点的重试次数

 rebalancemaxretries =4

## 每次再平衡的时间间隔

 rebalancebackoffms =2000

## 每次重新选举leader的时间

 refreshleaderbackoffms

## server发送到消费端的最小数据,若是不满足这个数值则会等待,知道满足数值要求

 fetchminbytes =1

## 若是不满足最小大小(fetchminbytes)的话,等待消费端请求的最长等待时间

 fetchwaitmaxms =100

## 指定时间内没有消息到达就抛出异常,一般不需要改

 consumertimeoutms = -1

三 PRODUCER 的配置

比较核心的配置:metadatabrokerlist、requestrequiredacks、producertype、serializerclass

## 消费者获取消息元信息(topics, partitions and replicas)的地址,配置格式是:host1:port1,host2:port2,也可以在外面设置一个vip

 metadatabrokerlist

##消息的确认模式

 ##0:不保证消息的到达确认,只管发送,低延迟但是会出现消息的丢失,在某个server失败的情况下,有点像TCP

 ##1:发送消息,并会等待leader 收到确认后,一定的可靠性

 ## -1:发送消息,等待leader收到确认,并进行复制 *** 作后,才返回,最高的可靠性

 requestrequiredacks =0

## 消息发送的最长等待时间

 requesttimeoutms =10000

## socket的缓存大小

 sendbufferbytes=1001024

## key的序列化方式,若是没有设置,同serializerclass

 keyserializerclass

## 分区的策略,默认是取模

 partitionerclass=kafkaproducerDefaultPartitioner

## 消息的压缩模式,默认是none,可以有gzip和snappy

 compressioncodec = none

## 可以针对默写特定的topic进行压缩

 compressedtopics=null

## 消息发送失败后的重试次数

 messagesendmaxretries =3

## 每次失败后的间隔时间

 retrybackoffms =100

## 生产者定时更新topic元信息的时间间隔 ,若是设置为0,那么会在每个消息发送后都去更新数据

 topicmetadatarefreshintervalms =6001000

## 用户随意指定,但是不能重复,主要用于跟踪记录消息

 clientid=""

------------------------------------------- 消息模式 相关 -------------------------------------------

 ## 生产者的类型 async:异步执行消息的发送 sync:同步执行消息的发送

 producertype=sync

## 异步模式下,那么就会在设置的时间缓存消息,并一次性发送

 queuebufferingmaxms =5000

## 异步的模式下 最长等待的消息数

 queuebufferingmaxmessages =10000

## 异步模式下,进入队列的等待时间 若是设置为0,那么要么进入队列,要么直接抛弃

 queueenqueuetimeoutms = -1

## 异步模式下,每次发送的最大消息数,前提是触发了queuebufferingmaxmessages或是queuebufferingmaxms的限制

 batchnummessages=200

## 消息体的系列化处理类 ,转化为字节流进行传输

 serializerclass= kafkaserializerDefaultEncoder

生产者向broker发送消息,消费者接收消息,broker是物理概念,部署几个kafka即几个broker,topic是逻辑概念,往topic里发送消息会发送到设置好的几个partion上,每个partion存储作为不同队列存储不同数据,partion有leader和follower备份机制,消息发送时会轮循发送到不同broker的不同partion中,同一消费者只能消费同一分区,通过offset记录消费位置,消费者组可以访问一个topic的不同partion

启动镜像

启动kafka可以带上参数,这样会自动修改kafka里的配置文件(/opt/kafka_版本/conf/serverproperties),否则不带参数需要自己进入进行手动修改 带参数版启动可参考

其中1721703需要改成自己docker的网桥连接地址

查看已启动容器

查看所有容器

启动未启动的容器

进入kafka容器

创建主题

主题和分区可以理解为:topic是逻辑划分,kafka通过topic进行区分消息,topic的数据会被存储到日志中,如果数据量太大可以引入partion(同时提高读写吞吐量)来分段存储数据。其中replication-factor作用是将任意分区复制到broker上,broker是物理概念,部署了一个kafka可认为broker数为1,我本机只有一个kafka所以这里replication-factor超过1会报错。 综上几个概念可以理解为:集群中有多个broker,创建主题时可以指明topic有多个partitions(消息拆分到不同分区进行存储,一个partion只能被一个消费者消费--partion内部保证接收数据顺序),可以为分区创建多个副本replication,不同副本在不同的broker中(作为备份使用,这里有leader和flower的区分)

查看topic信息

集群部署

可以通过compose集群化部署过es,这里通过创建另一个composeyml文件来部署kafka,配置文件参考 docker-compose集群部署

生产者:

消费者:

方式一:从当前主题的迁移量位置+1开始取数据

方式二:从当前主题第一条消息开始消费

生产者将消息发送broker,broker将消息保存到本地日志中,消息的保存时有序的

单播消息:

当存在一个生产者,一个消费者组的时候,一个消费者组中只有一个消费者会收到消息

多播消息:

当存在一个生产者,多个消费组,不同消费组只有一个消费者收到消息

查看消费组详细信息:

CURRENT-OFFSET:最后被消费的偏移量

LOG-END-OFFSET:消息总量(最后一条消息的偏移量)

LAG :积压了多少条消息

常见问题:

1、如何防止消息丢失

生产者:使用同步消息发送;ack设置为1/all;设置同步分区数>=2

消费者:把自动提交改成手动提交

2、如何防止消息的重复消费

针对网络抖动导致的生产者重试(发送消息),可以设置消费者加锁解决;

3、消息积压

消费者使用多线程异步处理接收数据;创建多个消费者组部署到其他机器上;通过业务架构设计,提升业务层面消费性能。

ps:

缓冲区:kafka默认会创建一个消息缓冲区去存放要发送的消息,大小是32M,每次本地线程会去缓冲区拉16K数据发送到broker,如果不到16K等待10ms也会将数据发送到broker

参考链接:

1、kafka安装教程--推荐

2、kafka配置文件serverproperties参数说明

3、创建主题分区数

4、解决docker容器启动不了的问题

5、通过docker-compose集群部署

6、学习视频

kafka⾼性能,是多⽅⾯协同的结果,包括宏观架构、分布式partition存储、ISR数据同步、以及“⽆所不⽤其极”的

⾼效利⽤磁盘/ *** 作系统特性。

零拷⻉并不是不需要拷⻉,⽽是减少不必要的拷⻉次数。通常是说在IO读写过程中。

nginx的⾼性能也有零拷⻉的身影。

传统IO

⽐如:读取⽂件,socket发送

传统⽅式实现:先读取、再发送,实际经过1~4四次copy。

1、第⼀次:将磁盘⽂件,读取到 *** 作系统内核缓冲区;

2、第⼆次:将内核缓冲区的数据,copy到application应⽤程序的buffer;

3、第三步:将application应⽤程序buffer中的数据,copy到socket⽹络发送缓冲区(属于 *** 作系统内核的缓冲区);

4、第四次:将socket buffer的数据,copy到⽹络协议栈,由⽹卡进⾏⽹络传输。

实际IO读写,需要进⾏IO中断,需要CPU响应中断(内核态到⽤户态转换),尽管引⼊DMA(Direct Memory

Access,直接存储器访问)来接管CPU的中断请求,但四次copy是存在“不必要的拷⻉”的。

实际上并不需要第⼆个和第三个数据副本。数据可以直接从读缓冲区传输到套接字缓冲区。

kafka的两个过程:

1、⽹络数据持久化到磁盘 (Producer 到 Broker)

2、磁盘⽂件通过⽹络发送(Broker 到 Consumer)

数据落盘通常都是⾮实时的,Kafka的数据并不是实时的写⼊硬盘,它充分利⽤了现代 *** 作系统分⻚存储来利⽤内

存提⾼I/O效率。

磁盘⽂件通过⽹络发送(Broker 到 Consumer)

磁盘数据通过DMA(Direct Memory Access,直接存储器访问)拷⻉到内核态 Buffer

直接通过 DMA 拷⻉到 NIC Buffer(socket buffer),⽆需 CPU 拷⻉。

除了减少数据拷⻉外,整个读⽂件 ==> ⽹络发送由⼀个 sendfile 调⽤完成,整个过程只有两次上下⽂切换,因此⼤⼤提⾼了性能。

Java NIO对sendfile的⽀持就是FileChanneltransferTo()/transferFrom()。

fileChanneltransferTo( position, count, socketChannel);

把磁盘⽂件读取OS内核缓冲区后的fileChannel,直接转给socketChannel发送;底层就是sendfile。消费者从broker读取数据,就是由此实现。

具体来看,Kafka 的数据传输通过 TransportLayer 来完成,其⼦类PlaintextTransportLayer 通过Java NIO 的FileChannel 的 transferTo 和 transferFrom ⽅法实现零拷⻉。

页缓存是 *** 作系统实现的⼀种主要的磁盘缓存,以此⽤来减少对磁盘 I/O 的 *** 作。

具体来说,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。

Kafka接收来⾃socket buffer的⽹络数据,应⽤进程不需要中间处理、直接进⾏持久化时。可以使⽤mmap内存⽂件映射。

Memory Mapped Files

简称mmap,简单描述其作⽤就是:将磁盘⽂件映射到内存, ⽤户通过修改内存就能修改磁盘⽂件。

它的⼯作原理是直接利⽤ *** 作系统的Page来实现磁盘⽂件到物理内存的直接映射。完成映射之后你对物理内存的 *** 作会被同步到硬盘上( *** 作系统在适当的时候)。

通过mmap,进程像读写硬盘⼀样读写内存(当然是虚拟机内存)。使⽤这种⽅式可以获取很⼤的I/O提升,省去了⽤户空间到内核空间复制的开销。

mmap也有⼀个很明显的缺陷:不可靠,写到mmap中的数据并没有被真正的写到硬盘, *** 作系统会在程序主动调⽤flush的时候才把数据真正的写到硬盘。

Kafka提供了⼀个参数 producertype 来控制是不是主动flush;

如果Kafka写⼊到mmap之后就⽴即flush然后再返回Producer叫同步(sync);

写⼊mmap之后⽴即返回Producer不调⽤flush叫异步(async)。

Java NIO对⽂件映射的⽀持Java NIO,提供了⼀个MappedByteBuffer 类可以⽤来实现内存映射。

MappedByteBuffer只能通过调⽤FileChannel的map()取得,再没有其他⽅式。

FileChannelmap()是抽象⽅法,具体实现是在 FileChannelImplmap()可⾃⾏查看JDK源码,其map0()⽅法就是调⽤了Linux内核的mmap的API。

*** 作系统可以针对线性读写做深层次的优化,⽐如预读(read-ahead,提前将⼀个⽐较⼤的磁盘块读⼊内存) 和后写(write-behind,将很多⼩的逻辑写 *** 作合并起来组成⼀个⼤的物理写 *** 作)技术。

Kafka 在设计时采用了文件追加的⽅式来写⼊消息,即只能在⽇志⽂件的尾部追加新的消 息,并且也不允许修改已写⼊的消息,这种⽅式属于典型的顺序写盘的 *** 作,所以就算 Kafka 使⽤磁盘作为存储介质,也能承载⾮常⼤的吞吐量。

mmap和sendfile:

在上一篇文章当中,也算是比较详细且通俗的聊了聊Flink是如何通过checkpoint机制来完成数据精准一次性的实现的。并且也在上一章的结尾表示,要在接下来聊一聊Flink与它的铁哥们Kafaka之间,是如何实现数据的精准一次性消费的。

本次的聊法,还是要通过以kafka(source)->Flink,Flink(source)->Kafka来分别展开讨论。

kafka是一个具有数据保存、数据回放能力的消息队列,说白了就是kafka中的每一个数据,都有一个专门的标记作为标识。而在Flink消费kafka传入的数据的时候,source任务就能够将这个偏移量以算子状态的角色进行保存,写入到设定好的检查点中。这样一旦发生故障,Flink中的FlinkKafkaProduce连接器就i能够按照自己保存的偏移量,自己去Kafka中重新拉取数据,也正是通过这种方式,就能够确保Kafka到Flink之间的精准一次性。

在上一篇文章当中,已经表明了,如果想要让输出端能够进行精准一次性消费,就需要使用到幂等性或者是事务。而事务中的两阶段提交是所有方案里面最好的实现。

其实Flink到Kafak之间也是采用了这种方式,具体的可以看一下ctrl进到FlinkKafkaProduce连接器内部去看一看:

这也就表明了,当数据通过Flink发送给sink端Kafka的时候,是经历了两个阶段的处理的。第一阶段就是Flink向Kafka中插入数据,进入预提交阶段。当JobManager发送的Ckeckpoint保存成功信号过来之后,才会提交事务进行正式的数据发送,也就是让原来不可用的数据可以被使用了。

这个实现过程到目前阶段就很清晰了,它的主体流程无非就是在开启检查点之后,由JobManager向各个阶段的处理逻辑发送有关于检查点的barrier。所有的计算任务接收到之后,就会根据自己当前的状态做一个检查点保存。而当这个barrier来到sink任务的时候,sink就会开启一个事务,然后通过这个事务向外预写数据。直到Jobmanager来告诉它这一次的检查点已经保存完成了,sink就会进行第二次提交,数据也就算是成功写出了。

1必须要保证检查点被打开了,如果检查点没有打开,那么之前说的一切话都是空谈。因为Flink默认检查点是关着的。

2在FlinkKafakProducer连接器的构造函数中要传入参数,这个参数就是用来保证状态一致性的。就是在构造函数的最后一个参数输入如下:

3配置Kafka读取数据的隔离级别

在kafka中有个配置,这个配置用来管理Kafka读取数据的级别。而这个配置默认是能够读取预提交阶段的数据的,所以如果你没改这个配置,那两阶段提交的第一阶段就是白费了。所以需要改一下这个配置,来更换一下隔离级别:

4事务超时时间

这个配置也很有意思,大家试想一下。如果要进行两阶段提交,就要保证sink端支持事务,Kafka是支持事务的,但是像这个组件对于很多机制都有一个超时时间的概念,也就是说如果时间到了这个界限还没完成工作,那就会默认这个工作失败。Kafka中由这个概念,Flink中同样由这个概念。但是flink默认超时时间是1小时,而Kafka默认是15分钟,这就有可能出现检查点保存东西的时间大于15分钟,假如说是16分钟保存完成然后给sink发送检查点保存陈功可以提交事务的信号,但是这个时候Kafka已经认为事务失败,把之前的数据都扔了。那数据不就是丢失了么。所以说Kafka的超时时间要大于Flink的超时时间才好。

截止到目前为止,基本上把有关于状态维护的一些东西都说完了,有状态后端、有检查点。还通过检查点完成可端到端的数据精准一次性消费。但是想到这我又感觉,如果有学习进度比我差一些的,万一没办法很好的理解怎么办。所以在下一篇文章当中我就聊聊Flink中的“状态”到底是个什么东西,都有什么类型,都怎么去用。

楼主你好,这种大系统对硬件设备,软件技术要求都是非常严格。每年的618,双11,对于京东、阿里的技术大咖来说,很紧张状态。这种活动每秒钟处理的订单量都是千万级的。这种大系统都是由各个子系统之间相互配合完成的。

硬件设备就不用多说了,采购最好的。重点说一下软件部分。对于这种大系统。用的技术很多。也是业界都在用的技术,比如大数据实时数据处理、大数据实时计算、几乎准实时查询检索等等。

大数据实时数据处理用的技术主要是Flume+Kafka+SparkStreaming、Flume+Kafka+Storm、Flink等。这些技术每个技术细节就不详细讲述了。它们都是处理海量数据使用的开源框架,对于京东或者阿里很有可能优化了源码,开发出适合他们公司需要的场景框架。但是核心技术差异不大。

大数据实时计算技术基本上都是用Kafka、SparkStreaming、SparkSQL、SparkGrapnX等中的一个或者多个去完成。

大数据准实时查询检索用的技术就很多,这里介绍两种,一种是交互式查询,创建二级索引(Hbase+Solr),另外一种ElasticSearch全文检索框架。

大系统用到的技术都差不多,关键看架构师怎么设计架构好业务场景,设计不好就会出现最早的购票系统12306。设计好了就像现在的京东商城、天猫商城处理那么大数据量还能运行正常。

技术在快速发展,未来各个行业都会有这种千万级秒处理的大平台。需要大家不断的给自己充电学习。大家一起加油!

谢谢大家!如有疑问,可以私信我。

1离线收集工具:ETL在数据仓库的背景下,ETL基本上是数据收集的代表,包括数据提取、转换和加载。在转换过程中,需要根据具体的交易场景对数据进行管理,比如非法数据的监控和过滤、格式转换和数据标准化、数据替换、保证数据完整性等。2实时收集工具:Flume/Kafka实时采集主要用于考虑流处理的事务场景,例如记录数据源的各种 *** 作活动,如网络监控的流量处理、金融应用的股票核算、web服务器记录的用户访问行为等。在流处理场景下,数据采集会成为Kafka的客户,就像大坝一样拦截来自上游的连续数据,然后根据事务场景做相应的处理(比如去重、去噪、中心记账等。),然后将其写入相应的数据存储器。3互联网采集工具:爬虫、DPI等。Scribe是由脸书开发的数据(日志)收集系统。又称网络蜘蛛、网络机器人,是按照一定规则从万维网上自动抓取信息的程序或脚本,它支持、音频、视频等文件或附件的收集。除了网络中包含的内容之外,还可以使用带宽处理技术(如DPI或DFI)来处理网络流量的收集。

以上就是关于kafka 配置文件参数详解全部的内容,包括:kafka 配置文件参数详解、kafka-docker上使用+常用指令、Kafka的磁盘存储等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/web/9515795.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-29
下一篇 2023-04-29

发表评论

登录后才能评论

评论列表(0条)

保存