Flume Agent组件搭配

Flume Agent组件搭配,第1张

Flume Agent组件搭配 1. Agent 组件

Agent(代理)中的组件包括 Source、Channel、Sink。

1.1 Source

Source 组件可以处理各种类型、各种格式的日志数据。

Flume 中常用的 source:

  • avro
  • exec
  • netcat
  • spooling directory
  • taildir
常用类别描述avro监听Avro端口并从外部Avro客户端流接收 Event(事件)execExec source在启动时运行一个给定的Unix命令,tail -F filenetcat监听给定的端口,并将每一行文本转换为 Eventsoopling directpry监控整个目录,读取目录中所有的文件,不支持递归 ,不支持断点续传taildir可以监控多个文件或者文件夹的变化,并支持记录读取进度 1.3 Channel

Channel 是位于 Source 和 Sink 之间用于存放 Event 的缓冲区。

Channel 是线程安全的,可以同时处理几个 Source 的写入 *** 作和几个 Sink 的读取 *** 作。

Flume 中常用的 Channel:

  • Memary Channel
  • File Channel
  • Kafka Channel
常用类别描述MemaryEvent 存储在内存中的队列FileEvent 存储在文件中,无需担心数据丢失KafKaEvent 存储在 Kafka 集群中。Kafka 提供了高可用性 1.3 Sink

Sink 将 Channel 中的 Event 批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。

Flume 中常用的 sink:

  • logger
  • hdfs
  • avro
  • Hbase
常用类别描述logger控制台输出,日志级别为INFO。通常用于测试/调试hdfsEvent 事件写入HDFSavro发送到 Sink 的 Flume Event 被转换为 Avro 事件,并发送到配置的主机名/端口HbaseEvent 事件写入Hbase 2. Agent 组件搭配

Agent 的组件可以自定义搭配,可以使用不同的 Source、Sink、Channel。

下面罗列几种简单的搭配方式。

exec2logger 表示 exec to logger,2 代表 to,是将 two 音译为 to。常见的有 p2p、log4j 等,都是拟声为数字。

可以先启动 Hadoop 集群:start-dfs.sh。

复杂的 Flume 启动命令为:

flume-ng agent --conf  --conf-file  --name  -Dflume.root.logger=INFO,console

以上命令可简写为:

flume-ng agent -c  -f  -n  -Dflume.root.logger=INFO,console
2.1 Source 2.1.1 exec2logger 常用配置项默认值描述type-将类型配置为 execcommand-配置执行的命令,比如:tail -F file

1️⃣ Step1:在 flume 目录下自定义一个目录(我的是 agents 目录),添加配置文件 exec2logger.conf

vim exec2logger.conf
# exec 读取日志文件 将数据发送到 logger sink

# Name the components on this agent
exec2logger.sources = r1
exec2logger.sinks = k1
exec2logger.channels = c1

# Describe/configure the source
exec2logger.sources.r1.type = exec
exec2logger.sources.r1.command = tail -F /root/log.txt


# Describe the sink
exec2logger.sinks.k1.type = logger

# Use a channel which buffers events in memory
exec2logger.channels.c1.type = memory
exec2logger.channels.c1.capacity = 1000
exec2logger.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
exec2logger.sources.r1.channels = c1
exec2logger.sinks.k1.channel = c1

2️⃣ Step2:启动 Flume exec2logger

flume-ng agent -c /opt/flume-1.9.0/conf/ -f /opt/flume-1.9.0/agents/exec2logger.conf -n exec2logger -Dflume.root.logger=INFO,console

3️⃣ Step3:测试,并查看控制台输出(其实不用touch创建该文件,>> 是进行流的重定向,如果文件不存在就直接新建了)

2.1.2 spoolDir2logger 常用配置项默认值描述type-将类型配置为 spooldirspoolDir-配置成需要读取的文件(绝对路径)

1️⃣ Step1:添加配置文件 spooldir2logger.conf

# spooldir 读取同目录中的多个日志文件 将数据发送到 logger sink

# Name the components on this agent
spooldir2logger.sources = r1
spooldir2logger.sinks = k1
spooldir2logger.channels = c1

# Describe/configure the source
spooldir2logger.sources.r1.type = spooldir
spooldir2logger.sources.r1.spoolDir = /root/log


# Describe the sink
spooldir2logger.sinks.k1.type = logger

