flume与kafka集成配置

flume与kafka集成配置,第1张

简介

Flume代理配置存储在本地配置文件中。这是遵循Javaproperties文件格式的文本文件。可以在同一配置文件中指定一个或多个代理的配置。配置文件包括代理中每个source,sink和channel的属性,以及它们如何连接在一起以形成数据流。

流中的每个组件(source,sink和channel)都有一个名称,类型和特定于该类型和实例化的属性集。例如,一个Avro源需要一个主机名(或IP地址)和一个端口号来接收数据。内存通道可以具有最大队列大小(“capacity”),并且HDFS的sink需要知道文件系统URI,创建文件的路径,文件rotation的frequency(“hdfs.rollInterval”)等。组件的所有此类属性需要在hosting Flume代理的属性文件中进行设置。

代理需要知道要加载哪些单个组件以及如何连接它们才能构成流程。通过列出代理中每个source,sink和channel的名称,然后为每个sink和source指定channel来完成此 *** 作。例如,代理通过称为文件通道的文件通道将事件从名为avroWeb的Avro源流到HDFS接收器hdfs-cluster1。配置文件将包含这些组件的名称和文件通道,作为avroWebsource和hdfs-cluster1sink的共享通道。

使用称为flume-ng的shell脚本启动代理,该脚本位于Flume发行版的bin目录中。您需要在命令行上指定代理名称,配置目录和配置文件:

$ bin/flume-ng agent -n $agent_name -c conf-f conf/flume-conf.properties.template

然后,代理将开始运行在给定属性文件中配置的source,sink和channel。

示例

在这里,我们提供了一个示例配置文件,描述了单节点Flume部署。通过此配置,用户可以生成事件,然后将其记录到控制台。

#example.conf:单节点Flume配置

#在此代理上命名组件

a1.sources   =  r1

a1.sinks   =  k1

a1.channels   =  c1

#描述/配置源

a1.sources.r1.type   =  netcat

a1.sources.r1.bind   =  localhost

a1.sources.r1.port   =  44444

#描述接收器

a1.sinks.k1.type   =  logger

#使用通道将事件缓存在内存

a1.channels.c1.type   =  memory

a1.channels中.c1.capacity   =  1000

a1.channels.c1.transactionCapacity   = 100

#将源和接收器绑定到通道

a1.sources.r1.channels   =  c1

a1.sinks.k1.channel   =  c1

此配置定义了一个名为a1的代理。a1具有侦听端口44444上的数据的source,在内存中缓冲事件数据的通道以及将事件数据记录到控制台的sink。配置文件为各个组件命名,然后描述它们的类型和配置参数。给定的配置文件可能会定义几个命名的代理。当启动给定的Flume进程时,会传递一个标志,告诉它要显示哪个命名的代理。

有了这个配置文件,我们可以如下启动Flume:

$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

请注意,在完整部署中,我们通常会包含一个选项:-- conf

在.bash_profile中加入flume环境变量

PATH=/usr/flume/bin:$PATH:$HOME/bin

source .bash_profile刷新

使用shell将大量文件分发到不同sources中

定时任务* * * * * sh cp.sh

cp.sh

#!/bin/bash

source ~/.bash_profile

time=`date +%Y%m%d%H%M -d -2min`

echo `date '+%Y-%m-%d %H:%M:%S'`":$time cp start"

for file in `ls *xxxx*`

do

 file_name=`basename $file`

 cp $file /data/1/$file_name.tmp

 mv /data/1/$file_name.tmp  /data/1/$file_name

done &

创建.conf文件

例:

agent1.sources = s1

agent1.channels = c1

agent1.sinks = k1 k1_1

agent1.sources.s1.type = spooldir

agent1.sources.s1.fileSuffix = .comp

agent1.sources.s1.deletePolicy = immediate

agent1.sources.s1.spoolDir=/data/1/

agent1.sources.s1.fileHeader= false

agent1.sources.s1.channels = c1

agent1.sources.s1.trackerDir = /data/flumespool/s1

agent1.sources.s1.ignorePattern = (.)*.\.tmp

agent1.channels.c1.type = memory

