Flume-----选择器和拦截器

Flume-----选择器和拦截器,第1张

Flume-----选择器和拦截器 flume的选择器和拦截器

1 flume的选择器
flume的选择器是作用在source端,用于将不同event分发到不同的channel里,flume内置的选择器有两种,分别如下:
Replicating:复用选择器,作用是将event拷贝到不同的channel里
Multiplexing:多副路选择器,作用是根据不同的条件将不同的event分发到不同的channel里
Replicating选择器的案例演示
需求:将event拷贝到两个channel中,使用两个sink来各自接收一个channel的数据,落地到hdfs上的两个目录下
1)采集方案的编写:syslogtcp + mem+ hdfs

[root@hadoop01 ~]# vim flumeconf/syslogtcp-mem-hdfs-replicating.properties
#命名,并关联
a1.sources=r1
a1.channels=c1 c2
a1.sinks=s1 s2
a1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
​
#设置每个组件的接口以及属性
a1.sources.r1.type=syslogtcp
a1.sources.r1.host=hadoop01
a1.sources.r1.port=10086
a1.sources.r1.eventSize=2500
a1.sources.r1.selector.type=replicating
a1.sources.r1.selector.optional=c2
​
​
​
#设置channel组件的属性
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
​
#设置channel组件的属性
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=100
​
#设置hdfs的sink
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://hadoop01:8020/flume/s1/%Y-%m
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s1.hdfs.filePrefix=tom
a1.sinks.s1.hdfs.fileSuffix=.xxx
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=2      
a1.sinks.s1.hdfs.roundUnit=minute
​
​
#设置hdfs的sink
a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=hdfs://hadoop01:8020/flume/s2/%Y-%m
a1.sinks.s2.hdfs.useLocalTimeStamp=true
a1.sinks.s2.hdfs.filePrefix=tom
a1.sinks.s2.hdfs.fileSuffix=.xxx
a1.sinks.s2.hdfs.rollInterval=60
a1.sinks.s2.hdfs.rollSize=1024
a1.sinks.s2.hdfs.rollCount=10
a1.sinks.s2.hdfs.batchSize=100
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.round=true
a1.sinks.s2.hdfs.roundValue=2      
a1.sinks.s2.hdfs.roundUnit=minute

2)启动方案

[root@hadoop01 ~]# flume-ng agent  -f ./flumeconf/syslogtcp-mem-hdfs-replicating.properties -n a1  -Dflume.root.logger=INFO,console

3)测试

[root@hadoop01 ~]# echo "helloworld" | nc hadoop01 10086
[root@hadoop01 ~]# echo "helloworldhello" | nc hadoop01 10086

查看hdfs上是否有s1和s2目录

2 Multiplexing选择器的案例演示
需求:当event的消息头中的一个叫做hobbys的key的value如果是movie,就存储到一个叫Japan的目录下,如果是music,就存储到一个叫map3的目录下,否则存储到others目录下
1)采集方案的编写:http + mem + hdfs

