Flume框架及常见source组件类型

Flume框架及常见source组件类型,第1张

Flume框架及常见source组件类型

Flume概览

Flume是一个分布式的高可用的消费组件。通过修改配置文件,可以启动不同的agent处理不同来源的数据。

架构




配置步骤

  1. 定义流

要在单个代理中定义流,需要通过通道连接源和接收器,需要列出代理的源、接收器和通道,然后将源和接收器指向一个通道。一个source实例可以指定多个channel,而一个sink实例只能指定一个channel。格式如下:

#列出代理的源、接收器和通道(别名)
.sources  =   
.sinks  =   
.channels  =    
#设置源
 的 通道。 sources..channels =    ... 
# 设置接收器的通道
.sinks..channel  =  


  1. 配置单个组件
    定义好流后,需要设置每个source、sink和channel的属性。
# 源属性
.sources..  =   

# 通道属性
.channel..  =   

# 接收器属性
。 sources..  =  
  1. 在代理中添加多个流
# 列出代理的源、接收器和通道
.sources  =    
.sinks  =    
.channels  =   

将源和接收器链接到相应的通道(用于接收器)的通道(用于接收器)以设置两个不同的流。
例如:一个从外部 avro 客户端到外部 HDFS,另一个从尾部的输出到 avro 接收器

# 列出代理中的源、接收器和通道
agent_foo.sources = avro-AppSrv-source1 
                    exec-tail- source2
agent_foo.sinks =   hdfs-Cluster1-sink1 
                    avro-forward-sink2 
agent_foo.channels = mem-channel-1 
                    file-channel-2 

# flow #1 配置
agent_foo.sources.avro-AppSrv-source1.channels  =  mem-channel-1 
agent_foo.sinks.hdfs-Cluster1-sink1.channel  =  mem-channel-1 

# flow #2 配置
agent_foo.sources .exec-tail-source2.channels  =  file-channel-2 
agent_foo.sinks.avro-forward-sink2.channel  =  file-channel-2
Source类型

Avro Source:
监听AVRO端口来接受来自外部AVRO客户端的事件流。利用Avro Source可以实现多级流动、扇出流、扇入流等效果。另外也可以接受通过flume提供的Avro客户端发送的日志信息。

avro Source参数
Type: 组件类型
avro Bind: 侦听的主机名或IP地址
Port: 绑定的端口

案例:配置文件
a1.sources=r1
a1.sinks=k1
a1.channels=c1
#配置source
a1.sources.r1.type=avro
a1.sources.r1.bind=192.168.86.13
a1.sources.r1.port=44444
#描述sink
a1.sinks.k1.type=logger
#描述内存channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
#为channels绑定sources和sink
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

启动flume
/home/software/flume/bin/flume-ng agent -c /home/software/flume/conf -f /home/software/flume/conf/example.conf -n a1 -Dflume.root.logger=DRBUG,console

参数说明: -n 指定agent名称(与配置文件中代理的名字相同)
-c 指定flume中配置文件的目录
-f 指定配置文件
-Dflume.root.logger=DEBUG,console 设置日志等级

通过flume提供的avro客户端向指定机器指定端口发送日志信息:flume-ng avro-client -c ~/home/software/flume/conf -H 192.168.123.102 -p 6666 -F 666.txt
运行结果如下:

Exec Source
Exec 源在启动时运行给定的 Unix 命令,并期望该进程在标准输出上连续生成数据(stderr 被简单地丢弃,除非属性 logStdErr 设置为 true)。如果进程因任何原因退出,源也会退出并且不会产生更多数据)

Type:组件类型
Command:要执行的命令

Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F/var/log/secure a1.sources.r1.channels = c1

JMS Source
JMS 源从 JMS 目标(例如队列或主题)读取消息。作为一个 JMS 应用程序,它应该可以与任何 JMS 提供程序一起使用,但仅通过 ActiveMQ 进行了测试。JMS 源提供可配置的批处理大小、消息选择器、用户/传递和消息到flume 事件转换器。

参数说明
Type:类型
initianContextFactory:初始上下文工厂
connectionFactory:连接工厂
prividerURL:提供者网址
destinationName:目的地名称
destinationType:目的地类型

Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1 .connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = TCP:// MQSERVER:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE

Spooling Directory Source
Spooling Directory Source监测配置的目录下新增的文件,并将文件中的数据读取出来。其中,Spool Source有2个注意地方,第一个是拷贝到spool目录下的文件不可以再打开编辑,第二个是spool目录下不可包含相应的子目录。这个主要用途作为对日志的准实时监控。

参数说明
Type:类型
spoolDir:读取文件的目录

Example for agent named a1:
a1.channels = ch-1 a1.sources = src-1

a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var /log/apache/flumeSpool
a1.sources.src-1.fileHeader = true

案例:配置文件
a1.sources=s1
a1.sinks=k1
a1.channels=c1
#配置源
a1.sources.s1.type=spooldir
a1.sources.s1.spoolDir = /home/software/hadoop/logs
a1.sources.s1.fileHeader-true
a1.sources.s1.channels=c1

a1.sinks.k1.type=logger
a1.sinks.k1.channel =c1
a1.channels.c1.type=memory
启动flume后执行文件

运行结果

Kafka Source

参数说明 Type:类型
Kafka.bootstrap.servers:kafka集群中服务器列表
kafka,consumer.group.id:消费者群体的唯一标识 kafka.topics:kafka主题
kafka.topics.regex:kafka主题集的正则表达式

tier1代理实例 tier1.sources.source1.type =
org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = 通道1 tier1.sources.source1.batchSize
= 5000 tier1.sources.source1.batchDurationMillis = 2000 tier1.sources。 source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id
#正则表达式订阅主题实例 tier1.sources.source1.kafka .topics.regex = ^topic[0-9]$

Netcat source

参数说明 Type:’类型
Bind:绑定主机名或IP地址
Port: 监听端口

名为 a1 的代理示例:
a1.sources = r1 a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1
启动flume打开新的终端输入:netlet master 6666

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5688942.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存