kafka的配置分为 broker、producter、consumer三个不同的配置
一 BROKER 的全局配置
最为核心的三个配置 brokerid、logdir、zookeeperconnect 。
------------------------------------------- 系统 相关 -------------------------------------------
##每一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变brokerid的话不会影响consumers
brokerid =1
##kafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2
logdirs = /tmp/kafka-logs
##提供给客户端响应的端口
port =6667
##消息体的最大大小,单位是字节
messagemaxbytes =1000000
## broker 处理消息的最大线程数,一般情况下不需要去修改
numnetworkthreads =3
## broker处理磁盘IO 的线程数 ,数值应该大于你的硬盘数
numiothreads =8
## 一些后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改
backgroundthreads =4
## 等待IO线程处理的请求队列最大数,若是等待IO的请求超过这个数值,那么会停止接受外部消息,算是一种自我保护机制
queuedmaxrequests =500
##broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置
hostname
## 打广告的地址,若是设置的话,会提供给producers, consumers,其他broker连接,具体如何使用还未深究
advertisedhostname
## 广告地址端口,必须不同于port中的设置
advertisedport
## socket的发送缓冲区,socket的调优参数SO_SNDBUFF
socketsendbufferbytes =1001024
## socket的接受缓冲区,socket的调优参数SO_RCVBUFF
socketreceivebufferbytes =1001024
## socket请求的最大数值,防止serverOOM,messagemaxbytes必然要小于socketrequestmaxbytes,会被topic创建时的指定参数覆盖
socketrequestmaxbytes =10010241024
------------------------------------------- LOG 相关 -------------------------------------------
## topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
logsegmentbytes =102410241024
## 这个参数会在日志segment没有达到logsegmentbytes设置的大小,也会强制新建一个segment 会被 topic创建时的指定参数覆盖
logrollhours =247
## 日志清理策略 选择有:delete和compact 主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖
logcleanuppolicy = delete
## 数据存储的最大时间 超过这个时间 会根据logcleanuppolicy设置的策略处理数据,也就是消费端能够多久去消费数据
## logretentionbytes和logretentionminutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
logretentionminutes=7days
指定日志每隔多久检查看是否可以被删除,默认1分钟
logcleanupintervalmins=1
## topic每个分区的最大文件大小,一个topic的大小限制 = 分区数logretentionbytes 。-1没有大小限制
## logretentionbytes和logretentionminutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
logretentionbytes=-1
## 文件大小检查的周期时间,是否处罚 logcleanuppolicy中设置的策略
logretentioncheckintervalms=5minutes
## 是否开启日志压缩
logcleanerenable=false
## 日志压缩运行的线程数
logcleanerthreads =1
## 日志压缩时候处理的最大大小
logcleaneriomaxbytespersecond=None
## 日志压缩去重时候的缓存空间 ,在空间允许的情况下,越大越好
logcleanerdedupebuffersize=50010241024
## 日志清理时候用到的IO块大小 一般不需要修改
logcleaneriobuffersize=5121024
## 日志清理中hash表的扩大因子 一般不需要修改
logcleaneriobufferloadfactor =09
## 检查是否处罚日志清理的间隔
logcleanerbackoffms =15000
## 日志清理的频率控制,越大意味着更高效的清理,同时会存在一些空间上的浪费,会被topic创建时的指定参数覆盖
logcleanermincleanableratio=05
## 对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同logretentionminutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。会被topic创建时的指定参数覆盖
logcleanerdeleteretentionms =1day
## 对于segment日志的索引文件大小限制,会被topic创建时的指定参数覆盖
logindexsizemaxbytes =1010241024
## 当执行一个fetch *** 作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数
logindexintervalbytes =4096
## log文件"sync"到磁盘之前累积的消息条数
## 因为磁盘IO *** 作是一个慢 *** 作,但又是一个"数据可靠性"的必要手段
## 所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡
## 如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞)
## 如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟
## 物理server故障,将会导致没有fsync的消息丢失
logflushintervalmessages=None
## 检查是否需要固化到硬盘的时间间隔
logflushschedulerintervalms =3000
## 仅仅通过interval来控制消息的磁盘写入时机,是不足的
## 此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔
## 达到阀值,也将触发
logflushintervalms = None
## 文件在索引中清除后保留的时间 一般不需要去修改
logdeletedelayms =60000
## 控制上次固化硬盘的时间点,以便于数据恢复 一般不需要去修改
logflushoffsetcheckpointintervalms =60000
------------------------------------------- TOPIC 相关 -------------------------------------------
## 是否允许自动创建topic ,若是false,就需要通过命令创建topic
autocreatetopicsenable =true
## 一个topic ,默认分区的replication个数 ,不得大于集群中broker的个数
defaultreplicationfactor =1
## 每个topic的分区个数,若是在topic创建时候没有指定的话 会被topic创建时的指定参数覆盖
numpartitions =1
实例 --replication-factor3--partitions1--topic replicated-topic :名称replicated-topic有一个分区,分区被复制到三个broker上。
----------------------------------复制(Leader、replicas) 相关 ----------------------------------
## partition leader与replicas之间通讯时,socket的超时时间
controllersockettimeoutms =30000
## partition leader与replicas数据同步时,消息的队列尺寸
controllermessagequeuesize=10
## replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas列入ISR(in-sync replicas),并认为它是死的,不会再加入管理中
replicalagtimemaxms =10000
## 如果follower落后与leader太多,将会认为此follower[或者说partition relicas]已经失效
## 通常,在follower与leader通讯时,因为网络延迟或者链接断开,总会导致replicas中消息同步滞后
## 如果消息之后太多,leader将认为此follower网络延迟较大或者消息吞吐能力有限,将会把此replicas迁移
## 到其他follower中
## 在broker数量较少,或者网络不足的环境中,建议提高此值
replicalagmaxmessages =4000
##follower与leader之间的socket超时时间
replicasockettimeoutms=301000
## leader复制时候的socket缓存大小
replicasocketreceivebufferbytes=641024
## replicas每次获取数据的最大大小
replicafetchmaxbytes =10241024
## replicas同leader之间通信的最大等待时间,失败了会重试
replicafetchwaitmaxms =500
## fetch的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会阻塞,直到满足条件
replicafetchminbytes =1
## leader 进行复制的线程数,增大这个数值会增加follower的IO
numreplicafetchers=1
## 每个replica检查是否将最高水位进行固化的频率
replicahighwatermarkcheckpointintervalms =5000
## 是否允许控制器关闭broker ,若是设置为true,会关闭所有在这个broker上的leader,并转移到其他broker
controlledshutdownenable =false
## 控制器关闭的尝试次数
controlledshutdownmaxretries =3
## 每次关闭尝试的时间间隔
controlledshutdownretrybackoffms =5000
## 是否自动平衡broker之间的分配策略
autoleaderrebalanceenable =false
## leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡
leaderimbalanceperbrokerpercentage =10
## 检查leader是否不平衡的时间间隔
leaderimbalancecheckintervalseconds =300
## 客户端保留offset信息的最大空间大小
offsetmetadatamaxbytes
----------------------------------ZooKeeper 相关----------------------------------
##zookeeper集群的地址,可以是多个,多个之间用逗号分割 hostname1:port1,hostname2:port2,hostname3:port3
zookeeperconnect = localhost:2181
## ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大
zookeepersessiontimeoutms=6000
## ZooKeeper的连接超时时间
zookeeperconnectiontimeoutms =6000
## ZooKeeper集群中leader和follower之间的同步实际那
zookeepersynctimems =2000
配置的修改
其中一部分配置是可以被每个topic自身的配置所代替,例如
新增配置
bin/kafka-topicssh --zookeeper localhost:2181--create --topic my-topic --partitions1--replication-factor1--config maxmessagebytes=64000--config flushmessages=1
修改配置
bin/kafka-topicssh --zookeeper localhost:2181--alter --topic my-topic --config maxmessagebytes=128000
删除配置 :
bin/kafka-topicssh --zookeeper localhost:2181--alter --topic my-topic --deleteConfig maxmessagebytes
二 CONSUMER 配置
最为核心的配置是groupid、zookeeperconnect
## Consumer归属的组ID,broker是根据groupid来判断是队列模式还是发布订阅模式,非常重要
groupid
## 消费者的ID,若是没有设置的话,会自增
consumerid
## 一个用于跟踪调查的ID ,最好同groupid相同
clientid = group id value
## 对于zookeeper集群的指定,可以是多个 hostname1:port1,hostname2:port2,hostname3:port3 必须和broker使用同样的zk配置
zookeeperconnect=localhost:2182
## zookeeper的心跳超时时间,查过这个时间就认为是dead消费者
zookeepersessiontimeoutms =6000
## zookeeper的等待连接时间
zookeeperconnectiontimeoutms =6000
## zookeeper的follower同leader的同步时间
zookeepersynctimems =2000
## 当zookeeper中没有初始的offset时候的处理方式 。smallest :重置为最小值 largest:重置为最大值 anythingelse:抛出异常
autooffsetreset = largest
## socket的超时时间,实际的超时时间是:maxfetchwait + sockettimeoutms
sockettimeoutms=301000
## socket的接受缓存空间大小
socketreceivebufferbytes=641024
##从每个分区获取的消息大小限制
fetchmessagemaxbytes =10241024
## 是否在消费消息后将offset同步到zookeeper,当Consumer失败后就能从zookeeper获取最新的offset
autocommitenable =true
## 自动提交的时间间隔
autocommitintervalms =601000
## 用来处理消费消息的块,每个块可以等同于fetchmessagemaxbytes中数值
queuedmaxmessagechunks =10
## 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新
## 的consumer上,如果一个consumer获得了某个partition的消费权限,那么它将会向zk注册
##"Partition Owner registry"节点信息,但是有可能此时旧的consumer尚没有释放此节点,
## 此值用于控制,注册节点的重试次数
rebalancemaxretries =4
## 每次再平衡的时间间隔
rebalancebackoffms =2000
## 每次重新选举leader的时间
refreshleaderbackoffms
## server发送到消费端的最小数据,若是不满足这个数值则会等待,知道满足数值要求
fetchminbytes =1
## 若是不满足最小大小(fetchminbytes)的话,等待消费端请求的最长等待时间
fetchwaitmaxms =100
## 指定时间内没有消息到达就抛出异常,一般不需要改
consumertimeoutms = -1
三 PRODUCER 的配置
比较核心的配置:metadatabrokerlist、requestrequiredacks、producertype、serializerclass
## 消费者获取消息元信息(topics, partitions and replicas)的地址,配置格式是:host1:port1,host2:port2,也可以在外面设置一个vip
metadatabrokerlist
##消息的确认模式
##0:不保证消息的到达确认,只管发送,低延迟但是会出现消息的丢失,在某个server失败的情况下,有点像TCP
##1:发送消息,并会等待leader 收到确认后,一定的可靠性
## -1:发送消息,等待leader收到确认,并进行复制 *** 作后,才返回,最高的可靠性
requestrequiredacks =0
## 消息发送的最长等待时间
requesttimeoutms =10000
## socket的缓存大小
sendbufferbytes=1001024
## key的序列化方式,若是没有设置,同serializerclass
keyserializerclass
## 分区的策略,默认是取模
partitionerclass=kafkaproducerDefaultPartitioner
## 消息的压缩模式,默认是none,可以有gzip和snappy
compressioncodec = none
## 可以针对默写特定的topic进行压缩
compressedtopics=null
## 消息发送失败后的重试次数
messagesendmaxretries =3
## 每次失败后的间隔时间
retrybackoffms =100
## 生产者定时更新topic元信息的时间间隔 ,若是设置为0,那么会在每个消息发送后都去更新数据
topicmetadatarefreshintervalms =6001000
## 用户随意指定,但是不能重复,主要用于跟踪记录消息
clientid=""
------------------------------------------- 消息模式 相关 -------------------------------------------
## 生产者的类型 async:异步执行消息的发送 sync:同步执行消息的发送
producertype=sync
## 异步模式下,那么就会在设置的时间缓存消息,并一次性发送
queuebufferingmaxms =5000
## 异步的模式下 最长等待的消息数
queuebufferingmaxmessages =10000
## 异步模式下,进入队列的等待时间 若是设置为0,那么要么进入队列,要么直接抛弃
queueenqueuetimeoutms = -1
## 异步模式下,每次发送的最大消息数,前提是触发了queuebufferingmaxmessages或是queuebufferingmaxms的限制
batchnummessages=200
## 消息体的系列化处理类 ,转化为字节流进行传输
serializerclass= kafkaserializerDefaultEncoder
生产者向broker发送消息,消费者接收消息,broker是物理概念,部署几个kafka即几个broker,topic是逻辑概念,往topic里发送消息会发送到设置好的几个partion上,每个partion存储作为不同队列存储不同数据,partion有leader和follower备份机制,消息发送时会轮循发送到不同broker的不同partion中,同一消费者只能消费同一分区,通过offset记录消费位置,消费者组可以访问一个topic的不同partion
启动镜像
启动kafka可以带上参数,这样会自动修改kafka里的配置文件(/opt/kafka_版本/conf/serverproperties),否则不带参数需要自己进入进行手动修改 带参数版启动可参考
其中1721703需要改成自己docker的网桥连接地址
查看已启动容器
查看所有容器
启动未启动的容器
进入kafka容器
创建主题
主题和分区可以理解为:topic是逻辑划分,kafka通过topic进行区分消息,topic的数据会被存储到日志中,如果数据量太大可以引入partion(同时提高读写吞吐量)来分段存储数据。其中replication-factor作用是将任意分区复制到broker上,broker是物理概念,部署了一个kafka可认为broker数为1,我本机只有一个kafka所以这里replication-factor超过1会报错。 综上几个概念可以理解为:集群中有多个broker,创建主题时可以指明topic有多个partitions(消息拆分到不同分区进行存储,一个partion只能被一个消费者消费--partion内部保证接收数据顺序),可以为分区创建多个副本replication,不同副本在不同的broker中(作为备份使用,这里有leader和flower的区分) 。
查看topic信息
集群部署
可以通过compose集群化部署过es,这里通过创建另一个composeyml文件来部署kafka,配置文件参考 docker-compose集群部署
生产者:
消费者:
方式一:从当前主题的迁移量位置+1开始取数据
方式二:从当前主题第一条消息开始消费
生产者将消息发送broker,broker将消息保存到本地日志中,消息的保存时有序的
单播消息:
当存在一个生产者,一个消费者组的时候,一个消费者组中只有一个消费者会收到消息
多播消息:
当存在一个生产者,多个消费组,不同消费组只有一个消费者收到消息
查看消费组详细信息:
CURRENT-OFFSET:最后被消费的偏移量
LOG-END-OFFSET:消息总量(最后一条消息的偏移量)
LAG :积压了多少条消息
常见问题:
1、如何防止消息丢失
生产者:使用同步消息发送;ack设置为1/all;设置同步分区数>=2
消费者:把自动提交改成手动提交
2、如何防止消息的重复消费
针对网络抖动导致的生产者重试(发送消息),可以设置消费者加锁解决;
3、消息积压
消费者使用多线程异步处理接收数据;创建多个消费者组部署到其他机器上;通过业务架构设计,提升业务层面消费性能。
ps:
缓冲区:kafka默认会创建一个消息缓冲区去存放要发送的消息,大小是32M,每次本地线程会去缓冲区拉16K数据发送到broker,如果不到16K等待10ms也会将数据发送到broker
参考链接:
1、kafka安装教程--推荐
2、kafka配置文件serverproperties参数说明
3、创建主题分区数
4、解决docker容器启动不了的问题
5、通过docker-compose集群部署
6、学习视频
kafka⾼性能,是多⽅⾯协同的结果,包括宏观架构、分布式partition存储、ISR数据同步、以及“⽆所不⽤其极”的
⾼效利⽤磁盘/ *** 作系统特性。
零拷⻉并不是不需要拷⻉,⽽是减少不必要的拷⻉次数。通常是说在IO读写过程中。
nginx的⾼性能也有零拷⻉的身影。
传统IO
⽐如:读取⽂件,socket发送
传统⽅式实现:先读取、再发送,实际经过1~4四次copy。
1、第⼀次:将磁盘⽂件,读取到 *** 作系统内核缓冲区;
2、第⼆次:将内核缓冲区的数据,copy到application应⽤程序的buffer;
3、第三步:将application应⽤程序buffer中的数据,copy到socket⽹络发送缓冲区(属于 *** 作系统内核的缓冲区);
4、第四次:将socket buffer的数据,copy到⽹络协议栈,由⽹卡进⾏⽹络传输。
实际IO读写,需要进⾏IO中断,需要CPU响应中断(内核态到⽤户态转换),尽管引⼊DMA(Direct Memory
Access,直接存储器访问)来接管CPU的中断请求,但四次copy是存在“不必要的拷⻉”的。
实际上并不需要第⼆个和第三个数据副本。数据可以直接从读缓冲区传输到套接字缓冲区。
kafka的两个过程:
1、⽹络数据持久化到磁盘 (Producer 到 Broker)
2、磁盘⽂件通过⽹络发送(Broker 到 Consumer)
数据落盘通常都是⾮实时的,Kafka的数据并不是实时的写⼊硬盘,它充分利⽤了现代 *** 作系统分⻚存储来利⽤内
存提⾼I/O效率。
磁盘⽂件通过⽹络发送(Broker 到 Consumer)
磁盘数据通过DMA(Direct Memory Access,直接存储器访问)拷⻉到内核态 Buffer
直接通过 DMA 拷⻉到 NIC Buffer(socket buffer),⽆需 CPU 拷⻉。
除了减少数据拷⻉外,整个读⽂件 ==> ⽹络发送由⼀个 sendfile 调⽤完成,整个过程只有两次上下⽂切换,因此⼤⼤提⾼了性能。
Java NIO对sendfile的⽀持就是FileChanneltransferTo()/transferFrom()。
fileChanneltransferTo( position, count, socketChannel);
把磁盘⽂件读取OS内核缓冲区后的fileChannel,直接转给socketChannel发送;底层就是sendfile。消费者从broker读取数据,就是由此实现。
具体来看,Kafka 的数据传输通过 TransportLayer 来完成,其⼦类PlaintextTransportLayer 通过Java NIO 的FileChannel 的 transferTo 和 transferFrom ⽅法实现零拷⻉。
页缓存是 *** 作系统实现的⼀种主要的磁盘缓存,以此⽤来减少对磁盘 I/O 的 *** 作。
具体来说,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。
Kafka接收来⾃socket buffer的⽹络数据,应⽤进程不需要中间处理、直接进⾏持久化时。可以使⽤mmap内存⽂件映射。
Memory Mapped Files
简称mmap,简单描述其作⽤就是:将磁盘⽂件映射到内存, ⽤户通过修改内存就能修改磁盘⽂件。
它的⼯作原理是直接利⽤ *** 作系统的Page来实现磁盘⽂件到物理内存的直接映射。完成映射之后你对物理内存的 *** 作会被同步到硬盘上( *** 作系统在适当的时候)。
通过mmap,进程像读写硬盘⼀样读写内存(当然是虚拟机内存)。使⽤这种⽅式可以获取很⼤的I/O提升,省去了⽤户空间到内核空间复制的开销。
mmap也有⼀个很明显的缺陷:不可靠,写到mmap中的数据并没有被真正的写到硬盘, *** 作系统会在程序主动调⽤flush的时候才把数据真正的写到硬盘。
Kafka提供了⼀个参数 producertype 来控制是不是主动flush;
如果Kafka写⼊到mmap之后就⽴即flush然后再返回Producer叫同步(sync);
写⼊mmap之后⽴即返回Producer不调⽤flush叫异步(async)。
Java NIO对⽂件映射的⽀持Java NIO,提供了⼀个MappedByteBuffer 类可以⽤来实现内存映射。
MappedByteBuffer只能通过调⽤FileChannel的map()取得,再没有其他⽅式。
FileChannelmap()是抽象⽅法,具体实现是在 FileChannelImplmap()可⾃⾏查看JDK源码,其map0()⽅法就是调⽤了Linux内核的mmap的API。
*** 作系统可以针对线性读写做深层次的优化,⽐如预读(read-ahead,提前将⼀个⽐较⼤的磁盘块读⼊内存) 和后写(write-behind,将很多⼩的逻辑写 *** 作合并起来组成⼀个⼤的物理写 *** 作)技术。
Kafka 在设计时采用了文件追加的⽅式来写⼊消息,即只能在⽇志⽂件的尾部追加新的消 息,并且也不允许修改已写⼊的消息,这种⽅式属于典型的顺序写盘的 *** 作,所以就算 Kafka 使⽤磁盘作为存储介质,也能承载⾮常⼤的吞吐量。
mmap和sendfile:
在上一篇文章当中,也算是比较详细且通俗的聊了聊Flink是如何通过checkpoint机制来完成数据精准一次性的实现的。并且也在上一章的结尾表示,要在接下来聊一聊Flink与它的铁哥们Kafaka之间,是如何实现数据的精准一次性消费的。
本次的聊法,还是要通过以kafka(source)->Flink,Flink(source)->Kafka来分别展开讨论。
kafka是一个具有数据保存、数据回放能力的消息队列,说白了就是kafka中的每一个数据,都有一个专门的标记作为标识。而在Flink消费kafka传入的数据的时候,source任务就能够将这个偏移量以算子状态的角色进行保存,写入到设定好的检查点中。这样一旦发生故障,Flink中的FlinkKafkaProduce连接器就i能够按照自己保存的偏移量,自己去Kafka中重新拉取数据,也正是通过这种方式,就能够确保Kafka到Flink之间的精准一次性。
在上一篇文章当中,已经表明了,如果想要让输出端能够进行精准一次性消费,就需要使用到幂等性或者是事务。而事务中的两阶段提交是所有方案里面最好的实现。
其实Flink到Kafak之间也是采用了这种方式,具体的可以看一下ctrl进到FlinkKafkaProduce连接器内部去看一看:
这也就表明了,当数据通过Flink发送给sink端Kafka的时候,是经历了两个阶段的处理的。第一阶段就是Flink向Kafka中插入数据,进入预提交阶段。当JobManager发送的Ckeckpoint保存成功信号过来之后,才会提交事务进行正式的数据发送,也就是让原来不可用的数据可以被使用了。
这个实现过程到目前阶段就很清晰了,它的主体流程无非就是在开启检查点之后,由JobManager向各个阶段的处理逻辑发送有关于检查点的barrier。所有的计算任务接收到之后,就会根据自己当前的状态做一个检查点保存。而当这个barrier来到sink任务的时候,sink就会开启一个事务,然后通过这个事务向外预写数据。直到Jobmanager来告诉它这一次的检查点已经保存完成了,sink就会进行第二次提交,数据也就算是成功写出了。
1必须要保证检查点被打开了,如果检查点没有打开,那么之前说的一切话都是空谈。因为Flink默认检查点是关着的。
2在FlinkKafakProducer连接器的构造函数中要传入参数,这个参数就是用来保证状态一致性的。就是在构造函数的最后一个参数输入如下:
3配置Kafka读取数据的隔离级别
在kafka中有个配置,这个配置用来管理Kafka读取数据的级别。而这个配置默认是能够读取预提交阶段的数据的,所以如果你没改这个配置,那两阶段提交的第一阶段就是白费了。所以需要改一下这个配置,来更换一下隔离级别:
4事务超时时间
这个配置也很有意思,大家试想一下。如果要进行两阶段提交,就要保证sink端支持事务,Kafka是支持事务的,但是像这个组件对于很多机制都有一个超时时间的概念,也就是说如果时间到了这个界限还没完成工作,那就会默认这个工作失败。Kafka中由这个概念,Flink中同样由这个概念。但是flink默认超时时间是1小时,而Kafka默认是15分钟,这就有可能出现检查点保存东西的时间大于15分钟,假如说是16分钟保存完成然后给sink发送检查点保存陈功可以提交事务的信号,但是这个时候Kafka已经认为事务失败,把之前的数据都扔了。那数据不就是丢失了么。所以说Kafka的超时时间要大于Flink的超时时间才好。
截止到目前为止,基本上把有关于状态维护的一些东西都说完了,有状态后端、有检查点。还通过检查点完成可端到端的数据精准一次性消费。但是想到这我又感觉,如果有学习进度比我差一些的,万一没办法很好的理解怎么办。所以在下一篇文章当中我就聊聊Flink中的“状态”到底是个什么东西,都有什么类型,都怎么去用。
楼主你好,这种大系统对硬件设备,软件技术要求都是非常严格。每年的618,双11,对于京东、阿里的技术大咖来说,很紧张状态。这种活动每秒钟处理的订单量都是千万级的。这种大系统都是由各个子系统之间相互配合完成的。
硬件设备就不用多说了,采购最好的。重点说一下软件部分。对于这种大系统。用的技术很多。也是业界都在用的技术,比如大数据实时数据处理、大数据实时计算、几乎准实时查询检索等等。
大数据实时数据处理用的技术主要是Flume+Kafka+SparkStreaming、Flume+Kafka+Storm、Flink等。这些技术每个技术细节就不详细讲述了。它们都是处理海量数据使用的开源框架,对于京东或者阿里很有可能优化了源码,开发出适合他们公司需要的场景框架。但是核心技术差异不大。
大数据实时计算技术基本上都是用Kafka、SparkStreaming、SparkSQL、SparkGrapnX等中的一个或者多个去完成。
大数据准实时查询检索用的技术就很多,这里介绍两种,一种是交互式查询,创建二级索引(Hbase+Solr),另外一种ElasticSearch全文检索框架。
大系统用到的技术都差不多,关键看架构师怎么设计架构好业务场景,设计不好就会出现最早的购票系统12306。设计好了就像现在的京东商城、天猫商城处理那么大数据量还能运行正常。
技术在快速发展,未来各个行业都会有这种千万级秒处理的大平台。需要大家不断的给自己充电学习。大家一起加油!
谢谢大家!如有疑问,可以私信我。
1离线收集工具:ETL在数据仓库的背景下,ETL基本上是数据收集的代表,包括数据提取、转换和加载。在转换过程中,需要根据具体的交易场景对数据进行管理,比如非法数据的监控和过滤、格式转换和数据标准化、数据替换、保证数据完整性等。2实时收集工具:Flume/Kafka实时采集主要用于考虑流处理的事务场景,例如记录数据源的各种 *** 作活动,如网络监控的流量处理、金融应用的股票核算、web服务器记录的用户访问行为等。在流处理场景下,数据采集会成为Kafka的客户,就像大坝一样拦截来自上游的连续数据,然后根据事务场景做相应的处理(比如去重、去噪、中心记账等。),然后将其写入相应的数据存储器。3互联网采集工具:爬虫、DPI等。Scribe是由脸书开发的数据(日志)收集系统。又称网络蜘蛛、网络机器人,是按照一定规则从万维网上自动抓取信息的程序或脚本,它支持、音频、视频等文件或附件的收集。除了网络中包含的内容之外,还可以使用带宽处理技术(如DPI或DFI)来处理网络流量的收集。
以上就是关于kafka 配置文件参数详解全部的内容,包括:kafka 配置文件参数详解、kafka-docker上使用+常用指令、Kafka的磁盘存储等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)