agent1.channels.c1.keep-alive = 10

agent1.channels.c1.capacity = 5000

agent1.channels.c1.transactionCapacity = 1000

agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

agent1.sinks.k1.topic = topic

agent1.sinks.k1.brokerList = kafka_1:9092,kafka_2:9092,kafka_3:9092

agent1.sinks.k1.requiredAcks = 1

agent1.sinks.k1.batchSize = 500

agent1.sinks.k1.kafka.receive.buffer.bytes = 200000

agent1.sinks.k1.kafka.send.buffer.bytes = 300000

agent1.sinks.k1.channel = c1

agent1.sinks.k1_1.type = org.apache.flume.sink.kafka.KafkaSink

agent1.sinks.k1_1.topic = topic

agent1.sinks.k1_1.brokerList =kafka_1:9092,kafka_2:9092,kafka_3:9092

agent1.sinks.k1_1.requiredAcks = 1

agent1.sinks.k1_1.batchSize = 500

agent1.sinks.k1_1.kafka.receive.buffer.bytes = 200000

agent1.sinks.k1_1.kafka.send.buffer.bytes = 300000

agent1.sinks.k1_1.channel = c1

sinks中k1与k1_1实现双线程

使用1个flume连接2个kafka,即同时向2个kafka中录入数据可以在同一agent下配置2个channels和2个sinks,source共用一个

示例:

xxx.conf

agent1.sources = s1

agent1.channels = c1 cx1

agent1.sinks = k1 k1_1 kx1 kx1_1

agent1.sources.s1.type = spooldir

agent1.sources.s1.fileSuffix = .comp

agent1.sources.s1.deletePolicy = immediate

agent1.sources.s1.spoolDir=/data/1/

agent1.sources.s1.fileHeader= false

agent1.sources.s1.channels = c1 cx1

agent1.sources.s1.trackerDir = /data/flumespool/s1

agent1.sources.s1.ignorePattern = (.)*.\.tmp

agent1.channels.c1.type = memory

agent1.channels.c1.keep-alive = 10

agent1.channels.c1.capacity = 5000

agent1.channels.c1.transactionCapacity = 1000

agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

agent1.sinks.k1.topic = topic1

agent1.sinks.k1.brokerList =kafka1_1:9092,kafka1_2:9092,kafka1_3:9092

agent1.sinks.k1.requiredAcks = 1

agent1.sinks.k1.batchSize = 500

agent1.sinks.k1.kafka.receive.buffer.bytes = 200000

agent1.sinks.k1.kafka.send.buffer.bytes = 300000

agent1.sinks.k1.channel = c1

agent1.sinks.k1_1.type = org.apache.flume.sink.kafka.KafkaSink

agent1.sinks.k1_1.topic = topic1

agent1.sinks.k1_1.brokerList = kafka1_1:9092,kafka1_2:9092,kafka1_3:9092

agent1.sinks.k1_1.requiredAcks = 1

agent1.sinks.k1_1.batchSize = 500

agent1.sinks.k1_1.kafka.receive.buffer.bytes = 200000

agent1.sinks.k1_1.kafka.send.buffer.bytes = 300000

agent1.sinks.k1_1.channel = c1

agent1.channels.cx1.type = memory

agent1.channels.cx1.keep-alive = 10

agent1.channels.cx1.capacity = 5000

agent1.channels.cx1.transactionCapacity = 1000

agent1.sinks.kx1.type = org.apache.flume.sink.kafka.KafkaSink

agent1.sinks.kx1.topic = topic2

agent1.sinks.kx1.brokerList = kafka2_1:9092,kafka2_2:9092,kafka2_3:9092

agent1.sinks.kx1.requiredAcks = 1

agent1.sinks.kx1.batchSize = 500

agent1.sinks.kx1.kafka.receive.buffer.bytes = 200000

agent1.sinks.kx1.kafka.send.buffer.bytes = 300000

agent1.sinks.kx1.channel = cx1

agent1.sinks.kx1_1.type = org.apache.flume.sink.kafka.KafkaSink

agent1.sinks.kx1_1.topic = topic2