# Use a channel which buffers events in memory
spooldir2logger.channels.c1.type = memory
spooldir2logger.channels.c1.capacity = 1000
spooldir2logger.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
spooldir2logger.sources.r1.channels = c1
spooldir2logger.sinks.k1.channel = c1

2️⃣ Step2:启动 Flume spooldir2logger

flume-ng agent -c /opt/flume-1.9.0/conf/ -f /opt/flume-1.9.0/agents/spooldir2logger.conf -n spooldir2logger -Dflume.root.logger=INFO,console

3️⃣ Step3:测试,当文件内的内容输出完成后,会将该文件后缀标记为 COMPLETED 表示已完成的文件。

4️⃣ Step4:如果我们向完成后的文件输入内容,就无法再获取文件变化了。

2.1.3 tailDir2logger 常用配置项默认值描述type-将类型配置为 TAILDIRfilegroups-配置文件组filegroups.-为文件组配置对应的文件或目录positionFile~/.flume/taildir_position.jsontailDir记录文件文件的绝对路径

1️⃣ Step1:添加配置文件 taildir2logger.conf

# tailDir 读取同目录中的多个日志文件 将数据发送到 logger sink

# Name the components on this agent
taildir2logger.sources = r1
taildir2logger.sinks = k1
taildir2logger.channels = c1

# Describe/configure the source
taildir2logger.sources.r1.type = TAILDIR
taildir2logger.sources.r1.filegroups = g1 g2
taildir2logger.sources.r1.filegroups.g1 = /root/log.txt
taildir2logger.sources.r1.filegroups.g2 = /root/log/.*.txt

# Describe the sink
taildir2logger.sinks.k1.type = logger

# Use a channel which buffers events in memory
taildir2logger.channels.c1.type = memory
taildir2logger.channels.c1.capacity = 1000
taildir2logger.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
taildir2logger.sources.r1.channels = c1
taildir2logger.sinks.k1.channel = c1

2️⃣ Step2:启动 Flume taildir2logger

flume-ng agent -c /opt/flume-1.9.0/conf/ -f /opt/flume-1.9.0/agents/taildir2logger.conf -n taildir2logger -Dflume.root.logger=INFO,console

3️⃣ Step3:测试1,向文件追加内容时,检测文件变化

4️⃣ Step4:测试2,检测文件夹中的 .txt 文件变化

5️⃣ Step5:taildir记录完文件不会在文件后缀添加 COMPLETED ,而是将记录的位置放在一个json文件中

6️⃣ Step6:格式化该 json 文件,发现一个数组中存储了 json 格式的数据,每个 json 会存储一个唯一标识 inode、读取的位置 pos、以及文件的绝对路径 file

[
	{
		"inode":134337114,
		"pos":7,
		"file":"/root/log.txt"
	},
	{
		"inode":134333732,
		"pos":12,
		"file":"/root/log/file1.txt"
	}
]

7️⃣ Step7:向 /root/log.txt 中追加内容,并再次启动 Flume,Flume 会将读取的位置加载一遍,然后接着读,并将新的读取位置记录在 json 中

面试题:日志采集通常用哪种Source?

答:通常使用 tailDir 类型的 Source,之前我们试过 exec、spoolDir,后来应为应用场景当中分的目录比较多,而且为了每次读取能接着上次继续读所以选择了 tailDir,用 tailDir 的主要原因是 tailDir会在 json 文件中记录的位置和读取的文件,如果以后 agent 挂掉时,只需重新启动,就能接着上一次读取的进度继续读

2.2 Channel 2.2.1 memary channel 常用配置项默认值描述type-将类型配置为 memorycapacity100channel 中能够存放 Event 的最大数量transactionCapacity100channel 每次向 sink 发送 Event 的最大数量

capacity的估算依据:估算 channel 可能缓存的数据量(通常 2w~20w)与 agent 可用 Jvm 内存(通常20MB)。

默认最大内存为20MB,如果一条日志长度为 512B,那么总共能容纳 4w 多条数据(20x1024x2=40960),但通常 capacity 不会设置太满,太满容易内存溢出。

❓ 默认值既然怎么小,那如何增大 Jvm 内存容量?

⭐️可以通过修改 Flume 启动的配置项实现扩容,vim /flume-1.9.0/bin/flume-ng 可以查看 Flume 的启动脚本并修改 JAVA_OPTS 项:

2.2.2 File channel

数据安全性较高的情况下会使用 File 类型的 Channel,避免因程序崩溃而丢失数据。

