浅尝Flume核心组件

浅尝Flume核心组件,第1张

浅尝Flume核心组件

文章目录

Flume核心组件

SourceChannelSink 浅尝Flume

Flume安装采集文件内容上传HDFS

Source配置Channel配置Sink配置生成测试数据结果验证 采集网页数据上传HDFS

Agent配置shell脚本生成测试数据执行顺序 参考文献

Flume核心组件

Flume是一个高可用,高可靠,分布式的海量日志采集、聚合和传输的系统,能够有效的收集、聚合、移动大量的日志数据。

图1 Flume典型应用场景

图1为Flume的一个典型应用场景,左边的是一个Web服务器,它会产生大量的日志数据,中间的是一个Agent代理,通过代理可以将日志数据采集到HDFS上。Agent内部有三大核心组件:Source、Channel和Sink。Source是数据源,负责读取数据;Channel是临时存储数据的,Source会把读取到的数据临时存储到Channel中;Sink是负责从Channel中读取数据的,最终将数据写出去,写到指定的目的地中。下面将对这三大核心组件进行详细介绍。

Source

Source:数据源
通过source组件可以指定让Flume读取哪里的数据,然后将数据传递给后面的channel。
Flume内置支持读取很多种数据源,基于文件、基于目录、基于TCPUDP端口、基于HTTP、Kafka的等。

常用的Source:

Exec Source:实现文件监控,可以实时监控文件中的新增内容(常用)NetCat TCP/UDP Source :采集指定端口(tcp、udp)的数据,可以读取流经端口的每一行数据Spooling Directory Source:采集文件夹里新增的文件Kafka Source:从Kafka消息队列中采集数据(常用)

【注意】Exec Source类似于linux中的tail -f效果。在这需要注意tail -F 和 tail -f的区别。该文件被删除或改名后,如果再次创建相同的文件名,tail -F会继续追踪,tail -f相反

Channel

Channel:接受Source发出的数据,可以把channel理解为一个临时存储数据的管道

常见Channel:

Memory Channel:使用内存作为数据的存储(效率高但可能丢数据)File Channel:使用文件来作为数据的存储(效率相对较低但数据不会丢失)Spillable Memory Channel:使用内存和文件作为数据存储,即先把数据存到内存中,如果内存中数据达到阈值再flush到文件中(解决内存不够用的情况但还是存在丢数据的风险) Sink

Sink:从Channel中读取数据并存储到指定目的地(打印到控制台、HDFS、Kafka)

常用的sink组件:

Logger Sink:将数据作为日志处理,可以选择打印到控制台或者写到文件中,这个主要在测试的时候使用HDFS Sink:将数据传输到HDFS中,这个是比较常见的,主要针对离线计算的场景Kafka Sink:将数据发送到kafka消息队列中,这个也是比较常见的,主要针对实时计算场景,数据不落盘,实时传输,最后使用实时计算框架直接处理。 浅尝Flume Flume安装

Flume官网下载

图2 Flume官网下载界面

下载解压完成之后,修改flume的env环境变量配置文件,在flume的conf目录下,修改flume-env.sh.template的名字,去掉后缀template即可。

 mv flume-env.sh.template  flume-env.sh
采集文件内容上传HDFS

需求:采集目录中已有的文件内容,存储到HDFS
分析:source是要基于目录的(Spooling Directory Source),channel使用(File Channel),sink使用hdfs sink

Source配置
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /data/log/studentDir
Channel配置
a1.channels.c1.type = file
# checkpointDir是存放检查点目录
a1.channels.c1.checkpointDir = /data/soft/apache-flume-1.9.0-bin/data/studentDir/checkpoint
# data是存放数据的目录
a1.channels.c1.dataDirs = /data/soft/apache-flume-1.9.0-bin/data/studentDir/data
Sink配置
a1.sinks.k1.type = hdfs
# 设置hdfs路径
a1.sinks.k1.hdfs.path = hdfs://192.168.182.100:9000/flume/studentDir
# 设置前缀 可选项
a1.sinks.k1.hdfs.filePrefix = stu-
# fileType默认是SequenceFile,DataStream 不会对输出数据进行压缩
a1.sinks.k1.hdfs.fileType = DataStream
# 默认值是Writable,建议改为Text普通文本数据
a1.sinks.k1.hdfs.writeFormat = Text
# 表示hdfs多长时间切分一个文件 单位是秒 默认为30
a1.sinks.k1.hdfs.rollInterval = 3600
# hdfs上切出来的文件大小  单位是字节 默认为1024
a1.sinks.k1.hdfs.rollSize = 134217728
# hdfs.rollCount每隔N条数据切出来一个文件,默认是10,0表示不按数据条数切文件
a1.sinks.k1.hdfs.rollCount = 0

