【大数据之 Flume】入门到放弃

【大数据之 Flume】入门到放弃,第1张

【大数据之 Flume】入门到放弃

文章目录
  • 1 Flume 概述
    • 1.1 Flume 定义
    • 1.2 Flume 基础架构
  • 2 Flume 入门
    • 2.1 Flume 安装部署
      • 2.1.1 安装地址
      • 2.1.2 安装部署
    • 2.2 Flume 入门案例
      • 2.2.1 监控端口数据
      • 2.2.2 实时监控单个追加文件
      • 2.2.3 实时监控目录下多个新文件
      • 2.2.4 实时监控目录下的多个追加文件
  • 3 Flume 进阶
    • 3.1 Flume 事务
    • 3.2 Flume Agent 内部原理
    • 3.3 Flume 拓扑结构
      • 3.3.1 简单串联
      • 3.3.2 复制和多路复用
      • 3.3.3 负载均衡或故障转移
      • 3.3.4 聚合
    • 3.4 Flume 进阶案例
      • 3.4.1 复制和多路复用
      • 3.4.2 负载均衡和故障转移
      • 3.4.3 聚合
    • 3.5 自定义 Interceptor
    • 3.6 自定义 Source
    • 3.7 自定义 Sink

1 Flume 概述 1.1 Flume 定义

Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume 基于流式架构,灵活简单。

Flume 最主要的作用就是,实时读取服务器本地磁盘的数据,将数据写入 HDFS 等目标。

1.2 Flume 基础架构


1)Agent
Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目的地。主要由 Source,Channel,Sink 三部分组成。

2)Source
Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directorynetcattaildir、sequence generator、syslog、http、legacy。

3)Channel
Channel 是位于 Source 和 Sink 之间的缓冲区。因此,Channel 允许 Source 和 Sink 运作在不同的速率上。Channel 是线程安全的,可以同时处理几个 Source 的写入 *** 作和几个Sink 的读取 *** 作。

Flume 自带两种 Channel:Memory Channel 和 File Channel。Memory Channel 是内存中的队列。File Channel 将所有事件写到磁盘。

4)Sink
Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。Sink 组件目的地包括 hdfsloggeravro、thrift、ipc、fileHbase、solr、自定义。

5)Event
Flume 数据传输的基本单元。以 Event 的形式将数据从源头送至目的地。Event 由 Header 和 Body 两部分组成,Header 用来存放该 event 的一些属性,为 K-V 结构,Body 用来存放该条数据,形式为字节数组。

2 Flume 入门 2.1 Flume 安装部署 2.1.1 安装地址

1)官网地址:http://flume.apache.org/
2)文档地址:http://flume.apache.org/FlumeUserGuide.html
3)下载地址:http://archive.apache.org/dist/flume/

2.1.2 安装部署

需要先部署服务器环境,虚拟机环境搭建参考 【大数据之 Hadoop】集群环境搭建

1)将 apache-flume-1.9.0-bin.tar.gz 上传到 linux 的 /opt/software 目录下。
2)解压安装包至 /opt/module 目录下。

[liyibin@hadoop102 software]$ tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/module/

3)修改文件夹 apache-flume-1.9.0-bin 名称为 flume-1.9.0。

[liyibin@hadoop102 module]$ mv apache-flume-1.9.0-bin/ flume-1.9.0

4)将 lib 文件夹下的 guava-11.0.2.jar 删除以兼容 Hadoop 3.1.3。

[liyibin@hadoop102 module]$ rm /opt/module/flume-1.9.0/lib/guava-11.0.2.jar 
2.2 Flume 入门案例 2.2.1 监控端口数据

1)案例需求:
使用 Flume 监听一个端口,收集该端口数据,并打印到控制台。

2)需求分析

3)实现步骤

(1)安装 netcat 工具。

[liyibin@hadoop102 module]$ sudo yum install -y nc

(2)判断 9999 端口是否被占用。

[liyibin@hadoop102 module]$ sudo netstat -nlp | grep 9999

(3)在 flume-1.9.0 目录下创建 job 文件夹并进入。

[liyibin@hadoop102 flume-1.9.0]$ mkdir job
[liyibin@hadoop102 flume-1.9.0]$ cd job

(4)在 job 目录下创建 Flume Agent 配置文件 flume-netcat-logger.properties。

[liyibin@hadoop102 job]$ vim flume-netcat-logger.properties

(5)在 flume-netcat-logger.properties 添加如下内容。

# 定义 source channle 和 sink,a1 表示 agent 的名称
# c1 表示 channel 的名称
a1.channels = c1
# r1 表示 source 名称
a1.sources = r1
# k1 表示 sink 的名称
a1.sinks = k1

# 配置通道
# a1 的channel 类型
a1.channels.c1.type = memory
# a1 的 channel 总容量
a1.channels.c1.capacity = 1000
# a1 的channel 传输时收集 100 条 event 后再去提交事务
a1.channels.c1.transactionCapacity = 100

# 配置 source
# a1 输入源类型为 netcat
a1.sources.r1.type = netcat
# a1 监听的主机
a1.sources.r1.bind = localhost
# a1 监听的端口
a1.sources.r1.port = 9999
# 连接 source 和 channel,一个 source 可以连接多个 channel
a1.sources.r1.channels = c1

# 配置 sink
# a1 的输出目的地是控制台
a1.sinks.k1.type = logger
# 连接 sink 和 channel,一个 sink 只能连接一个 channel
a1.sinks.k1.channel = c1

配置文件解析参考注释,更详细的解释参考官方文档。

(6)开启 flume 监控 9999 端口。