常用配置项默认值描述type-将类型配置为 filecheckpointDir~/.flume/file-channel/checkpoint检查点文件的绝对路径

程序崩溃后再启动时,会先读取 checkpointDir 配置的文件,Sink 将上次未读取完的 Event 从该文件中继续读取。

2.3 Sink 2.3.1 logger sink

在 INFO 级别记录事件,通常用于测试/调试目的。所以通常配置 Fume 时,先使用 logger 将 Event 输出到控制台,来测试 Source 是否有问题,然后再修改 Sink 类型。

常用配置项默认值描述type-将类型配置为 logger 2.3.2 tailDir2hdfs 常用配置项默认值描述type-将类型配置为 filehdfs.path-HDFS 存放日志的路径hdfs.fileTypeSequenceFile配置文件类型,可配置为 DataStream 或 CompressedStreamhdfs.writeFormatWritable格式化类型,可配置为 Texthdfs.useLocalTimeStampfalse配置是否使用本地时间戳hdfs.rollInterva30配置滚动周期,单位 shdfs.rollSize1024配置文件到达的指定大小hdfs.rollCount10配置向文件写入多少条数据,配置为0表示不依据该项

1️⃣ Step1:添加配置文件 taildir2hdfs.conf

# taildir 读取同目录中的多个日志文件 将数据发送到 logger sink

# Name the components on this agent
taildir2hdfs.sources = r1
taildir2hdfs.sinks = k1
taildir2hdfs.channels = c1

# Describe/configure the source
taildir2hdfs.sources.r1.type = TAILDIR
taildir2hdfs.sources.r1.filegroups = g1 g2
taildir2hdfs.sources.r1.filegroups.g1 = /root/log.txt
taildir2hdfs.sources.r1.filegroups.g2 = /root/log/.*.txt


# Describe the sink
taildir2hdfs.sinks.k1.type = hdfs
taildir2hdfs.sinks.k1.hdfs.path = hdfs://node1:8020/flume/%y-%m-%d-%H-%M
# 将hdfs输出文件类型改为数据流
taildir2hdfs.sinks.k1.hdfs.fileType = DataStream
# 输出格式化改为文本格式
taildir2hdfs.sinks.k1.hdfs.writeFormat = Text
taildir2hdfs.sinks.k1.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory
taildir2hdfs.channels.c1.type = memory
taildir2hdfs.channels.c1.capacity = 30000
taildir2hdfs.channels.c1.transactionCapacity = 3000

# Bind the source and sink to the channel
taildir2hdfs.sources.r1.channels = c1
taildir2hdfs.sinks.k1.channel = c1

2️⃣ Step2:一条一条写数据太麻烦,所以写个死循环脚本向文件中输入数据,运行一会就停掉,并查看该文件的大小

[root@node1 ~]# vim printer.sh
[root@node1 ~]# chmod a+x printer.sh
[root@node1 ~]# cat printer.sh
#!/bin/bash
while true
do
        echo '123123123123123123123123123123123123123123123123123123123123' >> log.txt
done
[root@node1 ~]# ll -h
总用量 17M
drwxr-xr-x. 3 root root  65 11月 21 16:51 log
-rw-r--r--. 1 root root 14M 11月 21 23:29 log.txt
-rwxr-xr-x. 1 root root 134 11月 21 23:22 printer.sh

3️⃣ Step3:启动HDFS start-dfs.sh,启动 Flume taildir2hdfs,可以看到以及已经在疯狂发送日志了

flume-ng agent -c /opt/flume-1.9.0/conf/ -f /opt/flume-1.9.0/agents/taildir2hdfs.conf -n taildir2hdfs -Dflume.root.logger=INFO,console

4️⃣ Step4:查看web页面发现 /flume/ 下有了一个由当前时间命名的文件,进去却发现全是小文件

5️⃣ Step5:flume 一个快的大小为 128MB ,为了能充分利用空间还需要添加配置信息,

# 设置文件的滚动条件 -------------------------------
# 设置滚动周期  单位s
taildir2hdfs.sinks.k1.hdfs.rollInterval = 300
# 设置文件达到指定大小,128MB为134,217,728B,但一般不会放满 
taildir2hdfs.sinks.k1.hdfs.rollSize = 130000000
# 设置文件中写入多少条记录
taildir2hdfs.sinks.k1.hdfs.rollCount = 0

