kafka怎么样接收数据保存到MYSQL数据库

kafka怎么样接收数据保存到MYSQL数据库,第1张

private function loaderHandler(event:):void {

switch(eventtype) {

case EventCOMPLETE:

trace(_loaderdataresult);

break;

case EventOPEN:

trace("open: " + event);

break;

case ProgressEventPROGRESS:

trace("progress: " + event);

break;

51文件目录布局

根目录下有以下5个checkpoint文件: cleaner-offset-checkpoint, log-start-offset-checkpoint, metaproperties, recovery-point-offset-checkpoint, replication-offset-checkpoint

分区目录下有以下目录: 0000xxxindex(偏移量为64位长整形,长度固定为20位), 0000xxxlog, 0000xxxtimeindex

还有可能包含deleted cleaned swap等临时文件, 以及可能的snapshot txnindex leader-epoch-checkpoint

52日志格式演变

521 v0版本

kafka0100之前

RECORD_OVERHEAD包括offset(8B)和message size(4B)

RECORD包括:

crc32(4B):crc32校验值

magic(1B):消息版本号0

attributes(1B):消息属性。低3位表示压缩类型:0-NONE 1-GZIP 2-SNAPPY 3-LZ4(09x引入)

key length(4B):表示消息的key的长度。-1代表null

key: 可选

value length(4B):实际消息体的长度。-1代表null

value: 消息体。可以为空,如墓碑消息

522 v1版本

kafka0100-0110

比v0多了timestamp(8B)字段,表示消息的时间戳

attributes的第4位也被利用起来,0表示timestamp的类型为CreateTime,1表示timestamp的类型为LogAppendTime

timestamp类型由broker端参数logmessagetimestamptype来配置,默认为CreateTime,即采用生产者创建的时间戳

523 消息压缩

保证端到端的压缩,服务端配置compressiontype,默认为"producer",表示保留生产者使用的压缩方式,还可以配置为"gzip","snappy","lz4"

多条消息压缩至value字段,以提高压缩率

524 变长字段

变长整形(Varints):每一个字节都有一个位于最高位的msb位(most significant bit),除了最后一个字节为1,其余都为0,字节倒序排列

为了使编码更加高效,Varints使用ZigZag编码:sint32对应 (n<<1)^(n>>31) sint64对应 (n<<1)^(n>>63)

525 v2版本

Record Batch

first offset:

length:

partition leader epoch:

magic:固定为2

attributes:两个字节。低3位表示压缩格式,第4位表示时间戳类型,第5位表示事务(0-非事务1-事务),第6位控制消息(0-非控制1控制)

first timestamp:

max timestamp:

producer id:

producer epoch:

first sequence:

records count:

v2版本的消息去掉了crc字段,另外增加了length(消息总长度)、timestamp delta(时间戳增量)、offset delta(位移增量)和headers信息,并且弃用了attributes

Record

length:

attributes:弃用,但仍占据1B

timestamp delta:

offset delta:

headers:

53日志索引

稀疏索引(sparse index):每当写入一定量(broker端参数logindexintervalbytes指定,默认为4096B),偏移量索引文件和时间索引文件分别对应一个索引项

日志段切分策略:

1大小超过broker端参数logsegmentbytes配置的值,默认为1073741824(1GB)

2当前日志段消息的最大时间戳与当前系统的时间戳差值大于logrollms或者logrollhours,ms优先级高,默认logrollhours=168(7天)

3索引文件或者时间戳索引文件的大小大于logindexsizemaxbytes配置的值,默认为10485760(10MB)

4偏移量差值(offset-baseOffset)>IntegerMAX_VALUE

531 偏移量索引

每个索引项占用8个字节,分为两个部分:1relativeOffset相对偏移量(4B) 2position物理地址(4B)

使用kafka-dump-logsh脚本来解析index文件(包括timeindex、snapshot、txnindex等文件),如下:

bin/kafka-dump-logsh --files /tmp/kafka-logs/topicId-0/00……00index

如果broker端参数logindexsizemaxbytes不是8的倍数,内部会自动转换为8的倍数

532 时间戳索引

每个索引项占用12个字节,分为两个部分:1timestamp当前日志分段的最大时间戳(12B) 2relativeOffset时间戳对应的相对偏移量(4B)

如果broker端参数logindexsizemaxbytes不是12的倍数,内部会自动转换为12的倍数

54日志清理

日志清理策略可以控制到主题级别

541 日志删除

broker端参数logcleanuppolicy设置为delete(默认为delete)

检测周期broker端参数logretentioncheckintervalms=300000(默认5分钟)

1基于时间

broker端参数logretentionhours,logretentionminutes,logretentionms,优先级ms>minutes>hours

删除时先增加delete后缀,延迟删除根据filedeletedelayms(默认60000)配置

2基于日志大小

日志总大小为broker端参数logretentionbytes(默认为-1,表示无穷大)

日志段大小为broker端参数logsegmentbytes(默认为1073741824,1GB)

3基于日志起始偏移量

DeleteRecordRequest请求

1KafkaAdminClient的deleteRecord()

2kafka-delete-recordsh脚本

542 日志压缩

broker端参数logcleanuppolicy设置为compact,且logcleanerenable设置为true(默认为true)

55磁盘存储

相关测试:一个由6块7200r/min的RAID-5阵列组成的磁盘簇的线性写入600MB/s,随机写入100KB/s,随机内存写入400MB/s,线性内存36GB/s

551 页缓存

Linux *** 作系统的vmdirty_background_ratio参数用来指定脏页数量达到系统的百分比之后就触发pdflush/flush/kdmflush,一般小于10,不建议为0

vmdirty_ratio表示脏页百分比之后刷盘,但是阻塞新IO请求

kafka同样提供同步刷盘及间断性强制刷盘(fsync)功能,可以通过logflushintervalmessages、logflushintervalms等参数来控制

kafka不建议使用swap分区,vmswappiness参数上限为100,下限为0,建议设置为1

552 磁盘I/O流程

一般磁盘IO的场景有以下4种:

1用户调用标准C库进行IO *** 作,数据流为:应用程序Buffer->C库标准IOBuffer->文件系统也缓存->通过具体文件系统到磁盘

2用户调用文件IO,数据流为:应用程序Buffer->文件系统也缓存->通过具体文件系统到磁盘

3用户打开文件时使用O_DIRECT,绕过页缓存直接读写磁盘

4用户使用类似dd工具,并使用direct参数,绕过系统cache与文件系统直接读写磁盘

Linux系统中IO调度策略有4种:

1NOOP:no operation

2CFQ

3DEADLINE

4ANTICIPATORY

553 零拷贝

指数据直接从磁盘文件复制到网卡设备中,不需要经应用程序

对linux而言依赖于底层的sendfile()

对java而言,FileChannaltransferTo()的底层实现就是sendfile()

以上就是关于kafka怎么样接收数据保存到MYSQL数据库全部的内容,包括:kafka怎么样接收数据保存到MYSQL数据库、深入理解kafka(五)日志存储、等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/sjk/10097537.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-05
下一篇 2023-05-05

发表评论

登录后才能评论

评论列表(0条)

保存