[liyibin@hadoop102 flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.properties -Dflume.root.logger=INFO,console

[liyibin@hadoop102 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.properties -Dflume.root.logger=INFO,console

参数说明:

  1. –conf/-c:表示配置文件存储在 conf/ 目录
  2. –name/-n:表示给 agent 起名为 a1
  3. –conf-file/-f:flume 本次启动读取的配置文件是在 job 文件夹下的 flume-netcat-logger.properties 文件。
  4. -Dflume.root.logger=INFO,console:-D 表示 flume 运行时动态修改 flume.root.logger 参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、error。

(7)使用 netcat 工具向本机的 9999 端口发送内容。

[liyibin@hadoop102 ~]$ nc localhost 9999
hello
OK
world
OK

(8)在 Flume 监听页面观察接收数据情况。

2.2.2 实时监控单个追加文件

1)案例需求:实时监控 Hive 日志,并上传到 HDFS 中。
2)需求分析:

3)实现步骤
(1)创建 flume-file-hdfs.properties。

[liyibin@hadoop102 logs]$ vim /opt/module/flume-1.9.0/job/flume-file-hdfs.properties

添加以下内容。

# 定义 source channle 和 sink,a2 表示 agent 的名称
# c2 表示 channel 的名称
a2.channels = c2
# r2 表示 source 名称
a2.sources = r2
# k2 表示 sink 的名称
a2.sinks = k2

# 配置通道
# a2 的channel 类型
a2.channels.c2.type = memory
# a2 的 channel 总容量
a2.channels.c2.capacity = 1000
# a2 的channel 传输时收集 100 条 event 后再去提交事务
a2.channels.c2.transactionCapacity = 100

# 配置 source
# a2 输入源类型为 exec
a2.sources.r2.type = exec
# 监控日志
a2.sources.r2.command = tail -F /opt/module/hive-3.1.2/logs/hive.log
# 连接 source 和 channel,一个 source 可以连接多个 channel
a2.sources.r2.channels = c2

# 配置 sink
# a2 的输出目的地是hdfs
a2.sinks.k2.type = hdfs
# hdfs 输出目录
a2.sinks.k2.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
# 文件前缀
a2.sinks.k2.hdfs.filePrefix = logs-
# 是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
# 多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
# 是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# 积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k2.hdfs.batchSize = 100
# 设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
# 多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 30
# 设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
# 文件的滚动与 Event 数量无关
a2.sinks.k2.hdfs.rollCount = 0
# 连接 sink 和 channel,一个 sink 只能连接一个 channel
a2.sinks.k2.channel = c2

注意:对于所有与时间相关的转义序列,Event Header 中必须存在以 “timestamp”的 key(除非 hdfs.useLocalTimeStamp = true,此方法会使用 TimestampInterceptor 自动添加 timestamp)。

(2)运行 flume

[liyibin@hadoop102 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a2 -f job/flume-file-hdfs.properties

(3)开启 Hadoop 和 Hive 产生 Hive 日志。

[liyibin@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh 
[liyibin@hadoop102 hadoop-3.1.3]$ sbin/start-yarn.sh 

[liyibin@hadoop102 hive-3.1.2]$ bin/hive

(4)在 HDFS 上查看文件。

30 秒后文件被重命名。

hdfs 上只有新日志产生才会生成文件。

2.2.3 实时监控目录下多个新文件

1)案例需求:使用 Flume 监听整个目录的文件,并上传至 HDFS
2)需求分析

3)实现步骤
(1)创建 flume-file-hdfs.properties。

[liyibin@hadoop102 logs]$ vim /opt/module/flume-1.9.0/job/flume-dir-hdfs.properties

添加以下内容。

# 定义 source channle 和 sink,a3 表示 agent 的名称
# c3 表示 channel 的名称
a3.channels = c3
# r3 表示 source 名称
a3.sources = r3
# k3 表示 sink 的名称
a3.sinks = k3

# 配置通道
# a3 的channel 类型
a3.channels.c3.type = memory
# a3 的 channel 总容量
a3.channels.c3.capacity = 1000
# a3 的channel 传输时收集 100 条 event 后再去提交事务
a3.channels.c3.transactionCapacity = 100

# 配置 source
# a3 输入源类型为 spooldir
a3.sources.r3.type = spooldir
# 监控目录
a3.sources.r3.spoolDir = /opt/module/flume-1.9.0/upload
# 定义文件上传完后缀
a1.sources.r3.fileSuffix = .COMPLETED
# 是否有文件头
a1.sources.r3.fileHeader = true
# 忽略所有以.tmp结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*.tmp)
# 连接 source 和 channel,一个 source 可以连接多个 channel
a3.sources.r3.channels = c3

# 配置 sink
# a3 的输出目的地是hdfs
a3.sinks.k3.type = hdfs
# hdfs 输出目录
a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/upload/%Y%m%d/%H
# 文件前缀
a3.sinks.k3.hdfs.filePrefix = logs-
# 是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
# 多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
# 是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# 积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
# 设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
# 多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 30
# 设置每个文件的滚动大小
a3.sinks.k3.hdfs.rollSize = 134217700
# 文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0
# 连接 sink 和 channel,一个 sink 只能连接一个 channel
a3.sinks.k3.channel = c3

(2)启动监控文件命令

[liyibin@hadoop102 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a3 -f job/flume-dir-hdfs.properties

(3)向 upload 目录添加文件

[liyibin@hadoop102 upload]$ vim test1.txt
[liyibin@hadoop102 upload]$ vim test2.tmp

(4)查看文件是否有 .COMPLETED 后缀

.tmp 后缀文件不会被上传。

6)查看 HDFS 上的数据

2.2.4 实时监控目录下的多个追加文件

Exec source 适用于监控一个实时追加的文件,不能实现断点续传;Spooldir Source 适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步;而 Taildir Source 适合用于监听多个实时追加的文件,并且能够实现断点续传

1)案例需求:使用 Flume 监听整个目录的实时追加文件,并上传到 HDFS
2)需求分析