[root@hadoop01 ~]# vim flumeconf/syslogtcp-mem-hdfs-multiplexing.properties
#命名,并关联
a1.sources=r1
a1.channels=c1 c2 c3 c4
a1.sinks=s1 s2 s3 s4
a1.sources.r1.channels=c1 c2 c3 c4
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
a1.sinks.s3.channel=c3
a1.sinks.s4.channel=c4
​
#设置每个组件的接口以及属性
a1.sources.r1.type=http
a1.sources.r1.bind=hadoop01
a1.sources.r1.port=10086
a1.sources.r1.handler=org.apache.flume.source.http.JSonHandler
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=hobbys
a1.sources.r1.selector.mapping.movie=c1
a1.sources.r1.selector.mapping.music=c2
a1.sources.r1.selector.mapping.car=c3
a1.sources.r1.selector.default=c4
​
#设置channel组件的属性
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
​
#设置channel组件的属性
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=100
​
#设置channel组件的属性
a1.channels.c3.type=memory
a1.channels.c3.capacity=1000
a1.channels.c3.transactionCapacity=100
​
#设置channel组件的属性
a1.channels.c4.type=memory
a1.channels.c4.capacity=1000
a1.channels.c4.transactionCapacity=100
​
#设置hdfs的sink
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://hadoop01:8020/flume/Japan
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s1.hdfs.filePrefix=tom
a1.sinks.s1.hdfs.fileSuffix=.xxx
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=2      
a1.sinks.s1.hdfs.roundUnit=minute
​
​
#设置hdfs的sink
a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=hdfs://hadoop01:8020/flume/mp3
a1.sinks.s2.hdfs.useLocalTimeStamp=true
a1.sinks.s2.hdfs.filePrefix=tom
a1.sinks.s2.hdfs.fileSuffix=.xxx
a1.sinks.s2.hdfs.rollInterval=60
a1.sinks.s2.hdfs.rollSize=1024
a1.sinks.s2.hdfs.rollCount=10
a1.sinks.s2.hdfs.batchSize=100
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.round=true
a1.sinks.s2.hdfs.roundValue=2      
a1.sinks.s2.hdfs.roundUnit=minute
​
#设置hdfs的sink
a1.sinks.s3.type=hdfs
a1.sinks.s3.hdfs.path=hdfs://hadoop01:8020/flume/car
a1.sinks.s3.hdfs.useLocalTimeStamp=true
a1.sinks.s3.hdfs.filePrefix=tom
a1.sinks.s3.hdfs.fileSuffix=.xxx
a1.sinks.s3.hdfs.rollInterval=60
a1.sinks.s3.hdfs.rollSize=1024
a1.sinks.s3.hdfs.rollCount=10
a1.sinks.s3.hdfs.batchSize=100
a1.sinks.s3.hdfs.writeFormat=Text
a1.sinks.s3.hdfs.fileType=DataStream
a1.sinks.s3.hdfs.round=true
a1.sinks.s3.hdfs.roundValue=2      
a1.sinks.s3.hdfs.roundUnit=minute
​
#设置hdfs的sink
a1.sinks.s4.type=hdfs
a1.sinks.s4.hdfs.path=hdfs://hadoop01:8020/flume/others
a1.sinks.s4.hdfs.useLocalTimeStamp=true
a1.sinks.s4.hdfs.filePrefix=tom
a1.sinks.s4.hdfs.fileSuffix=.xxx
a1.sinks.s4.hdfs.rollInterval=60
a1.sinks.s4.hdfs.rollSize=1024
a1.sinks.s4.hdfs.rollCount=10
a1.sinks.s4.hdfs.batchSize=100
a1.sinks.s4.hdfs.writeFormat=Text
a1.sinks.s4.hdfs.fileType=DataStream
a1.sinks.s4.hdfs.round=true
a1.sinks.s4.hdfs.roundValue=2      
a1.sinks.s4.hdfs.roundUnit=minute

2)启动方案

[root@hadoop01 ~]# flume-ng agent  -f ./flumeconf/syslogtcp-mem-hdfs-multiplexing.properties -n a1  -Dflume.root.logger=INFO,console

3)测试

[root@hadoop01 ~]# curl -X POST -d '[{"headers":{"hobbys":"movie"},"body":"this is canglaoshi  conntent"}]' http://hadoop01:10086
[root@hadoop01 ~]# curl -X POST -d '[{"headers":{"hobbys":"music"},"body":"see you again"}]' http://hadoop01:10086
[root@hadoop01 ~]# curl -X POST -d '[{"headers":{"hobbys":"ball"},"body":"basketball"}]' http://hadoop01:10086
[root@hadoop01 ~]# curl -X POST -d '[{"headers":{"hobbys":"car"},"body":"this my audi"}]' http://hadoop01:10086

2 flume的拦截器
flume的拦截器可以作用在source端和sink端,用于拦截flow中的event,向event中的headers里添加键值对,可以用于判断,选择等。
常用的拦截器有:
Timestamp Interceptor:时间戳拦截器
拦截event,将时间设置到headers里,默认的key是timestamp
Host Interceptor:主机拦截器
拦截event,将主机名或者是ip地址设置到headers里
Static Interceptor:静态拦截器
拦截event,可以自定义key
Regex Filtering Interceptor:正则表达式拦截器
使用正则表达式,来拦截过滤掉匹配上的event。
2.1 案例演示
该案例综合三个拦截器一起使用Timestamp、host、static,参考采集方案即可

需求: 时间戳拦截器: 使用hdfs的sink。 可以取消本地时间的使用
主机拦截器: 将主机名|ip设置成 hdfs的文件夹名称
静态拦截器: 指定一个固定值为hdfs的文件后缀

   注意:hdfs的文件后缀 不能从静态拦截器上获取静态值。

1)采集方案的编写

