安装newcat:
sudo yum install -y nc
查看端口是否被占用:
sudo netstat -tunlp | grep 端口号
在flume-netcat-logger.conf文件中添加:
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
先对agent令名组件:
a1是当前的agent的名字,sources、sinks、channels 在agent中定义多个source、sink、channel
r1、k1、c1与下面的与之对应
描述配置source----a1的输入源类型为newcat端口类型、a1的的监听主机以及端口号
sink----表示a1的输出目的是控制台logger类型
channel的类型是memory内存型(也可以是File),缓冲的总容量是1000(1000代表1000个事件,并不是字节),收集到100个事件再提交事务
由于source、sink、channel可能较多,所以在最后要做一个绑定,建立单独的通道才能顺利传输
r1写入c1,k1从c1拉数据
channel加了s,所以一个source可以缓存到多个channel,而下面的channel没有加s,所以sink只来自一个channel,一个channel可以被多个sink读取
启动flume监听端口:
bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
这个就相当于是一个客户端去连接服务器
这里由于我没有安装集群,所以我就ssh远程连接了虚拟机用来监听
实时监控单个追加文件
监控文件,则令名组件、channel、sink以及绑定都不变,只需要改变source的配置
黑体加粗的是必须要有的配置,没有加粗的是可选项
a1.sources.r1.type = exec a1.sources.r1.command = tail -f 要监控的文件路径 #f-监控失败就失败了,-F则会重新试 f默认先读10行
如果tail后面的参数是f,则会还没有监控文件之前就会打印十条的数据
实时读取本地文件到HDFS:对某一个文件进行监控
1、Flume 要想将数据输出到 HDFS,须持有 Hadoop 相关 jar 包
commons-configuration-1.6.jar、
hadoop-auth-2.7.2.jar、
hadoop-common-2.7.2.jar、
hadoop-hdfs-2.7.2.jar、
commons-io-2.4.jar、
htrace-core-3.1.0-incubating.jar
2、创建 flume-file-hdfs.conf 文件,并将以下内容添加进去
# Name the components on this agent a2.sources = r2 a2.sinks = k2 a2.channels = c2 # Describe/configure the source a2.sources.r2.type = exec a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log a2.sources.r2.shell = /bin/bash -c # Describe the sink a2.sinks.k2.type = hdfs a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H #上传文件的前缀 a2.sinks.k2.hdfs.filePrefix = logs- #是否按照时间滚动文件夹 a2.sinks.k2.hdfs.round = true #多少时间单位创建一个新的文件夹 a2.sinks.k2.hdfs.roundValue = 1 #重新定义时间单位 a2.sinks.k2.hdfs.roundUnit = hour #是否使用本地时间戳 a2.sinks.k2.hdfs.useLocalTimeStamp = true #积攒多少个 Event 才 flush 到 HDFS 一次 a2.sinks.k2.hdfs.batchSize = 1000 #设置文件类型,可支持压缩 a2.sinks.k2.hdfs.fileType = DataStream #多久生成一个新的文件 a2.sinks.k2.hdfs.rollInterval = 30 #设置每个文件的滚动大小 a2.sinks.k2.hdfs.rollSize = 134217700 #文件的滚动与 Event 数量无关 a2.sinks.k2.hdfs.rollCount = 0 # Use a channel which buffers events in memory a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2
并不是积攒1000个才flush到HDFS ,而是即使没到1000个事件,但是到了一个小时,还是会flush,如果没到一个小时,但是积攒了1000个事件也会flush
然后启动flume:
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
启动hadoop和hive
sbin/start-dfs.sh #启动hadoop sbin/start-yarn.sh
访问hadoop102:50070,查看文件
实时读取目录文件到HDFS:根据某个目录的文件变化,对增加的文件进行监控
创建配置文件 flume-dir-hdfs.conf
a3.sources = r3 a3.sinks = k3 a3.channels = c3 # Describe/configure the source a3.sources.r3.type = spooldir a3.sources.r3.spoolDir = /opt/module/flume/upload a3.sources.r3.fileSuffix = .COMPLETED a3.sources.r3.fileHeader = true #忽略所有以.tmp 结尾的文件,不上传 a3.sources.r3.ignorePattern = ([^ ]*.tmp) # Describe the sink a3.sinks.k3.type = hdfs a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H #上传文件的前缀 a3.sinks.k3.hdfs.filePrefix = upload- #是否按照时间滚动文件夹 a3.sinks.k3.hdfs.round = true #多少时间单位创建一个新的文件夹 a3.sinks.k3.hdfs.roundValue = 1 #重新定义时间单位 a3.sinks.k3.hdfs.roundUnit = hour #是否使用本地时间戳 a3.sinks.k3.hdfs.useLocalTimeStamp = true #积攒多少个 Event 才 flush 到 HDFS 一次 a3.sinks.k3.hdfs.batchSize = 100 #设置文件类型,可支持压缩 a3.sinks.k3.hdfs.fileType = DataStream #多久生成一个新的文件 a3.sinks.k3.hdfs.rollInterval = 60 #设置每个文件的滚动大小大概是 128M a3.sinks.k3.hdfs.rollSize = 134217700 #文件的滚动与 Event 数量无关 a3.sinks.k3.hdfs.rollCount = 0 # Use a channel which buffers events in memory a3.channels.c3.type = memory a3.channels.c3.capacity = 1000 a3.channels.c3.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r3.channels = c3 a3.sinks.k3.channel = c3
开启文件夹监控命令:
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf
再到hadoop02:50070上面查看监控的文件
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)