- Flume Cases
- 1. A simple case
- 2. Monitoring multiple appending files in a directory in real time
- 3. Flume enterprise case: replication (single source, multiple outputs)
- 4. Flume enterprise case: multiplexing and interceptors
- 5. Flume enterprise case: aggregation (multiple sources, one output)
Requirement:
Listen on port 44444, collect the data, and print it to the console.
Environment:
```bash
# Install the netcat tool
sudo yum install -y nc
# Check whether port 44444 is already in use
sudo netstat -nlp | grep 44444
```
Implementation steps:
- Create nc-flume-log.conf
  - Name the sources, sinks, and channels
  - Configure the source
  - Configure the sink
  - Configure the channel
  - Wire the three together
```properties
# Name the components of Agent a1
# Agent a1 has one source (r1), one sink (k1), and one channel (c1)
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure source r1 of Agent a1
# NetCat TCP Source; this is an alias. Flume's built-in components have aliases;
# components without an alias must be configured with the fully qualified class name
a1.sources.r1.type = netcat
# Hostname the NetCat TCP Source binds to (this machine)
a1.sources.r1.bind = localhost
# Port to listen on
a1.sources.r1.port = 44444

# Configure sink k1 of Agent a1
# Logger Sink, also configured by its alias
a1.sinks.k1.type = logger

# Configure channel c1 of Agent a1; the channel buffers event data
# Memory channel: events are buffered in memory
a1.channels.c1.type = memory
# Channel capacity of 1000 events; bigger is not always better,
# since the larger the capacity, the more events are lost if Flume crashes
a1.channels.c1.capacity = 1000
# Number of events the source and sink transfer per transaction with the channel
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# Source r1 is bound to one channel, c1
a1.sources.r1.channels = c1
# Sink k1 is bound to one channel, c1
a1.sinks.k1.channel = c1
```
- Start Flume to listen on the port (the two commands below are equivalent; the second uses the short option forms)
```bash
[hadoop@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/nc-flume-log.conf -Dflume.root.logger=INFO,console
[hadoop@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f conf/nc-flume-log.conf -Dflume.root.logger=INFO,console
```
Parameter notes:
--name / -n: names the agent a1.
--conf-file / -f: the configuration file Flume reads for this run, here the nc-flume-log.conf file under the conf directory.
-Dflume.root.logger=INFO,console: dynamically overrides the flume.root.logger property at runtime, setting the console log level to INFO. Log levels include debug, info, warn, and error. This setting overrides the corresponding entry in the logging configuration file.
- Test by using netcat to send content to port 44444 on this machine
```bash
nc localhost 44444
```
Taildir Source is suited to monitoring multiple files that are appended to in real time, and it can resume from the last read position (breakpoint resume).
Taildir Source maintains a position file in JSON format and periodically writes the latest offset read from each file into it, which is how breakpoint resume is achieved.
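For reference, the position file is a small JSON array recording the inode, offset, and path of each tailed file; the values below are illustrative only:
```bash
cat /opt/module/flume/taildir_position.json
# [{"inode":2496272,"pos":12,"file":"/opt/module/flume/files1/file1"},
#  {"inode":2496275,"pos":7,"file":"/opt/module/flume/files2/log1"}]
```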
Requirement:
Use Flume to monitor files that are appended to in real time across an entire directory and upload them to HDFS.
Steps:
- Create the configuration file taildir-flume-hdfs.conf
```properties
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
# Must resolve to files; a regular expression may be used to match multiple files
a1.sources.r1.filegroups.f1 = /opt/module/flume/files1/.*file.*
a1.sources.r1.filegroups.f2 = /opt/module/flume/files2/.*log.*
# Location of the file that records read positions for breakpoint resume;
# if not set, a default location is used and breakpoint resume still works
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
# Prefix of the uploaded files
a1.sinks.k1.hdfs.filePrefix = log-
# Whether to use the local timestamp
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Number of events to accumulate before flushing to HDFS
a1.sinks.k1.hdfs.batchSize = 100
# File type; compression is supported
a1.sinks.k1.hdfs.fileType = DataStream
# How often (seconds) to roll to a new file
a1.sinks.k1.hdfs.rollInterval = 30
# Roll the file at roughly 128 MB
a1.sinks.k1.hdfs.rollSize = 134217700
# Rolling is independent of the number of events
a1.sinks.k1.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
- Test
Start the agent that monitors the folders:
```bash
[hadoop@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/taildir-flume-hdfs.conf
```
Create the /opt/module/flume/files1/ directory with a file1 file in it, and the /opt/module/flume/files2/ directory with a log1 file in it.
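A minimal sketch of those commands, assuming the paths used in the config above:
```bash
mkdir -p /opt/module/flume/files1 /opt/module/flume/files2
touch /opt/module/flume/files1/file1 /opt/module/flume/files2/log1
```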
Then append content to the files:
```bash
echo hello >> /opt/module/flume/files1/file1
echo hello1 >> /opt/module/flume/files2/log1
```
- Check the directories created in HDFS, as shown below
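For example, assuming the HDFS path configured above, the generated date/hour directories and files can be listed with:
```bash
hadoop fs -ls -R /flume
```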
Requirement:
Flume-1 monitors file changes and passes the new content to Flume-2, which stores it in HDFS. At the same time, Flume-1 passes the new content to Flume-3, which writes it to the local file system.
Steps:
- Create a group1 folder under /opt/module/flume/conf to hold the Flume configuration files, and create a flume3datas folder under /opt/module/flume/ to receive the local data (commands below)
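A sketch of the two mkdir commands, assuming the paths above:
```bash
mkdir -p /opt/module/flume/conf/group1
mkdir -p /opt/module/flume/flume3datas
```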
- Configuration file flume1
```properties
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/flume/files1/.*file.*
a1.sources.r1.filegroups.f2 = /opt/module/flume/files2/.*log.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
# Replicate the data flow to all channels (this is the default and may be omitted)
a1.sources.r1.selector.type = replicating

# Describe the sinks
# An avro sink acts as a data sender
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sinks to the channels
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
```
- Configuration file flume2
```properties
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4141

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume1/%Y%m%d/%H
# Prefix of the files
a1.sinks.k1.hdfs.filePrefix = log-
# How often (seconds) to roll to a new file
a1.sinks.k1.hdfs.rollInterval = 30
# Roll the file at roughly 128 MB
a1.sinks.k1.hdfs.rollSize = 134217700
# Rolling is independent of the number of events
a1.sinks.k1.hdfs.rollCount = 0
# Use the local timestamp
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# File type; compression is supported
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
- Configuration file flume3
```properties
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4142

# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /opt/module/flume/flume3datas

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
- Start the three Flume processes; start the avro receivers flume2 and flume3 before flume1 (example commands below)
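A sketch of the start commands, assuming the three configs are saved as flume1.conf, flume2.conf, and flume3.conf under conf/group1 (the exact file names are an assumption):
```bash
[hadoop@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f conf/group1/flume3.conf
[hadoop@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f conf/group1/flume2.conf
[hadoop@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f conf/group1/flume1.conf
```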
- Modify a file in the monitored folders to test, e.g. under /opt/module/flume/files1:
```bash
echo hello >> file1.txt
```
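To verify, check both destinations (the HDFS path comes from the flume2 config, the local directory from the flume3 config):
```bash
hadoop fs -ls -R /flume1
ls /opt/module/flume/flume3datas
```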
In real development, a single server may produce many types of logs, and different types may need to be sent to different analysis systems. This is where the Multiplexing channel selector is used. Multiplexing works by looking at the value of a particular key in an event's headers and routing different events to different channels accordingly, so we need a custom Interceptor that assigns different values to that header key for different types of events.
Requirement:
Use Flume to collect logs from the local server and, according to log type, send different kinds of logs to different analysis systems. Single digits and single letters simulate the different log types; a custom interceptor distinguishes digits from letters and routes them to different channels (analysis systems).
Steps:
- Interceptor design
Configure the pom file with the Flume dependency:
```xml
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>
```
The interceptor:
```java
package com.atguigu.flume;  // must match the fully qualified class name used in the Flume config

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;
import java.util.Map;

public class MyInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Works together with the multiplexing channel selector: put a routing key into the headers.
        // The log type is decided by whether the first byte of the body is a digit or a letter.
        byte[] body = event.getBody();
        byte b = body[0];
        Map<String, String> headers = event.getHeaders();
        if (b >= '0' && b <= '9') {
            // b is a digit
            headers.put("type", "number");
        } else if ((b >= 'a' && b <= 'z') || (b >= 'A' && b <= 'Z')) {
            // b is a letter
            headers.put("type", "letter");
        }
        // Setting the headers back is not strictly required since the map is modified in place
        event.setHeaders(headers);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
    }

    // Static inner builder class, referenced from the Flume configuration
    public static class MyBuilder implements Builder {
        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
```
- Package the jar and upload it to Flume's lib directory (a sketch of the commands follows)
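A sketch of packaging and deploying the jar; the artifact name flume-interceptor-1.0-SNAPSHOT.jar is an assumption that depends on your project's artifactId and version:
```bash
mvn clean package
cp target/flume-interceptor-1.0-SNAPSHOT.jar /opt/module/flume/lib/
```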
- Configuration file flume1
```properties
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.selector.type = multiplexing
# Which header key to route on
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.number = c1
a1.sources.r1.selector.mapping.letter = c2
# a1.sources.r1.selector.default = c4

# Interceptor configuration
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.MyInterceptor$MyBuilder

# Describe the sinks
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Use channels which buffer events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sinks to the channels
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
```
- Configuration file flume2
```properties
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4141

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
```
- Configuration file flume3
```properties
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4142

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
```
- Test (a sketch of the full test flow follows)
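A sketch of the test, assuming the three configs are saved as flume1.conf, flume2.conf, and flume3.conf under conf/group2 (the folder and file names are assumptions). Start flume2 and flume3 first, then flume1, then send data with netcat; lines starting with a digit should appear on flume2's console and lines starting with a letter on flume3's console:
```bash
[hadoop@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f conf/group2/flume2.conf -Dflume.root.logger=INFO,console
[hadoop@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f conf/group2/flume3.conf -Dflume.root.logger=INFO,console
[hadoop@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f conf/group2/flume1.conf

# Send test data
nc localhost 44444
# then type, for example:
#   1hello   (routed to c1, i.e. flume2 on port 4141)
#   ahello   (routed to c2, i.e. flume3 on port 4142)
```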
Requirement:
Flume-1 on hadoop102 monitors the files matching /opt/module/flume/files1/.*file.*,
Flume-2 on hadoop103 monitors the data stream on a port,
and both send their data to Flume-3 on hadoop104, which prints the final data to the console.
Environment:
- Distribute flume and the environment variable file to hadoop103 and hadoop104 (see the sketch below)
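A minimal sketch of the distribution, assuming passwordless ssh between the hosts; the environment variable file path /etc/profile.d/my_env.sh is an assumption, and writing to /etc/profile.d typically requires root privileges (many setups use an xsync script instead):
```bash
scp -r /opt/module/flume hadoop103:/opt/module/
scp -r /opt/module/flume hadoop104:/opt/module/
scp /etc/profile.d/my_env.sh hadoop103:/etc/profile.d/
scp /etc/profile.d/my_env.sh hadoop104:/etc/profile.d/
```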
Steps:
- Configuration file flume1 (on hadoop102)
```properties
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/flume/files1/.*file.*
a1.sources.r1.filegroups.f2 = /opt/module/flume/files2/.*log.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
- Configuration file flume2 (on hadoop103)
```properties
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
- Configuration file flume3 (on hadoop104)
```properties
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4141

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
- Start the agents on their respective hosts; start flume3 on hadoop104 first, since it hosts the avro source that the other two agents send to
```bash
[hadoop@hadoop104 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/group3/flume3.conf -Dflume.root.logger=INFO,console
[hadoop@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/group3/flume1.conf
[hadoop@hadoop103 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/group3/flume2.conf
```
- Test
On hadoop102, append content to the monitored files under /opt/module/flume/files1 and /opt/module/flume/files2
```bash
echo 'hello' >> file1
echo 'hello' >> log1
```
On hadoop103, send data to port 44444, as shown below
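For example, using netcat (flume2 on hadoop103 listens on localhost:44444):
```bash
nc localhost 44444
# then type some lines and press Enter to send them
```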
- Observe the output on hadoop104's console