3)实现步骤
(1)创建 flume-taildir-hdfs.properties。

[liyibin@hadoop102 logs]$ vim /opt/module/flume-1.9.0/job/flume-taildir-hdfs.properties

添加以下内容。

# 定义 source channle 和 sink,a3 表示 agent 的名称
# c3 表示 channel 的名称
a3.channels = c3
# r3 表示 source 名称
a3.sources = r3
# k3 表示 sink 的名称
a3.sinks = k3

# 配置通道
# a3 的channel 类型
a3.channels.c3.type = memory
# a3 的 channel 总容量
a3.channels.c3.capacity = 1000
# a3 的channel 传输时收集 100 条 event 后再去提交事务
a3.channels.c3.transactionCapacity = 100

# 配置 source
# a3 输入源类型为 TAILDIR
a3.sources.r3.type = TAILDIR
# 指定position_file位置
a3.sources.r3.positionFile = /opt/module/flume-1.9.0/tail_dir.json
# 监控的文件目录组,可以指定多个,空格分割
a3.sources.r3.filegroups = f1
a3.sources.r3.filegroups.f1 = /opt/module/flume-1.9.0/files/.*file.*
# 连接 source 和 channel,一个 source 可以连接多个 channel
a3.sources.r3.channels = c3

# 配置 sink
# a3 的输出目的地是hdfs
a3.sinks.k3.type = hdfs
# hdfs 输出目录
a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/files/%Y%m%d/%H
# 文件前缀
a3.sinks.k3.hdfs.filePrefix = upload-
# 是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
# 多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
# 是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# 积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
# 设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
# 多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 30
# 设置每个文件的滚动大小
a3.sinks.k3.hdfs.rollSize = 134217700
# 文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0
# 连接 sink 和 channel,一个 sink 只能连接一个 channel
a3.sinks.k3.channel = c3

(2)启动监控文件命令

[liyibin@hadoop102 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a3 -f job/flume-taildir-hdfs.properties

(3)向 files 文件夹的文件添加文件

[liyibin@hadoop102 files]$ echo hello >> file1.txt
[liyibin@hadoop102 files]$ echo world >> file2.txt

(4)查看 HDFS 上的文件

Taildir说明
Taildir Source 维护了一个 json 格式的 position File,其会定期的往 position File 中更新每个文件读取到的最新的位置,因此能够实现断点续传。Position File 的格式如下:

[{"inode":68976062,"pos":6,"file":"/opt/module/flume-1.9.0/files/file1.txt"},
{"inode":68976061,"pos":6,"file":"/opt/module/flume-1.9.0/files/file2.txt"}]

:Linux 中储存文件元数据的区域就叫做 inode,每个 inode 都有一个号码, *** 作系统用 inode 号码来识别不同的文件,Unix/Linux 系统内部不使用文件名,而使用 inode 号码来识别文件

3 Flume 进阶 3.1 Flume 事务


1)put 事务流程

  • doPut 将批数据写入临时缓冲区 putList
  • doCommit 检查 channel 内存队列是否足够合并
  • doRollback channel 内存队列空间不足,回滚数据

2)take 事务流程

  • doTake 将数据取到临时缓冲区 takeList,并将数据发送到 HDFS
  • doCommit 如果数据全部发送成功,则清除临时缓冲区 takeList
  • doRollback 数据如果发送过程出现异常,rollbak 将临时缓冲区 takeList 中的数据归还给内存队列
3.2 Flume Agent 内部原理


重要组件:
1)ChannelSelector
ChannelSelector 的作用是选出 Event 将要发往哪个 Channel 。共有两种类型,Replicating(复制)Multiplexing(多路复用)

Replicating Selector 会将同一个 Event 发往所有的 Channel。Multiplexing Selector 会根据相应的原则将不同的 Event 发往不同的 Channel。

2)SinkProcessor
SinkProcessor 共有三种类型,分别是 DefaultSinkProcessorLoadBalancingSinkProcessorFailoverSinkProcessor
(1)DefaultSinkProcessor 对应的单个 Sink。
(2)LoadBalancingSinkProcessor 对应 Sink Group,可以实现负载均衡的功能。
(3)FailoverSinkProcessor 对应 Sink Group,可以实现错误恢复的功能。

3.3 Flume 拓扑结构 3.3.1 简单串联


这种模式是将多个 flume 顺序连接起来了,从最初的 source 开始到最终 sink 传送的目的存储系统。此模式不建议桥接过多的 flume 数量, flume 数量过多不仅会影响传输速率,而且一旦传输过程中某个节点 flume 宕机,会影响整个传输系统。

3.3.2 复制和多路复用


Flume 支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个 channel 中,或者将不同数据分发到不同的 channel 中,sink 可以选择传送到不同的目的地。

3.3.3 负载均衡或故障转移


Flume 支持使用将多个 sink 逻辑上分到一个sink组,sink 组配合不同的 SinkProcessor 可以实现负载均衡和错误恢复的功能。

负载均衡,使用 LoadBalancingSinkProcessor,根据 header 中的信息写到不同的 sink 中。
错误恢复,使用 FailoverSinkProcessor,配置 sink 的优先级,当高优先级 sink 挂掉后,使用其它 sink 接收。

3.3.4 聚合


这种模式是我们最常见的,也非常实用,日常 web 应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用 flume 的这种组合方式能很好的解决这一问题,每台服务器部署一个 flume 采集日志,传送到一个集中收集日志 flume,再由此 flume 上传到 hdfs、hive、hbase 等,进行日志分析。

3.4 Flume 进阶案例 3.4.1 复制和多路复用

1)案例需求
使用 Flume-1 监控单文件变动,Flume-1 将变动内容传递给 Flume-2,Flume-2 负责显示到控制台。同时 Flume-1 将变动内容传递给 Flume-3,Flume-3 负责输出到 LocalFileSystem。