[root@hadoop01 ~]# vim flumeconf/interceptors1.properties
#命名,并关联
a1.sources=r1
a1.channels=c1
a1.sinks=s1
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
​
#设置每个组件的接口以及属性
a1.sources.r1.type=http
a1.sources.r1.bind=hadoop01
a1.sources.r1.port=10086
a1.sources.r1.handler=org.apache.flume.source.http.JSonHandler
a1.sources.r1.interceptors = i1 i2 i3
a1.sources.r1.interceptors.i1.type=timestamp
a1.sources.r1.interceptors.i2.type=host
a1.sources.r1.interceptors.i2.useIP=false
a1.sources.r1.interceptors.i2.hostHeader=host
a1.sources.r1.interceptors.i3.type=static
a1.sources.r1.interceptors.i3.preserveExisting=false
a1.sources.r1.interceptors.i3.key=mienv
a1.sources.r1.interceptors.i3.value=.xxx
​
#设置channel组件的属性
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
​
​
#设置hdfs的sink
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://hadoop01:8020/flume/%{host}/%Y-%m-%d/%H-%M
a1.sinks.s1.hdfs.useLocalTimeStamp=false
a1.sinks.s1.hdfs.filePrefix=%{meinv}
a1.sinks.s1.hdfs.fileSuffix=.xxx
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=2      
a1.sinks.s1.hdfs.roundUnit=minute

2)启动方案

[root@hadoop01 ~]# flume-ng agent  -f ./flumeconf/interceptors1.properties -n a1  -Dflume.root.logger=INFO,console

3)测试

[root@hadoop01 ~]# curl -X POST -d '[{"headers":{"hobbys":"movie"},"body":"this is canglaoshi  conntent"}]' http://hadoop01:10086


检查hdfs上的路径上是否有hadoop01和时间,以及文件名的前缀是否为.xxx

2.2 正则表达式拦截器的案例演示
需求:
使用正则表达式,判断event的body正文的第一个字符是否为数字,如果是数字,就过滤掉不要。


注意: 正则表达式 可以使用双引号,也可以不用引号,但是不能用单引号
1)采集方案的编写

[root@hadoop01 ~]# vim flumeconf/interceptors2.properties
#命名,并关联
a1.sources=r1
a1.channels=c1
a1.sinks=s1
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
​
#设置每个组件的接口以及属性
a1.sources.r1.type=http
a1.sources.r1.bind=hadoop01
a1.sources.r1.port=10086
a1.sources.r1.handler=org.apache.flume.source.http.JSonHandler
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type=regex_filter
a1.sources.r1.interceptors.i1.regex=^[0-9].*
a1.sources.r1.interceptors.i1.excludeEvents=true
​
​
#设置channel组件的属性
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
​
​
#设置hdfs的sink
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://hadoop01:8020/flume/regex/%Y-%m-%d/%H-%M
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s1.hdfs.filePrefix=tom
a1.sinks.s1.hdfs.fileSuffix=.zzz
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=2      
a1.sinks.s1.hdfs.roundUnit=minute

2)启动方案

[root@hadoop01 ~]# flume-ng agent  -f ./flumeconf/interceptors2.properties -n a1  -Dflume.root.logger=INFO,console

3)测试

[root@hadoop01 ~]# curl -X POST -d '[{"headers":{},"body":"0123hellworld"}]' http://hadoop01:10086
[root@hadoop01 ~]# curl -X POST -d '[{"headers":{},"body":"audi666"}]' http://hadoop01:10086

检查hdfs上的路径上是否有hadoop01和时间,以及文件名的前缀是否为nazha

3 自定义拦截器的应用
1)需求:
将event的正文 如果是数字开头的数据存储到hdfs上的/flume/number/下
将event的正文 如果是字母开头的数据存储到hdfs上的/flume/character/下
其他字符存储到hdfs上的/flume/others/下
2)分析
根据需求来分析,时间戳、主机、静态拦截器都无法满足需求,而正则表达式拦截器也不能满足需求,因为正则表达式只能对满足匹配格式的进行过滤掉而已。
所以,需要自定义拦截器,来实现相应的逻辑,并使用多副路选择器来完成具体event的分发。
多副路选择器可以进行如下设置

a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header = content
a1.sources.r1.selector.mapping.number = c1
a1.sources.r1.selector.mapping.character = c2
a1.sources.r1.selector.default = c3

每个通道对应一个sink。

因此:在自定义拦截器里的逻辑应该是判断是否为数字|字母|其他开头,然后设置消息头里的键值对
content:number
content:character

3)自定义拦截器
pom.xml


    
        org.apache.flume
        flume-ng-core
        1.8.0
    

代码如下:

package com.qf.flume.interceptor;
​
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
​
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
​

public class MyInterceptor implements Interceptor {
    @Override
    public void initialize() {
​
    }
​
    
    @Override
    public Event intercept(Event event) {
        Map map = new HashMap<>();
        //获取event的正文内容
        byte[] body = event.getBody();
        String data = new String(body);
        //获取第一个字符
        char c = data.charAt(0);
        if(c>=48&&c<=57){
            map.put("content","number");
        }else if((c>=65&&c<=90)||(c>=97&&c<=122)){
            map.put("content","character");
        }else{
            map.put("content","others");
        }
        event.setHeaders(map);
        return event;
    }
    
    @Override
    public List intercept(List list) {
        List newList = new ArrayList<>();
        for (Event event : list) {
            newList.add(intercept(event));
        }
        return newList;
    }
​
    @Override
    public void close() {
​
    }
​
    
    public static class MyBuilder implements Builder{
​
        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }
​
        @Override
        public void configure(Context context) {
​
        }
    }
}

打成jar包,上传到flume的lib目录下

4)编写采集方案

[root@hadoop01 ~]# vim flumeconf/custom-interceptor.properties
#命名,并关联
a1.sources=r1
a1.channels=c1 c2 c3
a1.sinks=s1 s2 s3
a1.sources.r1.channels=c1 c2 c3
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
a1.sinks.s3.channel=c3
​
#设置每个组件的接口以及属性
a1.sources.r1.type=http
a1.sources.r1.bind=hadoop01
a1.sources.r1.port=10086
a1.sources.r1.handler=org.apache.flume.source.http.JSonHandler
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=content
a1.sources.r1.selector.mapping.number=c1
a1.sources.r1.selector.mapping.character=c2
a1.sources.r1.selector.default=c3
#使用自定义的拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type=com.qf.flume.interceptor.MyInterceptor$MyBuilder
​
#设置channel组件的属性
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
​
#设置channel组件的属性
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=100
​
#设置channel组件的属性
a1.channels.c3.type=memory
a1.channels.c3.capacity=1000
a1.channels.c3.transactionCapacity=100
​
#设置hdfs的sink
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://hadoop01:8020/flume/number
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s1.hdfs.filePrefix=tom
a1.sinks.s1.hdfs.fileSuffix=.xxx
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=2      
a1.sinks.s1.hdfs.roundUnit=minute
​
​
#设置hdfs的sink
a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=hdfs://hadoop01:8020/flume/character
a1.sinks.s2.hdfs.useLocalTimeStamp=true
a1.sinks.s2.hdfs.filePrefix=tom
a1.sinks.s2.hdfs.fileSuffix=.xxx
a1.sinks.s2.hdfs.rollInterval=60
a1.sinks.s2.hdfs.rollSize=1024
a1.sinks.s2.hdfs.rollCount=10
a1.sinks.s2.hdfs.batchSize=100
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.round=true
a1.sinks.s2.hdfs.roundValue=2      
a1.sinks.s2.hdfs.roundUnit=minute
​
#设置hdfs的sink
a1.sinks.s3.type=hdfs
a1.sinks.s3.hdfs.path=hdfs://hadoop01:8020/flume/others
a1.sinks.s3.hdfs.useLocalTimeStamp=true
a1.sinks.s3.hdfs.filePrefix=tom
a1.sinks.s3.hdfs.fileSuffix=.xxx
a1.sinks.s3.hdfs.rollInterval=60
a1.sinks.s3.hdfs.rollSize=1024
a1.sinks.s3.hdfs.rollCount=10
a1.sinks.s3.hdfs.batchSize=100
a1.sinks.s3.hdfs.writeFormat=Text
a1.sinks.s3.hdfs.fileType=DataStream
a1.sinks.s3.hdfs.round=true
a1.sinks.s3.hdfs.roundValue=2      
a1.sinks.s3.hdfs.roundUnit=minute

5)启动方案

[root@hadoop01 ~]# flume-ng agent  -f ./flumeconf/custom-interceptor.properties -n a1  -Dflume.root.logger=INFO,console

6)测试

[root@hadoop01 ~]# curl -X POST -d '[{"headers":{},"body":"0123hellworld"}]' http://hadoop01:10086
[root@hadoop01 ~]# curl -X POST -d '[{"headers":{},"body":"audi777"}]' http://hadoop01:10086
[root@hadoop01 ~]# curl -X POST -d '[{"headers":{},"body":"-audi888"}]' http://hadoop01:10086

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5683422.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存