Agent(代理)中的组件包括 Source、Channel、Sink。
1.1 SourceSource 组件可以处理各种类型、各种格式的日志数据。
Flume 中常用的 source:
- avro
- exec
- netcat
- spooling directory
- taildir
Channel 是位于 Source 和 Sink 之间用于存放 Event 的缓冲区。
Channel 是线程安全的,可以同时处理几个 Source 的写入 *** 作和几个 Sink 的读取 *** 作。
Flume 中常用的 Channel:
- Memary Channel
- File Channel
- Kafka Channel
Sink 将 Channel 中的 Event 批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。
Flume 中常用的 sink:
- logger
- hdfs
- avro
- Hbase
Agent 的组件可以自定义搭配,可以使用不同的 Source、Sink、Channel。
下面罗列几种简单的搭配方式。
exec2logger 表示 exec to logger,2 代表 to,是将 two 音译为 to。常见的有 p2p、log4j 等,都是拟声为数字。
可以先启动 Hadoop 集群:start-dfs.sh。
复杂的 Flume 启动命令为:
flume-ng agent --conf--conf-file --name -Dflume.root.logger=INFO,console
以上命令可简写为:
flume-ng agent -c2.1 Source 2.1.1 exec2logger-f -n -Dflume.root.logger=INFO,console
1️⃣ Step1:在 flume 目录下自定义一个目录(我的是 agents 目录),添加配置文件 exec2logger.conf
vim exec2logger.conf
# exec 读取日志文件 将数据发送到 logger sink # Name the components on this agent exec2logger.sources = r1 exec2logger.sinks = k1 exec2logger.channels = c1 # Describe/configure the source exec2logger.sources.r1.type = exec exec2logger.sources.r1.command = tail -F /root/log.txt # Describe the sink exec2logger.sinks.k1.type = logger # Use a channel which buffers events in memory exec2logger.channels.c1.type = memory exec2logger.channels.c1.capacity = 1000 exec2logger.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel exec2logger.sources.r1.channels = c1 exec2logger.sinks.k1.channel = c1
2️⃣ Step2:启动 Flume exec2logger
flume-ng agent -c /opt/flume-1.9.0/conf/ -f /opt/flume-1.9.0/agents/exec2logger.conf -n exec2logger -Dflume.root.logger=INFO,console
3️⃣ Step3:测试,并查看控制台输出(其实不用touch创建该文件,>> 是进行流的重定向,如果文件不存在就直接新建了)
2.1.2 spoolDir2logger1️⃣ Step1:添加配置文件 spooldir2logger.conf
# spooldir 读取同目录中的多个日志文件 将数据发送到 logger sink # Name the components on this agent spooldir2logger.sources = r1 spooldir2logger.sinks = k1 spooldir2logger.channels = c1 # Describe/configure the source spooldir2logger.sources.r1.type = spooldir spooldir2logger.sources.r1.spoolDir = /root/log # Describe the sink spooldir2logger.sinks.k1.type = logger # Use a channel which buffers events in memory spooldir2logger.channels.c1.type = memory spooldir2logger.channels.c1.capacity = 1000 spooldir2logger.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel spooldir2logger.sources.r1.channels = c1 spooldir2logger.sinks.k1.channel = c1
2️⃣ Step2:启动 Flume spooldir2logger
flume-ng agent -c /opt/flume-1.9.0/conf/ -f /opt/flume-1.9.0/agents/spooldir2logger.conf -n spooldir2logger -Dflume.root.logger=INFO,console
3️⃣ Step3:测试,当文件内的内容输出完成后,会将该文件后缀标记为 COMPLETED 表示已完成的文件。
4️⃣ Step4:如果我们向完成后的文件输入内容,就无法再获取文件变化了。
2.1.3 tailDir2logger1️⃣ Step1:添加配置文件 taildir2logger.conf
# tailDir 读取同目录中的多个日志文件 将数据发送到 logger sink # Name the components on this agent taildir2logger.sources = r1 taildir2logger.sinks = k1 taildir2logger.channels = c1 # Describe/configure the source taildir2logger.sources.r1.type = TAILDIR taildir2logger.sources.r1.filegroups = g1 g2 taildir2logger.sources.r1.filegroups.g1 = /root/log.txt taildir2logger.sources.r1.filegroups.g2 = /root/log/.*.txt # Describe the sink taildir2logger.sinks.k1.type = logger # Use a channel which buffers events in memory taildir2logger.channels.c1.type = memory taildir2logger.channels.c1.capacity = 1000 taildir2logger.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel taildir2logger.sources.r1.channels = c1 taildir2logger.sinks.k1.channel = c1
2️⃣ Step2:启动 Flume taildir2logger
flume-ng agent -c /opt/flume-1.9.0/conf/ -f /opt/flume-1.9.0/agents/taildir2logger.conf -n taildir2logger -Dflume.root.logger=INFO,console
3️⃣ Step3:测试1,向文件追加内容时,检测文件变化
4️⃣ Step4:测试2,检测文件夹中的 .txt 文件变化
5️⃣ Step5:taildir记录完文件不会在文件后缀添加 COMPLETED ,而是将记录的位置放在一个json文件中
6️⃣ Step6:格式化该 json 文件,发现一个数组中存储了 json 格式的数据,每个 json 会存储一个唯一标识 inode、读取的位置 pos、以及文件的绝对路径 file
[ { "inode":134337114, "pos":7, "file":"/root/log.txt" }, { "inode":134333732, "pos":12, "file":"/root/log/file1.txt" } ]
7️⃣ Step7:向 /root/log.txt 中追加内容,并再次启动 Flume,Flume 会将读取的位置加载一遍,然后接着读,并将新的读取位置记录在 json 中
2.2 Channel 2.2.1 memary channel面试题:日志采集通常用哪种Source?
答:通常使用 tailDir 类型的 Source,之前我们试过 exec、spoolDir,后来应为应用场景当中分的目录比较多,而且为了每次读取能接着上次继续读所以选择了 tailDir,用 tailDir 的主要原因是 tailDir会在 json 文件中记录的位置和读取的文件,如果以后 agent 挂掉时,只需重新启动,就能接着上一次读取的进度继续读
capacity的估算依据:估算 channel 可能缓存的数据量(通常 2w~20w)与 agent 可用 Jvm 内存(通常20MB)。
默认最大内存为20MB,如果一条日志长度为 512B,那么总共能容纳 4w 多条数据(20x1024x2=40960),但通常 capacity 不会设置太满,太满容易内存溢出。
❓ 默认值既然怎么小,那如何增大 Jvm 内存容量?
⭐️可以通过修改 Flume 启动的配置项实现扩容,vim /flume-1.9.0/bin/flume-ng 可以查看 Flume 的启动脚本并修改 JAVA_OPTS 项:
2.2.2 File channel数据安全性较高的情况下会使用 File 类型的 Channel,避免因程序崩溃而丢失数据。
程序崩溃后再启动时,会先读取 checkpointDir 配置的文件,Sink 将上次未读取完的 Event 从该文件中继续读取。
2.3 Sink 2.3.1 logger sink在 INFO 级别记录事件,通常用于测试/调试目的。所以通常配置 Fume 时,先使用 logger 将 Event 输出到控制台,来测试 Source 是否有问题,然后再修改 Sink 类型。
1️⃣ Step1:添加配置文件 taildir2hdfs.conf
# taildir 读取同目录中的多个日志文件 将数据发送到 logger sink # Name the components on this agent taildir2hdfs.sources = r1 taildir2hdfs.sinks = k1 taildir2hdfs.channels = c1 # Describe/configure the source taildir2hdfs.sources.r1.type = TAILDIR taildir2hdfs.sources.r1.filegroups = g1 g2 taildir2hdfs.sources.r1.filegroups.g1 = /root/log.txt taildir2hdfs.sources.r1.filegroups.g2 = /root/log/.*.txt # Describe the sink taildir2hdfs.sinks.k1.type = hdfs taildir2hdfs.sinks.k1.hdfs.path = hdfs://node1:8020/flume/%y-%m-%d-%H-%M # 将hdfs输出文件类型改为数据流 taildir2hdfs.sinks.k1.hdfs.fileType = DataStream # 输出格式化改为文本格式 taildir2hdfs.sinks.k1.hdfs.writeFormat = Text taildir2hdfs.sinks.k1.hdfs.useLocalTimeStamp = true # Use a channel which buffers events in memory taildir2hdfs.channels.c1.type = memory taildir2hdfs.channels.c1.capacity = 30000 taildir2hdfs.channels.c1.transactionCapacity = 3000 # Bind the source and sink to the channel taildir2hdfs.sources.r1.channels = c1 taildir2hdfs.sinks.k1.channel = c1
2️⃣ Step2:一条一条写数据太麻烦,所以写个死循环脚本向文件中输入数据,运行一会就停掉,并查看该文件的大小
[root@node1 ~]# vim printer.sh [root@node1 ~]# chmod a+x printer.sh [root@node1 ~]# cat printer.sh #!/bin/bash while true do echo '123123123123123123123123123123123123123123123123123123123123' >> log.txt done
[root@node1 ~]# ll -h 总用量 17M drwxr-xr-x. 3 root root 65 11月 21 16:51 log -rw-r--r--. 1 root root 14M 11月 21 23:29 log.txt -rwxr-xr-x. 1 root root 134 11月 21 23:22 printer.sh
3️⃣ Step3:启动HDFS start-dfs.sh,启动 Flume taildir2hdfs,可以看到以及已经在疯狂发送日志了
flume-ng agent -c /opt/flume-1.9.0/conf/ -f /opt/flume-1.9.0/agents/taildir2hdfs.conf -n taildir2hdfs -Dflume.root.logger=INFO,console
4️⃣ Step4:查看web页面发现 /flume/ 下有了一个由当前时间命名的文件,进去却发现全是小文件
5️⃣ Step5:flume 一个快的大小为 128MB ,为了能充分利用空间还需要添加配置信息,
# 设置文件的滚动条件 ------------------------------- # 设置滚动周期 单位s taildir2hdfs.sinks.k1.hdfs.rollInterval = 300 # 设置文件达到指定大小,128MB为134,217,728B,但一般不会放满 taildir2hdfs.sinks.k1.hdfs.rollSize = 130000000 # 设置文件中写入多少条记录 taildir2hdfs.sinks.k1.hdfs.rollCount = 0
6️⃣ Step6:修改后重新运行 Flume ,并在web端查看变化
2.3.3 tailDir2avro-avro2hdfs当前需要两台服务器,如果只有一台配置了Flume,可以通过 scp -rq
scp -rq /opt/flume-1.9.0/ node2:/opt/
实现如下拓扑结构:
1️⃣ Step1:node1主机,添加配置文件 tailDir2avro.conf
# taildir 读取同目录中的多个日志文件 将数据发送到 avro sink # Name the components on this agent taildir2avro.sources = r1 taildir2avro.sinks = k1 taildir2avro.channels = c1 # Describe/configure the source taildir2avro.sources.r1.type = TAILDIR taildir2avro.sources.r1.filegroups = g1 g2 taildir2avro.sources.r1.filegroups.g1 = /root/log.txt taildir2avro.sources.r1.filegroups.g2 = /root/log/.*.txt # Describe the sink taildir2avro.sinks.k1.type = avro taildir2avro.sinks.k1.hostname = node2 taildir2avro.sinks.k1.port = 12345 # Use a channel which buffers events in memory taildir2avro.channels.c1.type = memory taildir2avro.channels.c1.capacity = 30000 taildir2avro.channels.c1.transactionCapacity = 3000 # Bind the source and sink to the channel taildir2avro.sources.r1.channels = c1 taildir2avro.sinks.k1.channel = c1
2️⃣ Step2:node2 主机,添加配置文件 avro2hdfs.conf
# spooldir读取同目录中的多个日志文件 将数据发送到logger sink # Name the components on this agent avro2hdfs.sources = r1 avro2hdfs.sinks = k1 avro2hdfs.channels = c1 # Describe/configure the source avro2hdfs.sources.r1.type = avro avro2hdfs.sources.r1.bind = node2 avro2hdfs.sources.r1.port = 12345 # Describe the sink avro2hdfs.sinks.k1.type = hdfs avro2hdfs.sinks.k1.hdfs.path = hdfs://node1:8020/flume/%y-%m-%d-%H-%M avro2hdfs.sinks.k1.hdfs.fileType = DataStream avro2hdfs.sinks.k1.hdfs.writeFormat = Text avro2hdfs.sinks.k1.hdfs.useLocalTimeStamp = true avro2hdfs.sinks.k1.hdfs.rollInterval = 300 avro2hdfs.sinks.k1.hdfs.rollSize = 130000000 avro2hdfs.sinks.k1.hdfs.rollCount = 0 # Use a channel which buffers events in memory avro2hdfs.channels.c1.type = memory avro2hdfs.channels.c1.capacity = 30000 avro2hdfs.channels.c1.transactionCapacity = 3000 # Bind the source and sink to the channel avro2hdfs.sources.r1.channels = c1 avro2hdfs.sinks.k1.channel = c1
3️⃣ Step3:启动 node2 上的 Flume
flume-ng agent -c /opt/flume-1.9.0/conf/ -f /opt/flume-1.9.0/agents/avro2hdfs.conf -n avro2hdfs -Dflume.root.logger=INFO,console
4️⃣ Step4:查看node2 的 12345 端口是否被监听,确定已在监听状态
5️⃣ Step5:运行一会 node1 的 printer.sh 脚本,启动 node1 的 Flume
flume-ng agent -c /opt/flume-1.9.0/conf/ -f /opt/flume-1.9.0/agents/taildir2avro.conf -n taildir2avro -Dflume.root.logger=INFO,console
6️⃣ Step6:查看 node2 控制台输出,node1 以成功向 node2 的12345端口发送数据了
7️⃣ Step7:查看web端是否有数据提交
已经跑通!
3. 写在最后启动程序的时候,有时候会占用终端,关闭终端就会关闭程序,所以有时我们需要提将程序提交到后台执行,那就需要在命令后面添加一个 & 符号。
如果程序中有内容输出到控制台,而有不想一直在控制台输出,可以在命令前面添加 nohup 命令,将输出的内容输入到文件中,实现程序静默。
❤️ END ❤️
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)