agent1.sinks.kx1_1.brokerList = kafka2_1:9092,kafka2_2:9092,kafka2_3:9092

agent1.sinks.kx1_1.requiredAcks = 1

agent1.sinks.kx1_1.batchSize = 500

agent1.sinks.kx1_1.kafka.receive.buffer.bytes = 200000

agent1.sinks.kx1_1.kafka.send.buffer.bytes = 300000

agent1.sinks.kx1_1.channel = cx1

启动命令如下:

flume-ng agent -c /usr/flume/conf/ -f xxx.conf -n agent1 -Dflume.root.logger=INFO,console >log/1`date +%Y%m%d`.log 2>&1 &

安装地址:

安装部署:

本地使用的是CDH 6.3.1 版本,已安装Flume,此处略过安装步骤

使用 Flume 监听一个端口,收集该端口数据,并打印到控制台。

安装netcat并检查端口是否被占用

在Flume的安装目录下创建conf/lib目录,并创建flume的配置文件

添加内容如下:

第一种写法:

第二种写法:

参数说明:

--conf/-c:表示配置文件存储在 conf/目录

--name/-n:表示给 agent 起名为 a1

--conf-file/-f:flume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf

文件。

-Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改 flume.root.logger

参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、

error。

通过nc输入的数据,flume监听页面都接受到了,并且输出到了控制台

实时监控 Hive 日志,并上传到 HDFS 中

注:要想读取 Linux 系统中的文件,就得按照 Linux 命令的规则执行命令。由于 Hive 日志在 Linux 系统中所以读取文件的类型选择:exec 即 execute 执行的意思。表示执行Linux 命令来读取文件。

添加如下内容:

注意: 对于所有与时间相关的转义序列,Event Header 中必须存在以 “timestamp”的key(除非 hdfs.useLocalTimeStamp 设置为 true,此方法会使用 TimestampInterceptor 自动添加 timestamp)。

a3.sinks.k3.hdfs.useLocalTimeStamp = true

从日志可以看到文件已经上传到HDFS:

在HDFS上查看:

1小时自动生产一个目录

1分钟自动生产一个文件

tmp结尾的文件为正在写入的文件,时间到了后就会自动重命名

使用 Flume 监听整个目录的文件,并上传至 HDFS

添加如下内容:

flume日志:

从日志输出可以看到原目录的 c.txt直接被修改为 c.txt.COMPLETED,然后c.txt上传到一个另外名字的文件,而且从输出可以看到,多个文件的内容会合并上传到一个hdfs上的文件。

hdfs上看输出:

同样是1分钟一个文件,但是有写入才会创建,如果没有写入是不行的。

  Exec source 适用于监控一个实时追加的文件,不能实现断点续传;Spooldir Source 适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步;而 Taildir Source 适合用于监听多个实时追加的文件,并且能够实现断点续传。

案例需求:

使用 Flume 监听整个目录的实时追加文件,并上传至 HDFS 。

添加如下内容:

flume控制台输出:

HDFS查看输出文件:

Taildir Source 维护了一个 json 格式的 position File,其会定期的往 position File中更新每个文件读取到的最新的位置,因此能够实现断点续传

注:

Linux 中储存文件元数据的区域就叫做 inode,每个 inode 都有一个号码, *** 作系统用 inode 号码来识别不同的文件,Unix/Linux 系统内部不使用文件名,而使用 inode 号码来识别文件。

改名后inode不会发生变化,这点要注意

flume 有三大组件source 、channel和sink,各个组件之间都可以相互组合使用,各组件间耦合度低。使用灵活,方便。

1.多sink

channel 的内容只输出一次,同一个event 如果sink1 输出,sink2 不输出;如果sink1 输出,sink1 不输出。 最终 sink1+sink2=channel 中的数据。

配置文件如下:

