flume使用

flume使用,第1张

flume使用 1.flume介绍

flume是一个实时的、分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,Event携带日志数据(字节数组形式)并且携带有头信息,Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责将这些信息传输到目标位置

2.flume的组件

Client:Client生产数据,运行在一个独立的线程。

Event: 一个数据单元,消息头和消息体组成。(Events可以是日志记录、 avro 对象等。) Flow: Event从源点到达目的点的迁移的抽象。

Agent: 一个独立的Flume进程,包含组件Source、 Channel、 Sink。(Agent使用JVM 运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。) Source: 数据收集组件。(source从Client收集数据,传递给Channel)

Channel: 中转Event的一个临时存储,保存由Source组件传递过来的Event。(Channel连接 sources 和 sinks ,这个有点像一个队列。)

Sink: 从Channel中读取并移除Event,将Event传递到FlowPipeline中的下一个Agent(如果有的话)(Sink从Channel收集数据,运行在一个独立线程。)

3.怎么写

source,channel,sink可以用的种类非常的多,根据自己的需要从官方文档里面挑选

Welcome to Apache Flume — Apache Flumehttps://flume.apache.org/

 首先我们写的是从linux本地读取文件,打印到控制台的案例

source(Spooling): 首先加粗的都是必选的字段,这个内容太长了,就截取了一点

type:根据文档内容, 写spooldir

 spoolDir:这是你需要监听的文件路径 

channels:留在组装的时候写

channels(memory):

可以看出必选的是type,memory

sink(logger):

type:logger,表示打印在控制台

channel :留在组装的时候写

写一个conf文件,内容如下:

#给agent取一个名字,通常就写a1
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置source
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/root/data
 
#配置channels
a1.channels.c1.type=memory

#配置sinks
a1.sinks.k1.type=logger

#组装
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

格式:flume-ng agent -n agent名称  写过配置的conf文件

flume-ng agent -n a1 -f ./spoolingtest.conf  

运行结果: 监听到了我们指定的/root/data文件夹下面的内容

然后,在flume读取完了我们的文件后,在上面打上了一个COMPLETED的标记 

 

 此时我们向这个文件中再次写入数据,是不会打印出改动的内容的,所以我们可以给这个文件改个名字,把标记去掉就可以看到新的内容了。

再给一个复杂的案例,从本地到hdfs

配置文件现在写复杂一点了:

#命名
a.sources=r1
a.sinks=k1
a.channels=c1

#指定spooldir的属性
a.sources.r1.type=spooldir
a.sources.r1.spoolDir=/root/data
a.sources.r1.fileHeader=true
# 将拦截器i1的类型设置为timestamp 
#会将处理数据的时间以毫秒的格式插入event的header中
a.sources.r1.interceptors=i1
a.sources.r1.interceptors.i1.type=timestamp

#指定sink的类型为hdfs(type必须是hdfs)
a.sinks.k1.type=hdfs
#hdfs.path,必选的设置输出文件路径
a.sinks.k1.hdfs.path=/flume/data/dir1
#指定我们输出文件的前缀是student,后缀是.txt
a.sinks.k1.hdfs.filePrefix=student
a.sinks.k1.hdfs.fileSuffix=.txt
# 指定达到多少数据量写一次文件 单位:bytes
a.sinks.k1.hdfs.rollSize = 102400
# 指定多少条写一次文件,原来默认是10行数据就写一次,会产生很多小文件,我们就给他改大一点
#rollSize和rollCount只要满足一个条件就可以被写成文件
a.sinks.k1.hdfs.rollCount = 1000
#文件格式默认是SequenceFile,我们需要改成DataStream
a.sinks.k1.hdfs.fileType=DataStream
# 指定文件输出格式 为text
a.sinks.k1.hdfs.writeFormat=text


#指定channel
a.channels.c1.type=memory
a.channels.c1.capacity=1000
# 表示sink每次会从channel里取多少数据
a.channels.c1.transationCapacity=100

a.sources.r1.channels=c1
a.sinks.k1.channel=c1
flume-ng agent -n a -f spoolingToHDFS.conf

成功将内容传输到hdfs上了 

sink(hdfs)中还有3个重要的参数说明一下

hdfs.rollSize    1024    当临时文件达到多少1024byte时,滚动成目标文件,  
hdfs.rollCount    10  当 events 数据也就是10行数据的时候,就滚动文件

hdfs.rollInterval    30    间隔30s将临时文件滚动成最终目标文件

可以看下面这张图片,我们的数据读取完成了,但是这个新的临时文件既没有到达我们指定的大小,event也没满足,所以只能等30秒被处理成正常的文件

 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5652612.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存