1 # Name the components on this agent 2 a1.sources = r1 3 a1.channels = c1 4 5 # Describe/configure the source 6 a1.sources.r1.type = TAILDIR # taildir source 监控几个组,先给这个组起一个名字,测试只用监控一个组就行了,先叫它f1 7 a1.sources.r1.filegroups = f1 8 a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.* # positionFile 和断点续传有关,记录的是断点续传的那个断点 9 a1.sources.r1.positionFile = /opt/module/flume/pasitionFileFlume1.json 10 11 # 配置拦截器 12 a1.sources.r1.interceptors = i1 13 a1.sources.r1.interceptors.i1.type = com.atguigu.flume.ETLInterceptor$MyBuilder 14 # Kafka channel是基于磁盘的(因为Kafka的数据是会落盘的,所以他是基于磁盘的),他的速度比memory channel慢 #比file channel快(因为他有很多的优化,比如顺序读写,页缓存什么的)。它的优势在于他节省了一个channel的时间, #因为Kafka channel和Kafka sink的时间是一样的,都是往hdfs上面发。 15 # Use a channel which buffers events in memory # 默认有序列化,就不用配置了 16 a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel 17 a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092 # 发到哪个topic里面,这个topic待会需要自己创建 18 a1.channels.c1.kafka.topic = topic_log # 是否上传上去event 要改成false,否则上传的是event,会乱码 19 a1.channels.c1.parseAsFlumeEvent = false 20 21 # Bind the source and sink to the channel 22 a1.sources.r1.channels = c1kafka-flume-hdfs.conf使用Kafka source
# example.conf: A single-node Flume configuration # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source # 使用Kafka source a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource # 连接Kafka a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092 # 组id改的话,变成了一个新的组,会触发offset重置,这个可以调,在官网下面有 a1.sources.r1.kafka.consumer.group.id = flume1 a1.sources.r1.kafka.topics = topic_log # Describe the sink a1.sinks.k1.type = hdfs # 保存到hdfs的路劲 a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d # 文件前缀 a1.sinks.k1.hdfs.filePrefix = log- # 文件是否滚动 默认是false 没什么用 可以不管 a1.sinks.k1.hdfs.round = false # 控制小文件问题的三个参数 # 实际开发中应该写成一小时,自己测试的时候为了方便,设置成10秒。 a1.sinks.k1.hdfs.rollInterval = 10 # 128M a1.sinks.k1.hdfs.rollSize = 134217728 # event个数 a1.sinks.k1.hdfs.rollCount = 0 ## 控制输出文件是原生文件。 # 设置能够进行压缩的格式 a1.sinks.k1.hdfs.fileType = CompressedStream # 使用lzop压缩,前提是Hadoop要配置好lzop压缩 a1.sinks.k1.hdfs.codeC = lzop # 使用file channel # Use a channel which buffers events in file a1.channels.c1.type = file # 使用本地磁盘作为缓冲的路径 a1.channels.c1.dataDirs = /opt/module/flume/file-channel/date # 检查点,自带检查,同时会进行断点续传。 a1.channels.c1.checkpointDir = /opt/module/flume/file-channel/checkpoint # 配置source拦截器 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.atguigu.flume.TimeStampInterceptor$MyBuilder ## 拼装 a1.sources.r1.channels = c1 a1.sinks.k1.channel= c1
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)