最后连接组件

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

将上面的代码放入data/soft/apache-flume-1.9.0-bin/conf/file-to-hdfs.conf中

生成测试数据

将bigdata05节点设置为hadoop集群的一个客户端节点,这样 *** 作hdfs就没有任何问题了。在bigdata05机器中生成测试数据。

[root@bigdata05 ~]# mkdir -p /data/log/studentDir
[root@bigdata05 ~]# cd /data/log/studentDir
[root@bigdata05 studentDir]# more testData.dat
java    18      male
python  20      male
hdfs    17      male

【注意】testData.dat文件后缀可自由命名
启动Agent命令,将日志信息打印到控制台。

bin/flume-ng agent --name a1 --conf conf --conf-file conf/file-to-hdfs.conf -Dflume.root.logger=INFO,console
结果验证
[root@bigdata05 apache-flume-1.9.0-bin]# hdfs dfs -ls hdfs://192.168.182.100:9000/flume/studentDir
Found 1 items
-rw-r--r--   2 root supergroup         42 2020-05-02 16:14 hdfs://192.168.182.100:9000/flume/studentDir/stu-.1588407260986.tmp

tmp表示它在被使用,因为Flume只要采集到数据就会向里面写,这个后缀默认是由hdfs.inUseSuffix参数来控制的,当Agent停止的时候就会去掉.tmp标志了
【注意】Flume不会重复读取同一个文件的数据,原因在于每执行完毕一个文件,会在添加一个后缀,即.COMPLETED

[root@bigdata05 ~]# cd /data/log/studentDir/
[root@bigdata05 studentDir]# ll
total 4
-rw-r--r--. 1 root root 42 May  2 13:46 class1.dat.COMPLETED
采集网页数据上传HDFS

需求:

    将A和B两台机器实时产生的日志数据汇总到机器C中通过机器C将数据统一上传至HDFS的指定目录中
图3 实现采集网页日志数据上传HDFS分析图

使用bigdata02和bigdata03采集当前机器上产生的实时日志数据,统一汇总到bigdata04机器上。

其中bigdata02和bigdata03中的source使用ExecSource,因为要实时读取文件中的新增数据;channel在这里我们使用基于内存的Channel;由于bigdata02和bigdata03的数据需要快速发送到bigdata04中,为了快速发送我们可以通过网络直接传输,使用Avro sink,

bigdata04的source使用Avro source、channel还是基于内存的Channel,sink就使用Hdfs sink。
最终需要在每台机器上启动一个Agent,启动的时候需要注意先后顺序,先启动bigdata04上面的,再启动bigdata02和bigdata03上面的。

Agent配置

bigdata02和bigdata03的Agent配置

[root@bigdata03 conf] vi file-to-avro-102.conf
# bigdata02的文件名是file-to-avro-o2,bigdata03的文件名是file-to-avro-o3进行区分
# agent的名称是a1
# 指定source组件、channel组件和Sink组件的名称
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# 配置source组件
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /data/log/access.log
# 配置channel组件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 配置sink组件
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.182.103
a1.sinks.k1.port = 45454
# 把组件连接起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

bigdata04 Agent配置文件

[root@bigdata04 conf] vi avro-to-hdfs.conf
# agent的名称是a1
# 指定source组件、channel组件和Sink组件的名称
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# 配置source组件
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 45454
# 配置channel组件
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 配置sink组件
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.182.100:9000/access/%Y%m%d
a1.sinks.k1.hdfs.filePrefix = access
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 把组件连接起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

【注意】bigdata02和bigdata03中配置的a1.sinks.k1.port 的值需要和bigdata04中配置的a1.sources.r1.port的值一样

shell脚本生成测试数据
#!/bin/bash
# 循环向文件中生成数据
while [ "1" = "1" ]
do
	# 获取当前时间戳
	curr_time=`date +%s`
	# 获取当前主机名
	name=`hostname`
	echo ${name}_${curr_time} >> /data/log/access.log
	# 暂停1秒
	sleep 1
done
执行顺序
    启动bigdata04中的Agent启动bigdata-02上的agent服务和shell脚本启动bigdata-03上的agent服务和shell脚本验证结果关闭bigdata-02上的agent服务和shell脚本关闭bigdata-03上的agent服务和shell脚本关闭bigdata04中的Agent
参考文献

https://www.imooc.com/wiki/BigData:慕课网《大数据开发工程师体系课程》

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5701636.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存