a1.sources=r1a1.sinks= k1 k2a1.channels= c1# Describe/configure the sourcea1.sources.r1.type= execa1.sources.r1.shell= /bin/bash -ca1.sources.r1.channels= c1a1.sources.r1.command= tail -F /opt/apps/logs/tail4.log# channela1.channels.c1.type= memorya1.channels.c1.capacity=1000a1.channels.c1.transactionCapacity=100#sink1a1.sinks.k1.channel= c1a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.topic= mytopica1.sinks.k1.kafka.bootstrap.servers= localhost:9092a1.sinks.k1.kafka.flumeBatchSize=20a1.sinks.k1.kafka.producer.acks=1a1.sinks.k1.kafka.producer.linger.ms=1a1.sinks.ki.kafka.producer.compression.type= snappy#sink2a1.sinks.k2.type= file_rolla1.sinks.k2.channel= c1#a1.sinks.k2.sink.rollInterval=0a1.sinks.k2.sink.directory= /opt/apps/tmp

2.多 channel 多sink ,每个sink 输出内容一致

(memory channel 用于kafka *** 作,实时性高,file channel 用于 sink file 数据安全性高) 

(多channel 单 sink 的情况没有举例,个人感觉用处不广泛。)

配置文件如下:

a1.sources=r1a1.sinks= k1 k2a1.channels= c1 c2# Describe/configure the sourcea1.sources.r1.type= execa1.sources.r1.shell= /bin/bash -ca1.sources.r1.channels= c1 c2a1.sources.r1.command= tail -F /opt/apps/logs/tail4.log#多个channel 的数据相同a1.sources.r1.selector.type=replicating# channel1a1.channels.c1.type= memorya1.channels.c1.capacity=1000a1.channels.c1.transactionCapacity=100#channel2a1.channels.c2.type= filea1.channels.c2.checkpointDir= /opt/apps/flume-1.7.0/checkpointa1.channels.c2.dataDirs= /opt/apps/flume-1.7.0/data#sink1a1.sinks.k1.channel= c1a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.topic= mytopica1.sinks.k1.kafka.bootstrap.servers= localhost:9092a1.sinks.k1.kafka.flumeBatchSize=20a1.sinks.k1.kafka.producer.acks=1a1.sinks.k1.kafka.producer.linger.ms=1a1.sinks.ki.kafka.producer.compression.type= snappy#sink2a1.sinks.k2.type= file_rolla1.sinks.k2.channel= c2#a1.sinks.k2.sink.rollInterval=0a1.sinks.k2.sink.directory= /opt/apps/tmp

3. 多source 单 channel 单 sink

多个source 可以读取多种信息放在一个channel 然后输出到同一个地方 

配置文件如下:

a1.sources=r1r2a1.sinks= k1a1.channels= c1# source1a1.sources.r1.type= execa1.sources.r1.shell= /bin/bash -ca1.sources.r1.channels= c1a1.sources.r1.command= tail -F /opt/apps/logs/tail4.log# source2a1.sources.r2.type= execa1.sources.r2.shell= /bin/bash -ca1.sources.r2.channels= c1a1.sources.r2.command= tail -F /opt/apps/logs/tail2.log# channel1  in memorya1.channels.c1.type= memorya1.channels.c1.capacity=1000a1.channels.c1.transactionCapacity=100#sink1a1.sinks.k1.channel= c1a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.topic= mytopica1.sinks.k1.kafka.bootstrap.servers= localhost:9092a1.sinks.k1.kafka.flumeBatchSize=20a1.sinks.k1.kafka.producer.acks=1a1.sinks.k1.kafka.producer.linger.ms=1a1.sinks.ki.kafka.producer.compression.type= snappy

flume 像乐高积木一样可以自己随心所欲将不同的组件进行搭配使用,耦合度低。

Source

rpc远程过程调用协议,客户机与服务机的调用模式需要对数据进行序列化。

         1:客户机将参数序列化并以二进制形式通过网络传输到服务器。

         2:服务器接收到后进行反序列化再调用方法获取返回值。

         3:服务器将返回值序列化后再通过网络传输给客户机。

         4:客户机接收到结果后再进行反序列化获取结果。

Avro source:

         Avro就是一种序列化形式,avrosource监听一个端口只接收avro序列化后的数据,其他类型的不接收。

         type:avrosource的类型,必须是avro。

bind:要监听的(本机的)主机名或者ip。此监听不是过滤发送方。一台电脑不是说只有一个IP。有多网卡的电脑,对应多个IP。