2)需求分析

3)实现步骤
(1)准备工作
创建 /opt/module/flume-1.9.0/job/group1 和 /opt/module/flume-1.9.0/files/jobgroup1

[liyibin@hadoop102 flume-1.9.0]$ mkdir /opt/module/flume-1.9.0/job/group1
[liyibin@hadoop102 flume-1.9.0]$ mkdir /opt/module/flume-1.9.0/files/jobgroup1/

(2)创建 flume-file-flume.properties
配置 1 个接收日志文件的 source 和两个 channel、两个 sink,分别输送给 flume-flume-logger 和 flume-flume-dir。

编辑配置文件

[liyibin@hadoop102 group1]$ vim flume-file-flume.properties

添加如下内容

# 定义 source channle 和 sink,a1 表示 agent 的名称
# c1 表示 channel 的名称
a1.channels = c1 c2
# r1 表示 source 名称
a1.sources = r1
# k1 表示 sink 的名称
a1.sinks = k1 k2

# 配置通道
# a1 的channel 类型
a1.channels.c1.type = memory
# a1 的 channel 总容量
a1.channels.c1.capacity = 1000
# a1 的channel 传输时收集 100 条 event 后再去提交事务
a1.channels.c1.transactionCapacity = 100

# a1 的channel 类型
a1.channels.c2.type = memory
# a1 的 channel 总容量
a1.channels.c2.capacity = 1000
# a1 的channel 传输时收集 100 条 event 后再去提交事务
a1.channels.c2.transactionCapacity = 100

# 配置 source
# a1 输入源类型为 exec
a1.sources.r1.type = exec
# 监控日志
a1.sources.r1.command = tail -F /opt/module/flume-1.9.0/files/file3.txt 
# 连接 source 和 channel,一个 source 可以连接多个 channel
a1.sources.r1.channels = c1 c2
# 将数据流复制给所有 channel 默认 replicating
a1.sources.r1.selector.type = replicating

# 配置 sink
a1.sinks.k1.type = avro
# sink 服务器主机
a1.sinks.k1.hostname = hadoop102
# sink 服务器端口
a1.sinks.k1.port = 4141
# 连接 sink 和 channel,一个 sink 只能连接一个 channel
a1.sinks.k1.channel = c1

a1.sinks.k2.type = avro
# sink 服务器主机
a1.sinks.k2.hostname = hadoop102
# sink 服务器端口
a1.sinks.k2.port = 4142
# 连接 sink 和 channel,一个 sink 只能连接一个 channel
a1.sinks.k2.channel = c2

(3)创建 flume-flume-logger.properties
编辑配置文件

[liyibin@hadoop102 group1]$ vim flume-flume-logger.properties

添加如下内容

# 定义 source channle 和 sink,a2 表示 agent 的名称
# c1 表示 channel 的名称
a2.channels = c1
# r1 表示 source 名称
a2.sources = r1
# k1 表示 sink 的名称
a2.sinks = k1

# 配置通道
# a2 的channel 类型
a2.channels.c1.type = memory
# a2 的 channel 总容量
a2.channels.c1.capacity = 1000
# a2 的channel 传输时收集 100 条 event 后再去提交事务
a2.channels.c1.transactionCapacity = 100

# 配置 source
# a2 输入源类型为 avro
a2.sources.r1.type = avro
# 绑定的地址和端口
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# 配置 sink
# a1 的输出目的地是控制台
a2.sinks.k1.type = logger
# 连接 sink 和 channel,一个 sink 只能连接一个 channel
a2.sinks.k1.channel = c1

(4)创建 flume-flume-dir.properties
编辑配置文件

[liyibin@hadoop102 group1]$ vim flume-flume-dir.properties

添加如下内容

# 定义 source channle 和 sink,a3 表示 agent 的名称
# c1 表示 channel 的名称
a3.channels = c1
# r1 表示 source 名称
a3.sources = r1
# k1 表示 sink 的名称
a3.sinks = k1

# 配置通道
# a3 的channel 类型
a3.channels.c1.type = memory
# a3 的 channel 总容量
a3.channels.c1.capacity = 1000
# a3 的channel 传输时收集 100 条 event 后再去提交事务
a3.channels.c1.transactionCapacity = 100

# 配置 source
# a3 输入源类型为 arvo
a3.sources.r1.type = avro
# 绑定的地址和端口
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
a3.sources.r1.channels = c1

# 配置 sink
# a1 的输出目的地是本地文件系统
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/flume-1.9.0/files/jobgroup1
# 连接 sink 和 channel,一个 sink 只能连接一个 channel
a3.sinks.k1.channel = c1

(5)执行配置文件
分别启动对应的 flume 进程:flume-flume-dir,flume-flume-logger,flume-file-flume。

[liyibin@hadoop102 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a2 -f job/group1/flume-flume-logger.properties -Dflume.root.logger=INFO,console
[liyibin@hadoop102 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-fle-dir.properties 
[liyibin@hadoop102 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a1 -f job/group1/flume-file-flume.properties 

(6)追加信息到 /opt/module/flume-1.9.0/files/jobgroup1

[liyibin@hadoop102 files]$ echo hello >> file3.txt

(7)查看日志打印和 /opt/module/flume-1.9.0/files/jobgroup1 下的文件生成

3.4.2 负载均衡和故障转移

1)案例需求
使用 Flume1 监控一个端口,其 sink 组中的 sink 分别对接 Flume2 和 Flume3,采用 FailoverSinkProcessor,实现故障转移的功能。Flume2 挂掉后,Flume-3 会接收数据。

2)需求分析

3)实现步骤
(1)准备工作
创建 /opt/module/flume-1.9.0/job/group2

