flume是一个实时的、分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,Event携带日志数据(字节数组形式)并且携带有头信息,Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责将这些信息传输到目标位置
2.flume的组件3.怎么写Client:Client生产数据,运行在一个独立的线程。
Event: 一个数据单元,消息头和消息体组成。(Events可以是日志记录、 avro 对象等。) Flow: Event从源点到达目的点的迁移的抽象。
Agent: 一个独立的Flume进程,包含组件Source、 Channel、 Sink。(Agent使用JVM 运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。) Source: 数据收集组件。(source从Client收集数据,传递给Channel)
Channel: 中转Event的一个临时存储,保存由Source组件传递过来的Event。(Channel连接 sources 和 sinks ,这个有点像一个队列。)
Sink: 从Channel中读取并移除Event,将Event传递到FlowPipeline中的下一个Agent(如果有的话)(Sink从Channel收集数据,运行在一个独立线程。)
source,channel,sink可以用的种类非常的多,根据自己的需要从官方文档里面挑选
Welcome to Apache Flume — Apache Flumehttps://flume.apache.org/
首先我们写的是从linux本地读取文件,打印到控制台的案例
source(Spooling): 首先加粗的都是必选的字段,这个内容太长了,就截取了一点
type:根据文档内容, 写spooldir
spoolDir:这是你需要监听的文件路径
channels:留在组装的时候写
channels(memory):
可以看出必选的是type,memory
sink(logger):
type:logger,表示打印在控制台
channel :留在组装的时候写
写一个conf文件,内容如下:
#给agent取一个名字,通常就写a1 a1.sources=r1 a1.channels=c1 a1.sinks=k1 #配置source a1.sources.r1.type=spooldir a1.sources.r1.spoolDir=/root/data #配置channels a1.channels.c1.type=memory #配置sinks a1.sinks.k1.type=logger #组装 a1.sources.r1.channels=c1 a1.sinks.k1.channel=c1
格式:flume-ng agent -n agent名称 写过配置的conf文件
flume-ng agent -n a1 -f ./spoolingtest.conf
运行结果: 监听到了我们指定的/root/data文件夹下面的内容
然后,在flume读取完了我们的文件后,在上面打上了一个COMPLETED的标记
此时我们向这个文件中再次写入数据,是不会打印出改动的内容的,所以我们可以给这个文件改个名字,把标记去掉就可以看到新的内容了。
再给一个复杂的案例,从本地到hdfs
配置文件现在写复杂一点了:
#命名 a.sources=r1 a.sinks=k1 a.channels=c1 #指定spooldir的属性 a.sources.r1.type=spooldir a.sources.r1.spoolDir=/root/data a.sources.r1.fileHeader=true # 将拦截器i1的类型设置为timestamp #会将处理数据的时间以毫秒的格式插入event的header中 a.sources.r1.interceptors=i1 a.sources.r1.interceptors.i1.type=timestamp #指定sink的类型为hdfs(type必须是hdfs) a.sinks.k1.type=hdfs #hdfs.path,必选的设置输出文件路径 a.sinks.k1.hdfs.path=/flume/data/dir1 #指定我们输出文件的前缀是student,后缀是.txt a.sinks.k1.hdfs.filePrefix=student a.sinks.k1.hdfs.fileSuffix=.txt # 指定达到多少数据量写一次文件 单位:bytes a.sinks.k1.hdfs.rollSize = 102400 # 指定多少条写一次文件,原来默认是10行数据就写一次,会产生很多小文件,我们就给他改大一点 #rollSize和rollCount只要满足一个条件就可以被写成文件 a.sinks.k1.hdfs.rollCount = 1000 #文件格式默认是SequenceFile,我们需要改成DataStream a.sinks.k1.hdfs.fileType=DataStream # 指定文件输出格式 为text a.sinks.k1.hdfs.writeFormat=text #指定channel a.channels.c1.type=memory a.channels.c1.capacity=1000 # 表示sink每次会从channel里取多少数据 a.channels.c1.transationCapacity=100 a.sources.r1.channels=c1 a.sinks.k1.channel=c1
flume-ng agent -n a -f spoolingToHDFS.conf
成功将内容传输到hdfs上了
sink(hdfs)中还有3个重要的参数说明一下
hdfs.rollSize 1024 当临时文件达到多少1024byte时,滚动成目标文件,
hdfs.rollCount 10 当 events 数据也就是10行数据的时候,就滚动文件
hdfs.rollInterval 30 间隔30s将临时文件滚动成最终目标文件
可以看下面这张图片,我们的数据读取完成了,但是这个新的临时文件既没有到达我们指定的大小,event也没满足,所以只能等30秒被处理成正常的文件
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)