Flume是一个分布式的高可用的消费组件。通过修改配置文件,可以启动不同的agent处理不同来源的数据。
架构
配置步骤
- 定义流
要在单个代理中定义流,需要通过通道连接源和接收器,需要列出代理的源、接收器和通道,然后将源和接收器指向一个通道。一个source实例可以指定多个channel,而一个sink实例只能指定一个channel。格式如下:
#列出代理的源、接收器和通道(别名) .sources =.sinks = .channels = #设置源 的 通道。 sources. .channels = ... # 设置接收器的通道 .sinks. .channel =
- 配置单个组件
定义好流后,需要设置每个source、sink和channel的属性。
# 源属性 .sources.. = # 通道属性 .channel. . = # 接收器属性 。 sources. . =
- 在代理中添加多个流
# 列出代理的源、接收器和通道 .sources =.sinks = .channels =
将源和接收器链接到相应的通道(用于接收器)的通道(用于接收器)以设置两个不同的流。
例如:一个从外部 avro 客户端到外部 HDFS,另一个从尾部的输出到 avro 接收器
# 列出代理中的源、接收器和通道 agent_foo.sources = avro-AppSrv-source1 exec-tail- source2 agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2 agent_foo.channels = mem-channel-1 file-channel-2 # flow #1 配置 agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1 # flow #2 配置 agent_foo.sources .exec-tail-source2.channels = file-channel-2 agent_foo.sinks.avro-forward-sink2.channel = file-channel-2Source类型
Avro Source:
监听AVRO端口来接受来自外部AVRO客户端的事件流。利用Avro Source可以实现多级流动、扇出流、扇入流等效果。另外也可以接受通过flume提供的Avro客户端发送的日志信息。
avro Source参数
Type: 组件类型
avro Bind: 侦听的主机名或IP地址
Port: 绑定的端口
案例:配置文件
a1.sources=r1
a1.sinks=k1
a1.channels=c1
#配置source
a1.sources.r1.type=avro
a1.sources.r1.bind=192.168.86.13
a1.sources.r1.port=44444
#描述sink
a1.sinks.k1.type=logger
#描述内存channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
#为channels绑定sources和sink
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
启动flume
/home/software/flume/bin/flume-ng agent -c /home/software/flume/conf -f /home/software/flume/conf/example.conf -n a1 -Dflume.root.logger=DRBUG,console
参数说明: -n 指定agent名称(与配置文件中代理的名字相同)
-c 指定flume中配置文件的目录
-f 指定配置文件
-Dflume.root.logger=DEBUG,console 设置日志等级
通过flume提供的avro客户端向指定机器指定端口发送日志信息:flume-ng avro-client -c ~/home/software/flume/conf -H 192.168.123.102 -p 6666 -F 666.txt
运行结果如下:
Exec Source
Exec 源在启动时运行给定的 Unix 命令,并期望该进程在标准输出上连续生成数据(stderr 被简单地丢弃,除非属性 logStdErr 设置为 true)。如果进程因任何原因退出,源也会退出并且不会产生更多数据)
Type:组件类型
Command:要执行的命令
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F/var/log/secure a1.sources.r1.channels = c1
JMS Source
JMS 源从 JMS 目标(例如队列或主题)读取消息。作为一个 JMS 应用程序,它应该可以与任何 JMS 提供程序一起使用,但仅通过 ActiveMQ 进行了测试。JMS 源提供可配置的批处理大小、消息选择器、用户/传递和消息到flume 事件转换器。
参数说明
Type:类型
initianContextFactory:初始上下文工厂
connectionFactory:连接工厂
prividerURL:提供者网址
destinationName:目的地名称
destinationType:目的地类型
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1 .connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = TCP:// MQSERVER:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE
Spooling Directory Source
Spooling Directory Source监测配置的目录下新增的文件,并将文件中的数据读取出来。其中,Spool Source有2个注意地方,第一个是拷贝到spool目录下的文件不可以再打开编辑,第二个是spool目录下不可包含相应的子目录。这个主要用途作为对日志的准实时监控。
参数说明
Type:类型
spoolDir:读取文件的目录Example for agent named a1:
a1.channels = ch-1 a1.sources = src-1a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var /log/apache/flumeSpool
a1.sources.src-1.fileHeader = true
案例:配置文件
a1.sources=s1
a1.sinks=k1
a1.channels=c1
#配置源
a1.sources.s1.type=spooldir
a1.sources.s1.spoolDir = /home/software/hadoop/logs
a1.sources.s1.fileHeader-true
a1.sources.s1.channels=c1
a1.sinks.k1.type=logger
a1.sinks.k1.channel =c1
a1.channels.c1.type=memory
启动flume后执行文件
运行结果
Kafka Source
参数说明 Type:类型
Kafka.bootstrap.servers:kafka集群中服务器列表
kafka,consumer.group.id:消费者群体的唯一标识 kafka.topics:kafka主题
kafka.topics.regex:kafka主题集的正则表达式
tier1代理实例 tier1.sources.source1.type =
org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = 通道1 tier1.sources.source1.batchSize
= 5000 tier1.sources.source1.batchDurationMillis = 2000 tier1.sources。 source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id
#正则表达式订阅主题实例 tier1.sources.source1.kafka .topics.regex = ^topic[0-9]$
Netcat source
参数说明 Type:’类型
Bind:绑定主机名或IP地址
Port: 监听端口
名为 a1 的代理示例:
a1.sources = r1 a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1
启动flume打开新的终端输入:netlet master 6666
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)