[liyibin@hadoop102 flume-1.9.0]$ mkdir /opt/module/flume-1.9.0/job/group2

(2)创建 flume-netcat-flume.properties
配置 1 个 netcat source,1 个 channel ,2 个 sink,分别传输给 flume-flume-logger1.properties 和 flume-flume-logger2.properties。

编辑配置文件

[liyibin@hadoop102 group2]$ vim flume-netcat-flume.properties

添加如下内容

# 定义 source channle 和 sink,a1 表示 agent 的名称
# c1 表示 channel 的名称
a1.channels = c1
# r1 表示 source 名称
a1.sources = r1
# k1 表示 sink 的名称
a1.sinks = k1 k2
# 配置 sing 组
a1.sinkgroups = g1

# 配置通道
# a1 的channel 类型
a1.channels.c1.type = memory
# a1 的 channel 总容量
a1.channels.c1.capacity = 1000
# a1 的channel 传输时收集 100 条 event 后再去提交事务
a1.channels.c1.transactionCapacity = 100

# 配置 source
# a1 输入源类型为 netcat
a1.sources.r1.type = netcat
# a1 监听的主机
a1.sources.r1.bind = localhost
# a1 监听的端口
a1.sources.r1.port = 9999
# 连接 source 和 channel,一个 source 可以连接多个 channel
a1.sources.r1.channels = c1

# 配置 sink
# a1 的输出目的地是 avro
a1.sinks.k1.type = avro
# sink 服务器主机
a1.sinks.k1.hostname = hadoop102
# sink 服务器端口
a1.sinks.k1.port = 4141
# 连接 sink 和 channel,一个 sink 只能连接一个 channel
a1.sinks.k1.channel = c1

# a1 的输出目的地是 avro
a1.sinks.k2.type = avro
# sink 服务器主机
a1.sinks.k2.hostname = hadoop102
# sink 服务器端口
a1.sinks.k2.port = 4142
# 连接 sink 和 channel,一个 sink 只能连接一个 channel
a1.sinks.k2.channel = c1

# sinkgroups 的类型为故障恢复
a1.sinkgroups.g1.processor.type = failover
# 配置优先级
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
a1.sinkgroups.g1.processor.maxpenalty = 10000
a1.sinkgroups.g1.sinks = k1 k2

(3)创建 flume-flume-logger1.properties
编辑配置文件

[liyibin@hadoop102 group2]$ vim flume-flume-logger1.properties

添加如下内容

# 定义 source channle 和 sink,a2 表示 agent 的名称
# c1 表示 channel 的名称
a2.channels = c1
# r1 表示 source 名称
a2.sources = r1
# k1 表示 sink 的名称
a2.sinks = k1

# 配置通道
# a2 的channel 类型
a2.channels.c1.type = memory
# a2 的 channel 总容量
a2.channels.c1.capacity = 1000
# a2 的channel 传输时收集 100 条 event 后再去提交事务
a2.channels.c1.transactionCapacity = 100

# 配置 source
# a2 输入源类型为 avro
a2.sources.r1.type = avro
# 绑定的地址和端口
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
a2.sources.r1.channels = c1

# 配置 sink
# a2 的输出目的地是控制台
a2.sinks.k1.type = logger
# 连接 sink 和 channel,一个 sink 只能连接一个 channel
a2.sinks.k1.channel = c1

(4)创建 flume-flume-logger2.properties
编辑配置文件

[liyibin@hadoop102 group2]$ vim flume-flume-logger2.properties

添加如下内容

# 定义 source channle 和 sink,a3 表示 agent 的名称
# c1 表示 channel 的名称
a3.channels = c1
# r1 表示 source 名称
a3.sources = r1
# k1 表示 sink 的名称
a3.sinks = k1

# 配置通道
# a3 的channel 类型
a3.channels.c1.type = memory
# a3 的 channel 总容量
a3.channels.c1.capacity = 1000
# a3 的channel 传输时收集 100 条 event 后再去提交事务
a3.channels.c1.transactionCapacity = 100

# 配置 source
# a3 输入源类型为 avro
a3.sources.r1.type = avro
# 绑定的地址和端口
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
a3.sources.r1.channels = c1

# 配置 sink
# a3 的输出目的地是控制台
a3.sinks.k1.type = logger
# 连接 sink 和 channel,一个 sink 只能连接一个 channel
a3.sinks.k1.channel = c1

(5)执行配置文件
分别启动对应的 flume 进程:flume-flume-logger1,flume-flume-logger2,flume-netcat-flume。

[liyibin@hadoop102 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a2 -f job/group2/flume-flume-logger1.properties -Dflume.root.logger=INFO,console
[liyibin@hadoop102 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a3 -f job/group2/flume-fle-logger2.properties -Dflume.root.logger=INFO,console
[liyibin@hadoop102 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-netcat-flume.properties 

(6)使用 netcat 工具向本机的 9999 端口发送内容

[liyibin@hadoop102 flume-1.9.0]$ nc localhost 9999

(7)查看 Flume2 及 Flume3 的控制台打印日志

Flume2:
Flume3: 无打印

(8)将 Flume2 kill,观察 Flume3 的控制台打印情况。

Flume2 已经 kill
Flume3:

注:使用 jps -ml 查看 Flume 进程。

3.4.3 聚合

1)案例需求:
hadoop102 上的 Flume-1 监控文件/opt/module/flume-1.9.0/files/jobgroup3/group.log,hadoop103 上的 Flume-2 监控某一个端口的数据流,Flume-1 与 Flume-2 将数据发送给 hadoop104 上的 Flume-3,Flume-3 将最终数据打印到控制台。

2)需求分析

3)实现步骤
(1)准备工作
创建 /opt/module/flume-1.9.0/job/group3