port:绑定的本地的端口。

Thrif source:

         和avro一样是一种数据序列化形式,Thrifsource只采集thrift数据序列化后的数据

Exec source:

         采集linux命令的返回结果传输给channel

         type:source的类型:必须是exec。

command:要执行命令。

tail  –f  若文件被删除即使重新创建同名文件也不会监听

        tail  -F  只要文件同名就可以继续监听

以上可以用在日志文件切割时的监听

JMS Source:

Java消息服务数据源,Java消息服务是一个与具体平台无关的API,这是支持jms规范的数据源采集;

Spooling Directory Source:通过文件夹里的新增的文件作为数据源的采集;

Kafka Source:从kafka服务中采集数据。

NetCat Source:绑定的端口(tcp、udp),将流经端口的每一个文本行数据作为Event输入

        type:source的类型,必须是netcat。

bind:要监听的(本机的)主机名或者ip。此监听不是过滤发送方。一台电脑不是说只有一个IP。有多网卡的电脑,对应多个IP。

port:绑定的本地的端口。

HTTP Source:监听HTTP POST和 GET产生的数据的采集

Chanel

         是一个数据存储池,中间通道,从source中接收数据再向sink目的地传输,如果sink写入失败会自动重写因此不会造成数据丢失。

         Memory:用内存存储,但服务器宕机会丢失数据。

                 Typechannel的类型:必须为memory

capacity:channel中的最大event数目

transactionCapacity:channel中允许事务的最大event数目

         File:使用文件存储数据不会丢失数据但会耗费io。

                 Typechannel的类型:必须为 file

checkpointDir :检查点的数据存储目录

dataDirs :数据的存储目录

transactionCapacity:channel中允许事务的最大event数目

         SpillableMemory Channel:内存文件综合使用,先存入内存达到阀值后flush到文件中。

                Typechannel的类型:必须为SPILLABLEMEMORY

memoryCapacity:内存的容量event数

overflowCapacity:数据存到文件的event阀值数

checkpointDir:检查点的数据存储目录

dataDirs:数据的存储目录

         Jdbc:使用jdbc数据源来存储数据。

         Kafka:使用kafka服务来存储数据。

Sink

         各种类型的目的地,接收channel写入的数据并以指定的形式表现出来。Sink有很多种类型。

type:sink的类型 必须是hdfs。

hdfs.path:hdfs的上传路径。

hdfs.filePrefix:hdfs文件的前缀。默认是:FlumeData

hdfs.rollInterval:间隔多久产生新文件,默认是:30(秒) 0表示不以时间间隔为准。

hdfs.rollSize:文件到达多大再产生一个新文件,默认是:1024(bytes)0表示不以文件大小为准。

hdfs.rollCount:event达到多大再产生一个新文件,默认是:10(个)0表示不以event数目为准。

hdfs.batchSize:每次往hdfs里提交多少个event,默认为100

hdfs.fileType:hdfs文件的格式主要包括:SequenceFile,DataStream ,CompressedStream,如果使用了CompressedStream就要设置压缩方式。

hdfs.codeC:压缩方式:gzip,bzip2, lzo, lzop, snappy

注:%{host}可以使用header的key。以及%Y%m%d来表示时间,但关于时间的表示需要在header里有timestamp这个key。

Logger Sink将数据作为日志处理(根据flume中的设置的日志方式来显示)

要在控制台显示在运行agent的时候加入:-Dflume.root.logger=INFO,console。

type:sink的类型:必须是logger。

maxBytesToLog:打印body的最长的字节数 默认为16

Avro Sink:数据被转换成Avro Event,然后发送到指定的服务端口上。

                 type:sink的类型:必须是 avro。

hostname:指定发送数据的主机名或者ip

port:指定发送数据的端口

实例

1:监听一个文件的增加变化,采集数据并在控制台打印。

在这个例子中我使用exec source,memory chanel,logger sink。可以看我的agent结构图

以下是我创建的exec_source.conf

a1.sources=r1

a1.channels=c1

a1.sinks=k1

a1.sources.r1.type=exec

a1.sources.r1.command=tail -F/usr/local/success.log

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transactioncapacity=100

