(file-flume-kafka-flume-hdfs过程中)flume 配置文件的编写

(file-flume-kafka-flume-hdfs过程中)flume 配置文件的编写,第1张

(file-flume-kafka-flume-hdfs过程中)flume 配置文件的编写 flume 配置文件的编写 flume 配置文件 file-flume-kafka.conf 使用 TAILDIR source
  1 # Name the components on this agent
  2 a1.sources = r1
  3 a1.channels = c1
  4 
  5 # Describe/configure the source
  6 a1.sources.r1.type = TAILDIR
  # taildir source 监控几个组,先给这个组起一个名字,测试只用监控一个组就行了,先叫它f1 
  7 a1.sources.r1.filegroups = f1
  8 a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
  # positionFile 和断点续传有关,记录的是断点续传的那个断点
  9 a1.sources.r1.positionFile = /opt/module/flume/pasitionFileFlume1.json
 10 
 11 # 配置拦截器
 12 a1.sources.r1.interceptors = i1
 13 a1.sources.r1.interceptors.i1.type = com.atguigu.flume.ETLInterceptor$MyBuilder
 14 
 # Kafka channel是基于磁盘的(因为Kafka的数据是会落盘的,所以他是基于磁盘的),他的速度比memory channel慢
 #比file channel快(因为他有很多的优化,比如顺序读写,页缓存什么的)。它的优势在于他节省了一个channel的时间,
 #因为Kafka channel和Kafka sink的时间是一样的,都是往hdfs上面发。
 15 # Use a channel which buffers events in memory
 # 默认有序列化,就不用配置了
 16 a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
 17 a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092
 # 发到哪个topic里面,这个topic待会需要自己创建
 18 a1.channels.c1.kafka.topic = topic_log
 # 是否上传上去event 要改成false,否则上传的是event,会乱码
 19 a1.channels.c1.parseAsFlumeEvent = false
 20 
 21 # Bind the source and sink to the channel
 22 a1.sources.r1.channels = c1

kafka-flume-hdfs.conf使用Kafka source
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# 使用Kafka source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
# 连接Kafka
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092
# 组id改的话,变成了一个新的组,会触发offset重置,这个可以调,在官网下面有
a1.sources.r1.kafka.consumer.group.id = flume1
a1.sources.r1.kafka.topics = topic_log

# Describe the sink
a1.sinks.k1.type = hdfs
# 保存到hdfs的路劲
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
# 文件前缀
a1.sinks.k1.hdfs.filePrefix = log-
# 文件是否滚动 默认是false 没什么用 可以不管
a1.sinks.k1.hdfs.round = false

# 控制小文件问题的三个参数
# 实际开发中应该写成一小时,自己测试的时候为了方便,设置成10秒。
a1.sinks.k1.hdfs.rollInterval = 10
# 128M
a1.sinks.k1.hdfs.rollSize = 134217728
# event个数
a1.sinks.k1.hdfs.rollCount = 0

## 控制输出文件是原生文件。
# 设置能够进行压缩的格式
a1.sinks.k1.hdfs.fileType = CompressedStream
# 使用lzop压缩,前提是Hadoop要配置好lzop压缩
a1.sinks.k1.hdfs.codeC = lzop

# 使用file channel
# Use a channel which buffers events in file
a1.channels.c1.type = file
# 使用本地磁盘作为缓冲的路径
a1.channels.c1.dataDirs = /opt/module/flume/file-channel/date
# 检查点,自带检查,同时会进行断点续传。
a1.channels.c1.checkpointDir = /opt/module/flume/file-channel/checkpoint

# 配置source拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.TimeStampInterceptor$MyBuilder

## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/4010507.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-10-22
下一篇 2022-10-22

发表评论

登录后才能评论

评论列表(0条)

保存