[liyibin@hadoop102 flume-1.9.0]$ mkdir /opt/module/flume-1.9.0/job/group3

(2)创建 flume-file-flume.properties
配置 Source 用于监控 group.log 文件,配置 Sink 输出数据到下一级 Flume。

编辑配置文件

[liyibin@hadoop102 group2]$ vim flume-file-flume.properties

添加如下内容

# 定义 source channle 和 sink,a1 表示 agent 的名称
# c1 表示 channel 的名称
a1.channels = c1
# r1 表示 source 名称
a1.sources = r1
# k1 表示 sink 的名称
a1.sinks = k1

# 配置通道
# a1 的channel 类型
a1.channels.c1.type = memory
# a1 的 channel 总容量
a1.channels.c1.capacity = 1000
# a1 的channel 传输时收集 100 条 event 后再去提交事务
a1.channels.c1.transactionCapacity = 100

# 配置 source
# a1 输入源类型为 exec
a1.sources.r1.type = exec
# 监控日志
a1.sources.r1.command = tail -F /opt/module/flume-1.9.0/files/jobgroup3/group.log
# 连接 source 和 channel,一个 source 可以连接多个 channel
a1.sources.r1.channels = c1

# 配置 sink
a1.sinks.k1.type = avro
# sink 服务器主机
a1.sinks.k1.hostname = hadoop104
# sink 服务器端口
a1.sinks.k1.port = 4141
# 连接 sink 和 channel,一个 sink 只能连接一个 channel
a1.sinks.k1.channel = c1

(3)创建 flume-netcat-flume.properties
编辑配置文件

[liyibin@hadoop102 group3]$ vim flume-netcat-flume.properties

添加如下内容

# 定义 source channle 和 sink,a2 表示 agent 的名称
# c1 表示 channel 的名称
a2.channels = c1
# r1 表示 source 名称
a2.sources = r1
# k1 表示 sink 的名称
a2.sinks = k1

# 配置通道
# a2 的channel 类型
a2.channels.c1.type = memory
# a2 的 channel 总容量
a2.channels.c1.capacity = 1000
# a2 的channel 传输时收集 100 条 event 后再去提交事务
a2.channels.c1.transactionCapacity = 100

# 配置 source
# a2 输入源类型为 netcat
a2.sources.r1.type = netcat
# 监控主机和端口
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 9999
# 连接 source 和 channel,一个 source 可以连接多个 channel
a2.sources.r1.channels = c1

# 配置 sink
a2.sinks.k1.type = avro
# sink 服务器主机
a2.sinks.k1.hostname = hadoop104
# sink 服务器端口
a2.sinks.k1.port = 4141
# 连接 sink 和 channel,一个 sink 只能连接一个 channel
a2.sinks.k1.channel = c1

(4)创建 flume-flume-logger.properties
编辑配置文件

[liyibin@hadoop102 group3]$ vim flume-flume-logger.properties

添加如下内容

# 定义 source channle 和 sink,a3 表示 agent 的名称
# c1 表示 channel 的名称
a3.channels = c1
# r1 表示 source 名称
a3.sources = r1
# k1 表示 sink 的名称
a3.sinks = k1

# 配置通道
# a3 的channel 类型
a3.channels.c1.type = memory
# a3 的 channel 总容量
a3.channels.c1.capacity = 1000
# a3 的channel 传输时收集 100 条 event 后再去提交事务
a3.channels.c1.transactionCapacity = 100

# 配置 source
# a3 输入源类型为 avro
a3.sources.r1.type = avro
# 绑定主机和端口
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141
# 连接 source 和 channel,一个 source 可以连接多个 channel
a3.sources.r1.channels = c1

# 配置 sink
a3.sinks.k1.type = logger
# 连接 sink 和 channel,一个 sink 只能连接一个 channel
a3.sinks.k1.channel = c1

(5)分发 flume-1.9.0

[liyibin@hadoop102 module]$ xsync flume-1.9.0/

(5)执行配置文件
分别启动对应的 flume 进程:hadoop104 启动 flume-flume-logger,hadoop102 flume-file-flume,hadoop103 启动 flume-netcat-flume。

[liyibin@hadoop104 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a3 -f job/group3/flume-flume-logger.properties -Dflume.root.logger=INFO,console
[liyibin@hadoop102 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a1 -f job/group3/flume-file-flume.properties
[liyibin@hadoop103 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a2 -f job/group2/flume-netcat-flume.properties 

(6)在 hadoop103 上向/opt/module 目录下的 group.log 追加内容

[liyibin@hadoop102 jobgroup3]$ echo hello >> group.log

(7)在 hadoop103 上向 9999 端口发送数据

[liyibin@hadoop103 ~]$ nc localhost 9999
world
OK

(8)检查 hadoop104 上数据

3.5 自定义 Interceptor

1)案例需求
使用 Flume 采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。

2)需求分析
在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。此时会用到 Flume 拓扑结构中的 Multiplexing(多路复用) 结构,Multiplexing 的原理是,根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel 中,所以我们需要自定义一个 Interceptor,为不同类型的 event 的 Header 中的 key 赋予不同的值。

在该案例中,我们以端口数据模拟日志,以是否包含 ”flume” 模拟不同类型的日志,我们需要自定义 interceptor 区分数据中是否包含 ”flume”,将其分别发往不同的分析系统(Channel)。

3)实现步骤
(1)创建一个 maven 项目,导入以下依赖


    org.apache.flume
    flume-ng-core
    1.9.0

(2)定义 TypeInterceptor 类实现 Interceptor 接口