a1.sinks.k1.type=logger

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

执行命令:

bin/flume-ngagent --conf conf/ --conf-file conf/exec_source.conf --name a1-Dflume.root.logger=INFO,console &

然后更改/usr/local/success.log文件中的内容后可以看到flume采集到了文件的变化并在控制台上打印出来。文件初始内容hello和how are you,剩下的i am fine和ok为新增加内容。

2:监控一个文件变化并将其发送到另一个服务器上然后打印

这个例子可以建立在上一个例子之上,但是需要对flume的结构做一些修改,我使用avro序列化数据再发送到指定的服务器上。详情看结构图。

实际上flume可以进行多个节点关联,本例中我只使用131向139发送数据

131,139上都必须启动agent

服务器131配置

以下是我创建的exec_source_avro_sink.conf

a1.sources=r1

a1.channels=c1

a1.sinks=k1

a1.sources.r1.type=exec

a1.sources.r1.command=tail -F/usr/local/success.log

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transactioncapacity=100

a1.sinks.k1.type=avro

a1.sinks.k1.hostname=192.168.79.139

a1.sinks.k1.port=42424

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

执行命令启动agent

bin/flume-ng agent --conf conf/ --conf-fileconf/exec_source_avro_sink.conf --name a1 -Dflume.root.logger=INFO,console&

139服务器配置

执行命令拷贝flume到139

scp -r apache-flume-1.7.0-bin/root@192.168.79.139:/usr/local/

修改exec_source_avro_sink.conf

a1.sources=r1

a1.channels=c1

a1.sinks=k1

a1.sources.r1.type=avro

a1.sources.r1.bind=0.0.0.0

a1.sources.r1.port=42424

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transactioncapacity=100

a1.sinks.k1.type=logger

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

执行命令启动agent

bin/flume-ng agent --conf conf/ --conf-fileconf/exec_source_avro_sink.conf --name a1 -Dflume.root.logger=INFO,console&

结果可以在139控制台上看到131中修改success.log的变化信息

3:avro-client实例

执行bin/flume-ng会提示有命令如下

help                     display this help text

agent                     run aFlume agent

avro-client               run anavro Flume client

version                   show Flume version info

avro-clinet是avro客户端,可以把本地文件以avro序列化方式序列化后发送到指定的服务器端口。本例就是将131的一个文件一次性的发送到139中并打印。

Agent结构图如下

131启动的是一个avro-client,它会建立连接,发送数据,断开连接,它只是一个客户端。

启动一个avro客户端

bin/flume-ngavro-client --conf conf/ --host 192.168.79.139 --port 42424 --filename/usr/local/success.log --headerFile /usr/local/kv.log

--headerFile是用来区分是哪个服务器发送的数据,kv.log中的内容会被发送到139,可以作为标识来使用。

139的avro_client.conf如下

a1.sources=r1

a1.channels=c1

a1.sinks=k1

a1.sources.r1.type=avro

a1.sources.r1.bind=0.0.0.0

a1.sources.r1.port=42424

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transactioncapacity=100

a1.sinks.k1.type=logger

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

启动agent

bin/flume-ngagent --conf conf/ --conf-file conf/avro_client.conf --name a1-Dflume.root.logger=INFO,console &

139控制台显示如下

可以看到headers的内容headers:{hostname=192.168.79.131}

注意:

1:Flume服务没有stop命令需要通过kill来杀掉进行,可以使用jps  -m来确认是那个agent的number

[root@shb01 conf]# jps -m

3610 Jps -m

3512 Application --conf-fileconf/exec_source.conf --name a1

2:修改flume的配置文件后如avro_client.conf,flume会自动重启

3:logger sink默认只显示16个字节

4:flume是以event为单位进行数据传输的,其中headers是一个map容器map

Event: { headers:{hostname=192.168.79.131}body: 31 61                                           1a }

5:flume支持多节点关联但是sink和source的类型要一致,比如avro-client发送数据那么接收方的source也必须是avro否则会警告。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/tougao/11484994.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-16
下一篇 2023-05-16

发表评论

登录后才能评论

评论列表(0条)

保存