Flume拦截器

Flume拦截器,第1张

Flume拦截器

关于Flume拦截器的问题:

​ flume官方文档:https://flume.apache.org/documentation.html

​ 当我们采用flume - kafka - flume的中间件:

​ 第一层flume:

#为各组件命名
a1.sources = r1
a1.channels = c1

#描述source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.atguigu.ETLInterceptor$MyBuilder

#描述channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadooop104:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

#绑定source和channel以及sink和channel的关系
a1.sources.r1.channels = c1

​ taildir source:

taildir_position.json存储的文件格式:
[{"inode":529114,"pos":774728,"file":"/opt/module/applog/log/app.2021-10-22.log"}]

​ 这里我们使用自定义非JSON格式拦截器,过滤掉非JSON格式的信息

​ kafka channel:

​ 减少take事务,提高效率

​ 在kafka 1.6 中 ,kafka channel 中存在bug:

​ parseAsFlumeEvent 这个参数无论设置ture还是false,都会为对source来的数据进行解析,解析完会把头部信息加到数据前面,因此这种情况,下游会需要做额外的截取工作

​ 在kafka 1.7 中,解决了这个bug,默认是ture,我们设置false

​ 第二层Flume:

## 组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.TimeStampInterceptor$MyBuilder

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false
#解决小文件问题
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/

## 控制输出文件是原生文件。
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop

## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

​ kafka source:

​ 因为我们要设置时间戳拦截器,防止零点漂移问题,所以我们不能采用Kafka channel 来省去source

​ 时间戳拦截器就是提取数据本身的timestamp,把它放到头部信息header中


​ file Chanel:

​ 落盘

​ hdfs sink:

​ useLocalTimeStamp:是否使用当地时间。默认值:flase

​ 所以我们的hdfs路径上落盘文件就按照头部信息的时间戳落盘

​ codeC:文件压缩格式,包括:gzip, bzip2, lzo, lzop, snappy

​ fileType:文件格式,包括:SequenceFile, DataStream,CompressedStream,默认值:SequenceFile

​ 当使用DataStream时候,文件不会被压缩,不需要设置hdfs.codeC;

​ 当使用CompressedStream时候,必须设置一个正确的hdfs.codeC值;

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5678800.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存