public class TypeInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    
    @Override
    public Event intercept(Event event) {
        // 事件头
        Map headers = event.getHeaders();
        // 事件的 body 数据
        String body = new String(event.getBody());

        if (body.contains("flume")) {
            headers.put("type", "first");
        } else {
            headers.put("type", "second");
        }

        return event;
    }

    @Override
    public List intercept(List list) {
        for (Event event : list) {
            Map headers = event.getHeaders();
            String body = new String(event.getBody());
            if (body.contains("flume")) {
                headers.put("type", "first");
            } else {
                headers.put("type", "second");
            }
        }
        return list;
    }

    @Override
    public void close() {
    }

    
    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new TypeInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

(3)编辑 flume 文件
为 hadoop102 上的 Flume1 配置 1 个 netcat source,1 个 sink group(2 个 avro sink),并配置相应的 ChannelSelector 和 interceptor。

编写 flume-netcta-flume.properties

# 定义 source channle 和 sink,a1 表示 agent 的名称
# c1 表示 channel 的名称
a1.channels = c1 c2
# r1 表示 source 名称
a1.sources = r1
# k1 表示 sink 的名称
a1.sinks = k1 k2

# 配置通道
# a1 的channel 类型
a1.channels.c1.type = memory
# a1 的 channel 总容量
a1.channels.c1.capacity = 1000
# a1 的channel 传输时收集 100 条 event 后再去提交事务
a1.channels.c1.transactionCapacity = 100

# a1 的channel 类型
a1.channels.c2.type = memory
# a1 的 channel 总容量
a1.channels.c2.capacity = 1000
# a1 的channel 传输时收集 100 条 event 后再去提交事务
a1.channels.c2.transactionCapacity = 100

# 配置 source
# a1 输入源类型为 netcat
a1.sources.r1.type = netcat
# a1 监听的主机
a1.sources.r1.bind = localhost
# a1 监听的端口
a1.sources.r1.port = 9999
# 连接 source 和 channel,一个 source 可以连接多个 channel
a1.sources.r1.channels = c1 c2
# 拦截器配置
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.liyibin.flume.interceptor.TypeInterceptor$Builder
# 配置 Channel selector,类型多路复用
a1.sources.r1.selector.type = multiplexing
# 事件头部中的 key
a1.sources.r1.selector.header = type
# 事件头部中的 值
a1.sources.r1.selector.mapping.first = c1
a1.sources.r1.selector.mapping.second = c2

# 配置 sink
# a1 的输出目的地是 avro
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
# 连接 sink 和 channel,一个 sink 只能连接一个 channel
a1.sinks.k1.channel = c1

# a1 的输出目的地是 avro
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4141
# 连接 sink 和 channel,一个 sink 只能连接一个 channel
a1.sinks.k2.channel = c2

为 hadoop103 上的 Flume4 配置一个 avro source 和一个 logger sink。