6️⃣ Step6:修改后重新运行 Flume ,并在web端查看变化

2.3.3 tailDir2avro-avro2hdfs

当前需要两台服务器,如果只有一台配置了Flume,可以通过 scp -rq : 将文件发送至其他服务器。

scp -rq /opt/flume-1.9.0/ node2:/opt/

实现如下拓扑结构:

avro sink 常用配置项默认值描述type-将类型配置为 avrohostname-将类型配置为主机名或 IP 地址port-配置端口号 avro source常用配置项默认值描述type-将类型配置为 avrobind-将类型配置为需要监听的主机名或 IP 地址port-配置监听端口号

1️⃣ Step1:node1主机,添加配置文件 tailDir2avro.conf

# taildir 读取同目录中的多个日志文件 将数据发送到 avro sink

# Name the components on this agent
taildir2avro.sources = r1
taildir2avro.sinks = k1
taildir2avro.channels = c1

# Describe/configure the source
taildir2avro.sources.r1.type = TAILDIR
taildir2avro.sources.r1.filegroups = g1 g2
taildir2avro.sources.r1.filegroups.g1 = /root/log.txt
taildir2avro.sources.r1.filegroups.g2 = /root/log/.*.txt

# Describe the sink
taildir2avro.sinks.k1.type = avro
taildir2avro.sinks.k1.hostname = node2
taildir2avro.sinks.k1.port = 12345

# Use a channel which buffers events in memory
taildir2avro.channels.c1.type = memory
taildir2avro.channels.c1.capacity = 30000
taildir2avro.channels.c1.transactionCapacity = 3000

# Bind the source and sink to the channel
taildir2avro.sources.r1.channels = c1
taildir2avro.sinks.k1.channel = c1

2️⃣ Step2:node2 主机,添加配置文件 avro2hdfs.conf

# spooldir读取同目录中的多个日志文件 将数据发送到logger sink

# Name the components on this agent
avro2hdfs.sources = r1
avro2hdfs.sinks = k1
avro2hdfs.channels = c1

# Describe/configure the source
avro2hdfs.sources.r1.type = avro
avro2hdfs.sources.r1.bind = node2
avro2hdfs.sources.r1.port = 12345


# Describe the sink
avro2hdfs.sinks.k1.type = hdfs
avro2hdfs.sinks.k1.hdfs.path = hdfs://node1:8020/flume/%y-%m-%d-%H-%M
avro2hdfs.sinks.k1.hdfs.fileType = DataStream
avro2hdfs.sinks.k1.hdfs.writeFormat = Text
avro2hdfs.sinks.k1.hdfs.useLocalTimeStamp = true
avro2hdfs.sinks.k1.hdfs.rollInterval = 300
avro2hdfs.sinks.k1.hdfs.rollSize = 130000000
avro2hdfs.sinks.k1.hdfs.rollCount = 0


# Use a channel which buffers events in memory
avro2hdfs.channels.c1.type = memory
avro2hdfs.channels.c1.capacity = 30000
avro2hdfs.channels.c1.transactionCapacity = 3000

# Bind the source and sink to the channel
avro2hdfs.sources.r1.channels = c1
avro2hdfs.sinks.k1.channel = c1

3️⃣ Step3:启动 node2 上的 Flume

flume-ng agent -c /opt/flume-1.9.0/conf/ -f /opt/flume-1.9.0/agents/avro2hdfs.conf -n avro2hdfs -Dflume.root.logger=INFO,console

4️⃣ Step4:查看node2 的 12345 端口是否被监听,确定已在监听状态

5️⃣ Step5:运行一会 node1 的 printer.sh 脚本,启动 node1 的 Flume

flume-ng agent -c /opt/flume-1.9.0/conf/ -f /opt/flume-1.9.0/agents/taildir2avro.conf -n taildir2avro -Dflume.root.logger=INFO,console

6️⃣ Step6:查看 node2 控制台输出,node1 以成功向 node2 的12345端口发送数据了

7️⃣ Step7:查看web端是否有数据提交

已经跑通!

3. 写在最后

启动程序的时候,有时候会占用终端,关闭终端就会关闭程序,所以有时我们需要提将程序提交到后台执行,那就需要在命令后面添加一个 & 符号。

如果程序中有内容输出到控制台,而有不想一直在控制台输出,可以在命令前面添加 nohup 命令,将输出的内容输入到文件中,实现程序静默。

 


❤️ END ❤️

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5608213.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存