Flume核心组件
SourceChannelSink 浅尝Flume
Flume安装采集文件内容上传HDFS
Source配置Channel配置Sink配置生成测试数据结果验证 采集网页数据上传HDFS
Agent配置shell脚本生成测试数据执行顺序 参考文献
Flume核心组件Flume是一个高可用,高可靠,分布式的海量日志采集、聚合和传输的系统,能够有效的收集、聚合、移动大量的日志数据。
图1为Flume的一个典型应用场景,左边的是一个Web服务器,它会产生大量的日志数据,中间的是一个Agent代理,通过代理可以将日志数据采集到HDFS上。Agent内部有三大核心组件:Source、Channel和Sink。Source是数据源,负责读取数据;Channel是临时存储数据的,Source会把读取到的数据临时存储到Channel中;Sink是负责从Channel中读取数据的,最终将数据写出去,写到指定的目的地中。下面将对这三大核心组件进行详细介绍。
SourceSource:数据源
通过source组件可以指定让Flume读取哪里的数据,然后将数据传递给后面的channel。
Flume内置支持读取很多种数据源,基于文件、基于目录、基于TCPUDP端口、基于HTTP、Kafka的等。
常用的Source:
Exec Source:实现文件监控,可以实时监控文件中的新增内容(常用)NetCat TCP/UDP Source :采集指定端口(tcp、udp)的数据,可以读取流经端口的每一行数据Spooling Directory Source:采集文件夹里新增的文件Kafka Source:从Kafka消息队列中采集数据(常用)
【注意】Exec Source类似于linux中的tail -f效果。在这需要注意tail -F 和 tail -f的区别。该文件被删除或改名后,如果再次创建相同的文件名,tail -F会继续追踪,tail -f相反
ChannelChannel:接受Source发出的数据,可以把channel理解为一个临时存储数据的管道
常见Channel:
Memory Channel:使用内存作为数据的存储(效率高但可能丢数据)File Channel:使用文件来作为数据的存储(效率相对较低但数据不会丢失)Spillable Memory Channel:使用内存和文件作为数据存储,即先把数据存到内存中,如果内存中数据达到阈值再flush到文件中(解决内存不够用的情况但还是存在丢数据的风险) Sink
Sink:从Channel中读取数据并存储到指定目的地(打印到控制台、HDFS、Kafka)
常用的sink组件:
Logger Sink:将数据作为日志处理,可以选择打印到控制台或者写到文件中,这个主要在测试的时候使用HDFS Sink:将数据传输到HDFS中,这个是比较常见的,主要针对离线计算的场景Kafka Sink:将数据发送到kafka消息队列中,这个也是比较常见的,主要针对实时计算场景,数据不落盘,实时传输,最后使用实时计算框架直接处理。 浅尝Flume Flume安装
Flume官网下载
下载解压完成之后,修改flume的env环境变量配置文件,在flume的conf目录下,修改flume-env.sh.template的名字,去掉后缀template即可。
mv flume-env.sh.template flume-env.sh采集文件内容上传HDFS
需求:采集目录中已有的文件内容,存储到HDFS
分析:source是要基于目录的(Spooling Directory Source),channel使用(File Channel),sink使用hdfs sink
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /data/log/studentDirChannel配置
a1.channels.c1.type = file # checkpointDir是存放检查点目录 a1.channels.c1.checkpointDir = /data/soft/apache-flume-1.9.0-bin/data/studentDir/checkpoint # data是存放数据的目录 a1.channels.c1.dataDirs = /data/soft/apache-flume-1.9.0-bin/data/studentDir/dataSink配置
a1.sinks.k1.type = hdfs # 设置hdfs路径 a1.sinks.k1.hdfs.path = hdfs://192.168.182.100:9000/flume/studentDir # 设置前缀 可选项 a1.sinks.k1.hdfs.filePrefix = stu- # fileType默认是SequenceFile,DataStream 不会对输出数据进行压缩 a1.sinks.k1.hdfs.fileType = DataStream # 默认值是Writable,建议改为Text普通文本数据 a1.sinks.k1.hdfs.writeFormat = Text # 表示hdfs多长时间切分一个文件 单位是秒 默认为30 a1.sinks.k1.hdfs.rollInterval = 3600 # hdfs上切出来的文件大小 单位是字节 默认为1024 a1.sinks.k1.hdfs.rollSize = 134217728 # hdfs.rollCount每隔N条数据切出来一个文件,默认是10,0表示不按数据条数切文件 a1.sinks.k1.hdfs.rollCount = 0
最后连接组件
a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
将上面的代码放入data/soft/apache-flume-1.9.0-bin/conf/file-to-hdfs.conf中
生成测试数据将bigdata05节点设置为hadoop集群的一个客户端节点,这样 *** 作hdfs就没有任何问题了。在bigdata05机器中生成测试数据。
[root@bigdata05 ~]# mkdir -p /data/log/studentDir [root@bigdata05 ~]# cd /data/log/studentDir [root@bigdata05 studentDir]# more testData.dat java 18 male python 20 male hdfs 17 male
【注意】testData.dat文件后缀可自由命名
启动Agent命令,将日志信息打印到控制台。
bin/flume-ng agent --name a1 --conf conf --conf-file conf/file-to-hdfs.conf -Dflume.root.logger=INFO,console结果验证
[root@bigdata05 apache-flume-1.9.0-bin]# hdfs dfs -ls hdfs://192.168.182.100:9000/flume/studentDir Found 1 items -rw-r--r-- 2 root supergroup 42 2020-05-02 16:14 hdfs://192.168.182.100:9000/flume/studentDir/stu-.1588407260986.tmp
tmp表示它在被使用,因为Flume只要采集到数据就会向里面写,这个后缀默认是由hdfs.inUseSuffix参数来控制的,当Agent停止的时候就会去掉.tmp标志了
【注意】Flume不会重复读取同一个文件的数据,原因在于每执行完毕一个文件,会在添加一个后缀,即.COMPLETED
[root@bigdata05 ~]# cd /data/log/studentDir/ [root@bigdata05 studentDir]# ll total 4 -rw-r--r--. 1 root root 42 May 2 13:46 class1.dat.COMPLETED采集网页数据上传HDFS
需求:
- 将A和B两台机器实时产生的日志数据汇总到机器C中通过机器C将数据统一上传至HDFS的指定目录中
使用bigdata02和bigdata03采集当前机器上产生的实时日志数据,统一汇总到bigdata04机器上。
其中bigdata02和bigdata03中的source使用ExecSource,因为要实时读取文件中的新增数据;channel在这里我们使用基于内存的Channel;由于bigdata02和bigdata03的数据需要快速发送到bigdata04中,为了快速发送我们可以通过网络直接传输,使用Avro sink,
bigdata04的source使用Avro source、channel还是基于内存的Channel,sink就使用Hdfs sink。
最终需要在每台机器上启动一个Agent,启动的时候需要注意先后顺序,先启动bigdata04上面的,再启动bigdata02和bigdata03上面的。
bigdata02和bigdata03的Agent配置
[root@bigdata03 conf] vi file-to-avro-102.conf # bigdata02的文件名是file-to-avro-o2,bigdata03的文件名是file-to-avro-o3进行区分 # agent的名称是a1 # 指定source组件、channel组件和Sink组件的名称 a1.sources = r1 a1.channels = c1 a1.sinks = k1 # 配置source组件 a1.sources.r1.type = exec a1.sources.r1.command = tail -F /data/log/access.log # 配置channel组件 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 配置sink组件 a1.sinks.k1.type = avro a1.sinks.k1.hostname = 192.168.182.103 a1.sinks.k1.port = 45454 # 把组件连接起来 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
bigdata04 Agent配置文件
[root@bigdata04 conf] vi avro-to-hdfs.conf # agent的名称是a1 # 指定source组件、channel组件和Sink组件的名称 a1.sources = r1 a1.channels = c1 a1.sinks = k1 # 配置source组件 a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 45454 # 配置channel组件 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 配置sink组件 a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://192.168.182.100:9000/access/%Y%m%d a1.sinks.k1.hdfs.filePrefix = access a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.writeFormat = Text a1.sinks.k1.hdfs.rollInterval = 3600 a1.sinks.k1.hdfs.rollSize = 134217728 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.useLocalTimeStamp = true # 把组件连接起来 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
【注意】bigdata02和bigdata03中配置的a1.sinks.k1.port 的值需要和bigdata04中配置的a1.sources.r1.port的值一样
shell脚本生成测试数据#!/bin/bash # 循环向文件中生成数据 while [ "1" = "1" ] do # 获取当前时间戳 curr_time=`date +%s` # 获取当前主机名 name=`hostname` echo ${name}_${curr_time} >> /data/log/access.log # 暂停1秒 sleep 1 done执行顺序
- 启动bigdata04中的Agent启动bigdata-02上的agent服务和shell脚本启动bigdata-03上的agent服务和shell脚本验证结果关闭bigdata-02上的agent服务和shell脚本关闭bigdata-03上的agent服务和shell脚本关闭bigdata04中的Agent
https://www.imooc.com/wiki/BigData:慕课网《大数据开发工程师体系课程》
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)