flume-flume-logger`.properties

# 定义 source channle 和 sink,a2 表示 agent 的名称
# c1 表示 channel 的名称
a2.channels = c1
# r1 表示 source 名称
a2.sources = r1
# k1 表示 sink 的名称
a2.sinks = k1

# 配置通道
# a2 的channel 类型
a2.channels.c1.type = memory
# a2 的 channel 总容量
a2.channels.c1.capacity = 1000
# a2 的channel 传输时收集 100 条 event 后再去提交事务
a2.channels.c1.transactionCapacity = 100

# 配置 source
# a2 输入源类型为 avro
a2.sources.r1.type = avro
# 绑定的地址和端口
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 4141
a2.sources.r1.channels = c1

# 配置 sink
# a2 的输出目的地是控制台
a2.sinks.k1.type = logger
# 连接 sink 和 channel,一个 sink 只能连接一个 channel
a2.sinks.k1.channel = c1

为 hadoop104 上的 Flume3 配置一个 avro source 和一个 logger sink。

flume-flume-logger2.properties

# 定义 source channle 和 sink,a3 表示 agent 的名称
# c1 表示 channel 的名称
a3.channels = c1
# r1 表示 source 名称
a3.sources = r1
# k1 表示 sink 的名称
a3.sinks = k1

# 配置通道
# a3 的channel 类型
a3.channels.c1.type = memory
# a3 的 channel 总容量
a3.channels.c1.capacity = 1000
# a3 的channel 传输时收集 100 条 event 后再去提交事务
a3.channels.c1.transactionCapacity = 100

# 配置 source
# a3 输入源类型为 avro
a3.sources.r1.type = avro
# 绑定的地址和端口
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141
a3.sources.r1.channels = c1

# 配置 sink
# a3 的输出目的地是控制台
a3.sinks.k1.type = logger
# 连接 sink 和 channel,一个 sink 只能连接一个 channel
a3.sinks.k1.channel = c1

(4)分别在 hadoop103,hadoop104, hadoop102 上启动 flume 进程,注意先后顺序。

[liyibin@hadoop103 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a2 -f job/group4/flume-flume-logger1.properties -Dflume.root.logger=INFO,console

[liyibin@hadoop104 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a3 -f job/group4/flume-flume-logger2.properties -Dflume.root.logger=INFO,console

[liyibin@hadoop102 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a1 -f job/group4/flume-netcat-flume.properties

(5)在 hadoop102 使用 netcat 向 localhost:44444 发送字母和数字。 (6)观察 hadoop103 和 hadoop104 打印的日志。

[liyibin@hadoop102 flume-1.9.0]$ nc localhost 9999
hello
OK
flume
OK

(6)观察 hadoop103 和 hadoop104 打印的日志。

hadoop103:

hadoop104:

3.6 自定义 Source

1)介绍
Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。官方提供的 source 类型已经很多,但是有时候我们需要根据实际需求自定义某些 source。

官方也提供了自定义 source 的接口:
https://flume.apache.org/FlumeDeveloperGuide.html#source

自定义 Source 需要继承 AbstractSource 类并实现 Configurable 和 PollableSource 接口。

2)需求
使用 flume 接收数据,并给每条数据添加前缀,输出到控制台。前缀可从 flume 配置文件中配置。

3)分析

4)编写 MySource

public class MySource extends AbstractSource implements Configurable, PollableSource {

    
    private Long delay;
    private String prefix;
    
    @Override
    public void configure(Context context) {
        prefix = context.getString("pre", "pre-");
        delay = context.getLong("suf", 1000L);
    }

    @Override
    public Status process() throws EventDeliveryException {
        try {
            // 事件
            Event event = new SimpleEvent();
            // 事件头
            Map headers = new HashMap<>();

            for (int i = 0; i < 5; i++) {
                // 给事件设置头信息
                event.setHeaders(headers);
                event.setBody((prefix + i).getBytes());
                // 将事件写入 channel
                getChannelProcessor().processEvent(event);
                Thread.sleep(delay);
            }
        } catch (Exception e) {
            // 异常
            return Status.BACKOFF;
        }
        
        // 正常
        return Status.READY;
    }

    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }
}

5)测试
(1)打包
将写好的代码打包,并放到 flume 的 lib 目录下。

(2)配置文件

# 定义 source channle 和 sink,a1 表示 agent 的名称
# c1 表示 channel 的名称
a1.channels = c1
# r1 表示 source 名称
a1.sources = r1
# k1 表示 sink 的名称
a1.sinks = k1

# 配置通道
# a1 的channel 类型
a1.channels.c1.type = memory
# a1 的 channel 总容量
a1.channels.c1.capacity = 1000
# a1 的channel 传输时收集 100 条 event 后再去提交事务
a1.channels.c1.transactionCapacity = 100

# 配置 source
# a1 输入源类型为自定义 source 的类路径
a1.sources.r1.type = com.liyibin.flume.source.MySource
a1.sources.r1.pre = flume-
a1.sources.r1.delay = 1500
# 连接 source 和 channel,一个 source 可以连接多个 channel
a1.sources.r1.channels = c1

# 配置 sink
# a1 的输出目的地是控制台
a1.sinks.k1.type = logger
# 连接 sink 和 channel,一个 sink 只能连接一个 channel
a1.sinks.k1.channel = c1

(3)开启任务

[liyibin@hadoop102 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a1 -f job/mysource.properties -Dflume.root.logger=INFO,console

(4)查看打印

3.7 自定义 Sink

Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。

Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。

Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、Hbase、solr、自定义。官方提供的 Sink 类型已经很多,但是有时候我们就需要根据实际需求自定义某些 Sink。

自定义 sink 的接口:https://flume.apache.org/FlumeDeveloperGuide.html#sink

Sink 需要继承 AbstractSink 类并实现 Configurable 接口。实现相应方法:

  1. configure(Context context)//初始化 context(读取配置文件内容)。
  2. process()//从 Channel 读取获取数据(event),这个方法将被循环调用。

使用场景:读取 Channel 数据写入 MySQL 或者其他文件系统。

2)需求
使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在 flume 任务配置文件中配置。

3)分析
(1)编码

  1. configure():读取任务配置文件中的配置信息。
  2. process():从Channel中取数据,添加前后缀,打印日志。

(2)打包到集群,编写配置文件

4)编写 Sink

public class MySink extends AbstractSink implements Configurable {

    private static final Logger log = LoggerFactory.getLogger(MySink.class);

    
    private String prefix;
    private String suffix;

    @Override
    public void configure(Context context) {
        prefix = context.getString("prefix", "pre-");
        suffix = context.getString("suffix");
    }

    @Override
    public Status process() throws EventDeliveryException {
        // 声明状态返回值信息
        Status status;

        // 获取当前 Sink 绑定的 Channel
        Channel channel = getChannel();

        // 获取事务
        Transaction tx = channel.getTransaction();
        tx.begin();

        try {
            // 获取事件
            Event event;
            do {
                event = channel.take();
            } while (event == null);

            // 处理事件
            log.info(prefix + new String(event.getBody()) + suffix);

            // 提交事务
            tx.commit();

            status = Status.READY;
        } catch (Exception e) {
            tx.rollback();
            status = Status.BACKOFF;
        } finally {
            // 关闭事务
            tx.close();
        }

        return status;
    }
}

5)测试
(1)打包
将写好的代码打包,并放到 flume 的 lib 目录下。

(2)flume 配置文件

# 定义 source channle 和 sink,a1 表示 agent 的名称
# c1 表示 channel 的名称
a1.channels = c1
# r1 表示 source 名称
a1.sources = r1
# k1 表示 sink 的名称
a1.sinks = k1

# 配置通道
# a1 的channel 类型
a1.channels.c1.type = memory
# a1 的 channel 总容量
a1.channels.c1.capacity = 1000
# a1 的channel 传输时收集 100 条 event 后再去提交事务
a1.channels.c1.transactionCapacity = 100

# 配置 source
# a1 输入源类型为 netcat
a1.sources.r1.type = netcat
# a1 监听的主机
a1.sources.r1.bind = localhost
# a1 监听的端口
a1.sources.r1.port = 9999
# 连接 source 和 channel,一个 source 可以连接多个 channel
a1.sources.r1.channels = c1

# 配置 sink
# a1 的输出目的地是控制台
a1.sinks.k1.type = com.liyibin.flume.sink.MySink
a1.sinks.k1.prefix = hello-
a1.sinks.k1.suffix = -world
# 连接 sink 和 channel,一个 sink 只能连接一个 channel
a1.sinks.k1.channel = c1

(4)开启任务

[liyibin@hadoop102 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a1 -f job/mysink.properties -Dflume.root.logger=INFO,console

[liyibin@hadoop102 ~]$ nc localhost 9999
moring
OK
left
OK

(5)结果展示

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5480039.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-12
下一篇 2022-12-12

发表评论

登录后才能评论

评论列表(0条)

保存