1 Flume selectors
A Flume selector sits on the source side and decides which channel(s) each event is written to. Flume ships with two built-in selector types:
Replicating: the replicating selector; copies every event to all of the source's channels.
Multiplexing: the multiplexing selector; routes each event to a channel chosen by the value of one of its headers.
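For reference, the two types are configured on the source roughly like this (the agent, source, channel, and header names here are placeholders, not taken from the examples below):

```properties
# replicating (the default): every event goes to all listed channels
a1.sources.r1.selector.type = replicating
# channels listed as optional may fail without failing the event
a1.sources.r1.selector.optional = c2

# multiplexing: route by the value of a header key
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2
a1.sources.r1.selector.default = c2
```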
1.1 Replicating selector example
Requirement: copy each event into two channels, attach one sink to each channel, and write the data into two different directories on HDFS.
1) Write the collection plan: syslogtcp + memory + hdfs
[root@hadoop01 ~]# vim flumeconf/syslogtcp-mem-hdfs-replicating.properties

# name the components and wire them together
a1.sources=r1
a1.channels=c1 c2
a1.sinks=s1 s2
a1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
# source settings
a1.sources.r1.type=syslogtcp
a1.sources.r1.host=hadoop01
a1.sources.r1.port=10086
a1.sources.r1.eventSize=2500
a1.sources.r1.selector.type=replicating
# c2 is optional: a failure to write to it does not fail the event
a1.sources.r1.selector.optional=c2
# channel c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
# channel c2
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=100
# hdfs sink s1
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://hadoop01:8020/flume/s1/%Y-%m
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s1.hdfs.filePrefix=tom
a1.sinks.s1.hdfs.fileSuffix=.xxx
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=2
a1.sinks.s1.hdfs.roundUnit=minute
# hdfs sink s2
a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=hdfs://hadoop01:8020/flume/s2/%Y-%m
a1.sinks.s2.hdfs.useLocalTimeStamp=true
a1.sinks.s2.hdfs.filePrefix=tom
a1.sinks.s2.hdfs.fileSuffix=.xxx
a1.sinks.s2.hdfs.rollInterval=60
a1.sinks.s2.hdfs.rollSize=1024
a1.sinks.s2.hdfs.rollCount=10
a1.sinks.s2.hdfs.batchSize=100
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.round=true
a1.sinks.s2.hdfs.roundValue=2
a1.sinks.s2.hdfs.roundUnit=minute
2) Start the agent
[root@hadoop01 ~]# flume-ng agent -f ./flumeconf/syslogtcp-mem-hdfs-replicating.properties -n a1 -Dflume.root.logger=INFO,console
3) Test
[root@hadoop01 ~]# echo "helloworld" | nc hadoop01 10086
[root@hadoop01 ~]# echo "helloworldhello" | nc hadoop01 10086
Check that the s1 and s2 directories now exist on HDFS (e.g. hdfs dfs -ls /flume/) and that both contain the same data.
1.2 Multiplexing selector example
Requirement: if the value of the hobbys key in the event headers is movie, store the event under a directory named Japan; if it is music, store it under mp3; if it is car, store it under car; otherwise store it under others.
1) Write the collection plan: http + mem + hdfs
[root@hadoop01 ~]# vim flumeconf/syslogtcp-mem-hdfs-multiplexing.properties

# name the components and wire them together
a1.sources=r1
a1.channels=c1 c2 c3 c4
a1.sinks=s1 s2 s3 s4
a1.sources.r1.channels=c1 c2 c3 c4
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
a1.sinks.s3.channel=c3
a1.sinks.s4.channel=c4
# source settings
a1.sources.r1.type=http
a1.sources.r1.bind=hadoop01
a1.sources.r1.port=10086
a1.sources.r1.handler=org.apache.flume.source.http.JSONHandler
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=hobbys
a1.sources.r1.selector.mapping.movie=c1
a1.sources.r1.selector.mapping.music=c2
a1.sources.r1.selector.mapping.car=c3
a1.sources.r1.selector.default=c4
# channel c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
# channel c2
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=100
# channel c3
a1.channels.c3.type=memory
a1.channels.c3.capacity=1000
a1.channels.c3.transactionCapacity=100
# channel c4
a1.channels.c4.type=memory
a1.channels.c4.capacity=1000
a1.channels.c4.transactionCapacity=100
# hdfs sink s1
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://hadoop01:8020/flume/Japan
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s1.hdfs.filePrefix=tom
a1.sinks.s1.hdfs.fileSuffix=.xxx
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=2
a1.sinks.s1.hdfs.roundUnit=minute
# hdfs sink s2
a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=hdfs://hadoop01:8020/flume/mp3
a1.sinks.s2.hdfs.useLocalTimeStamp=true
a1.sinks.s2.hdfs.filePrefix=tom
a1.sinks.s2.hdfs.fileSuffix=.xxx
a1.sinks.s2.hdfs.rollInterval=60
a1.sinks.s2.hdfs.rollSize=1024
a1.sinks.s2.hdfs.rollCount=10
a1.sinks.s2.hdfs.batchSize=100
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.round=true
a1.sinks.s2.hdfs.roundValue=2
a1.sinks.s2.hdfs.roundUnit=minute
# hdfs sink s3
a1.sinks.s3.type=hdfs
a1.sinks.s3.hdfs.path=hdfs://hadoop01:8020/flume/car
a1.sinks.s3.hdfs.useLocalTimeStamp=true
a1.sinks.s3.hdfs.filePrefix=tom
a1.sinks.s3.hdfs.fileSuffix=.xxx
a1.sinks.s3.hdfs.rollInterval=60
a1.sinks.s3.hdfs.rollSize=1024
a1.sinks.s3.hdfs.rollCount=10
a1.sinks.s3.hdfs.batchSize=100
a1.sinks.s3.hdfs.writeFormat=Text
a1.sinks.s3.hdfs.fileType=DataStream
a1.sinks.s3.hdfs.round=true
a1.sinks.s3.hdfs.roundValue=2
a1.sinks.s3.hdfs.roundUnit=minute
# hdfs sink s4
a1.sinks.s4.type=hdfs
a1.sinks.s4.hdfs.path=hdfs://hadoop01:8020/flume/others
a1.sinks.s4.hdfs.useLocalTimeStamp=true
a1.sinks.s4.hdfs.filePrefix=tom
a1.sinks.s4.hdfs.fileSuffix=.xxx
a1.sinks.s4.hdfs.rollInterval=60
a1.sinks.s4.hdfs.rollSize=1024
a1.sinks.s4.hdfs.rollCount=10
a1.sinks.s4.hdfs.batchSize=100
a1.sinks.s4.hdfs.writeFormat=Text
a1.sinks.s4.hdfs.fileType=DataStream
a1.sinks.s4.hdfs.round=true
a1.sinks.s4.hdfs.roundValue=2
a1.sinks.s4.hdfs.roundUnit=minute
2) Start the agent
[root@hadoop01 ~]# flume-ng agent -f ./flumeconf/syslogtcp-mem-hdfs-multiplexing.properties -n a1 -Dflume.root.logger=INFO,console
3) Test
[root@hadoop01 ~]# curl -X POST -d '[{"headers":{"hobbys":"movie"},"body":"this is canglaoshi conntent"}]' http://hadoop01:10086
[root@hadoop01 ~]# curl -X POST -d '[{"headers":{"hobbys":"music"},"body":"see you again"}]' http://hadoop01:10086
[root@hadoop01 ~]# curl -X POST -d '[{"headers":{"hobbys":"ball"},"body":"basketball"}]' http://hadoop01:10086
[root@hadoop01 ~]# curl -X POST -d '[{"headers":{"hobbys":"car"},"body":"this my audi"}]' http://hadoop01:10086
2 Flume interceptors
Flume interceptors are attached to a source. They inspect every event in the flow and can modify or drop it; most commonly they add key-value pairs to the event headers, which can then be used for routing and filtering (for example by a multiplexing selector).
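All interceptors share the same wiring pattern on the source; a minimal fragment with placeholder names (the interceptors run in the order listed):

```properties
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host
```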
Commonly used interceptors:
Timestamp Interceptor
Intercepts each event and writes the current time into the headers; the default key is timestamp.
Host Interceptor
Writes the agent's hostname or IP address into the headers.
Static Interceptor
Writes a fixed, user-defined key-value pair into the headers.
Regex Filtering Interceptor
Keeps or drops events depending on whether the body matches a regular expression.
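As a rough sketch of what regex_filter does (the class and method names here are ours, not Flume's; the real implementation lives inside flume-ng-core):

```java
import java.util.regex.Pattern;

public class RegexFilterSketch {
    // Mirrors the regex_filter decision: with excludeEvents=true, an event
    // whose body matches the pattern is dropped; otherwise it passes through.
    static boolean keep(String body, String regex, boolean excludeEvents) {
        boolean matches = Pattern.compile(regex).matcher(body).find();
        return excludeEvents ? !matches : matches;
    }

    public static void main(String[] args) {
        String regex = "^[0-9].*";
        // body starting with a digit is dropped
        System.out.println(keep("0123hellworld", regex, true));
        // body starting with a letter is kept
        System.out.println(keep("audi666", regex, true));
    }
}
```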
2.1 Example: Timestamp, Host, and Static combined
This example uses three interceptors together: Timestamp, Host, and Static. The collection plan below shows how they interact.
Requirement:
Timestamp interceptor: used with the HDFS sink; since the interceptor supplies the timestamp header, the sink's use of local time can be turned off (useLocalTimeStamp=false).
Host interceptor: use the hostname/IP as part of the HDFS directory name.
Static interceptor: supply a fixed value used in the HDFS file name.
Note: the HDFS fileSuffix property is taken literally, so the static interceptor's value cannot be used as the file suffix.
1) Write the collection plan
[root@hadoop01 ~]# vim flumeconf/interceptors1.properties

# name the components and wire them together
a1.sources=r1
a1.channels=c1
a1.sinks=s1
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
# source settings
a1.sources.r1.type=http
a1.sources.r1.bind=hadoop01
a1.sources.r1.port=10086
a1.sources.r1.handler=org.apache.flume.source.http.JSONHandler
a1.sources.r1.interceptors=i1 i2 i3
a1.sources.r1.interceptors.i1.type=timestamp
a1.sources.r1.interceptors.i2.type=host
a1.sources.r1.interceptors.i2.useIP=false
a1.sources.r1.interceptors.i2.hostHeader=host
a1.sources.r1.interceptors.i3.type=static
a1.sources.r1.interceptors.i3.preserveExisting=false
a1.sources.r1.interceptors.i3.key=mienv
a1.sources.r1.interceptors.i3.value=.xxx
# channel settings
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
# hdfs sink; the timestamp interceptor supplies the timestamp, so local time is off
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://hadoop01:8020/flume/%{host}/%Y-%m-%d/%H-%M
a1.sinks.s1.hdfs.useLocalTimeStamp=false
a1.sinks.s1.hdfs.filePrefix=%{mienv}
a1.sinks.s1.hdfs.fileSuffix=.xxx
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=2
a1.sinks.s1.hdfs.roundUnit=minute
2) Start the agent
[root@hadoop01 ~]# flume-ng agent -f ./flumeconf/interceptors1.properties -n a1 -Dflume.root.logger=INFO,console
3) Test
[root@hadoop01 ~]# curl -X POST -d '[{"headers":{"hobbys":"movie"},"body":"this is canglaoshi conntent"}]' http://hadoop01:10086
Check that the HDFS path contains the hostname (hadoop01) and the timestamp-based directories, and that the file name prefix is .xxx (the static interceptor's value).
2.2 Regex filtering interceptor example
Requirement:
Use a regular expression to test whether the first character of the event body is a digit; if it is, drop the event.
Note: the regex value may be written with double quotes or with no quotes at all, but not with single quotes.
1) Write the collection plan
[root@hadoop01 ~]# vim flumeconf/interceptors2.properties

# name the components and wire them together
a1.sources=r1
a1.channels=c1
a1.sinks=s1
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
# source settings
a1.sources.r1.type=http
a1.sources.r1.bind=hadoop01
a1.sources.r1.port=10086
a1.sources.r1.handler=org.apache.flume.source.http.JSONHandler
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=regex_filter
a1.sources.r1.interceptors.i1.regex=^[0-9].*
a1.sources.r1.interceptors.i1.excludeEvents=true
# channel settings
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
# hdfs sink
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://hadoop01:8020/flume/regex/%Y-%m-%d/%H-%M
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s1.hdfs.filePrefix=tom
a1.sinks.s1.hdfs.fileSuffix=.zzz
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=2
a1.sinks.s1.hdfs.roundUnit=minute
2) Start the agent
[root@hadoop01 ~]# flume-ng agent -f ./flumeconf/interceptors2.properties -n a1 -Dflume.root.logger=INFO,console
3) Test
[root@hadoop01 ~]# curl -X POST -d '[{"headers":{},"body":"0123hellworld"}]' http://hadoop01:10086
[root@hadoop01 ~]# curl -X POST -d '[{"headers":{},"body":"audi666"}]' http://hadoop01:10086
Check the /flume/regex path on HDFS: only the "audi666" event should have been written (file prefix tom, suffix .zzz); the event whose body starts with a digit is filtered out.
3 A custom interceptor
1) Requirement:
Events whose body starts with a digit are stored under /flume/number/ on HDFS.
Events whose body starts with a letter are stored under /flume/character/.
Everything else is stored under /flume/others/.
2) Analysis
None of the timestamp, host, or static interceptors can satisfy this requirement, and neither can the regex filtering interceptor: it can only drop (or keep) events that match a pattern, it cannot route them.
So we need a custom interceptor to implement the classification logic, combined with a multiplexing selector to do the actual distribution of events.
The multiplexing selector is configured as follows:
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header = content
a1.sources.r1.selector.mapping.number = c1
a1.sources.r1.selector.mapping.character = c2
a1.sources.r1.selector.default = c3
Each channel feeds one sink.
So the custom interceptor's job is to look at whether the body starts with a digit, a letter, or anything else, and then set the corresponding key-value pair in the event headers:
content:number
content:character
content:others
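That decision can be sketched as a small pure function; this is a simplified stand-in for the full interceptor written out in step 3 below:

```java
public class ContentClassifier {
    // Returns the value for the "content" header, based on the body's
    // first character: digit -> number, letter -> character, else others.
    static String classify(String body) {
        if (body == null || body.isEmpty()) return "others";
        char c = body.charAt(0);
        if (c >= '0' && c <= '9') return "number";
        if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')) return "character";
        return "others";
    }

    public static void main(String[] args) {
        System.out.println(classify("0123hellworld")); // number
        System.out.println(classify("audi777"));       // character
        System.out.println(classify("-audi888"));      // others
    }
}
```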
3) The custom interceptor
Add the dependency to pom.xml:
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.8.0</version>
</dependency>
The code:
package com.qf.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MyInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> map = new HashMap<>();
        // read the event body
        byte[] body = event.getBody();
        String data = new String(body);
        // classify by the first character (guard against an empty body)
        char c = data.isEmpty() ? ' ' : data.charAt(0);
        if (c >= '0' && c <= '9') {
            map.put("content", "number");
        } else if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')) {
            map.put("content", "character");
        } else {
            map.put("content", "others");
        }
        // note: setHeaders replaces any headers the event already carried
        event.setHeaders(map);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        List<Event> newList = new ArrayList<>();
        for (Event event : list) {
            newList.add(intercept(event));
        }
        return newList;
    }

    @Override
    public void close() {
    }

    public static class MyBuilder implements Builder {
        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
Build the project into a jar and copy it into Flume's lib directory.
4) Write the collection plan
[root@hadoop01 ~]# vim flumeconf/custom-interceptor.properties

# name the components and wire them together
a1.sources=r1
a1.channels=c1 c2 c3
a1.sinks=s1 s2 s3
a1.sources.r1.channels=c1 c2 c3
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
a1.sinks.s3.channel=c3
# source settings
a1.sources.r1.type=http
a1.sources.r1.bind=hadoop01
a1.sources.r1.port=10086
a1.sources.r1.handler=org.apache.flume.source.http.JSONHandler
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=content
a1.sources.r1.selector.mapping.number=c1
a1.sources.r1.selector.mapping.character=c2
a1.sources.r1.selector.default=c3
# use the custom interceptor
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=com.qf.flume.interceptor.MyInterceptor$MyBuilder
# channel c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
# channel c2
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=100
# channel c3
a1.channels.c3.type=memory
a1.channels.c3.capacity=1000
a1.channels.c3.transactionCapacity=100
# hdfs sink s1
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://hadoop01:8020/flume/number
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s1.hdfs.filePrefix=tom
a1.sinks.s1.hdfs.fileSuffix=.xxx
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=2
a1.sinks.s1.hdfs.roundUnit=minute
# hdfs sink s2
a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=hdfs://hadoop01:8020/flume/character
a1.sinks.s2.hdfs.useLocalTimeStamp=true
a1.sinks.s2.hdfs.filePrefix=tom
a1.sinks.s2.hdfs.fileSuffix=.xxx
a1.sinks.s2.hdfs.rollInterval=60
a1.sinks.s2.hdfs.rollSize=1024
a1.sinks.s2.hdfs.rollCount=10
a1.sinks.s2.hdfs.batchSize=100
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.round=true
a1.sinks.s2.hdfs.roundValue=2
a1.sinks.s2.hdfs.roundUnit=minute
# hdfs sink s3
a1.sinks.s3.type=hdfs
a1.sinks.s3.hdfs.path=hdfs://hadoop01:8020/flume/others
a1.sinks.s3.hdfs.useLocalTimeStamp=true
a1.sinks.s3.hdfs.filePrefix=tom
a1.sinks.s3.hdfs.fileSuffix=.xxx
a1.sinks.s3.hdfs.rollInterval=60
a1.sinks.s3.hdfs.rollSize=1024
a1.sinks.s3.hdfs.rollCount=10
a1.sinks.s3.hdfs.batchSize=100
a1.sinks.s3.hdfs.writeFormat=Text
a1.sinks.s3.hdfs.fileType=DataStream
a1.sinks.s3.hdfs.round=true
a1.sinks.s3.hdfs.roundValue=2
a1.sinks.s3.hdfs.roundUnit=minute
5) Start the agent
[root@hadoop01 ~]# flume-ng agent -f ./flumeconf/custom-interceptor.properties -n a1 -Dflume.root.logger=INFO,console
6) Test
[root@hadoop01 ~]# curl -X POST -d '[{"headers":{},"body":"0123hellworld"}]' http://hadoop01:10086
[root@hadoop01 ~]# curl -X POST -d '[{"headers":{},"body":"audi777"}]' http://hadoop01:10086
[root@hadoop01 ~]# curl -X POST -d '[{"headers":{},"body":"-audi888"}]' http://hadoop01:10086