#安装jdk,flume依赖于java环境 wget http://mirrors.hust.edu.cn/apache/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz2、解压
tar zxf apache-flume-1.6.0-bin.tar.gz -C /opt/3、配置
cd /opt/apache-flume-1.6.0-bin/conf cp flume-conf.properties.template flume-conf.properties vim flume-conf.properties agent.sources = r1 agent.channels = c1 agent.sinks = s1 agent.sources.r1.type = netcat agent.sources.r1.bind = 172.31.103.132 agent.sources.r1.port = 8888 agent.sources.r1.channels = c1 agent.sinks.s1.type = file_roll agent.sinks.s1.sink.directory = /tmp/log/flume agent.sinks.s1.channel = c1 agent.channels.c1.type = memory agent.channels.c1.capacity = 100二、功能验证 1、建立输出目录
mkdir -p /tmp/log/flume2、启动服务
cd /opt/apache-flume-1.6.0-bin nohup bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent & #运行日志位于logs目录,或者启动时添加-Dflume.root.logger=INFO,console 选项前台启动,输出打印日志,查看具体运行日志,服务异常时查原因。3、发送数据
telnet 172.31.103.132 8888 输入: hello world4、查看数据文件 查看 /tmp/log/flume 目录文件
#根据实际情况选择文件名 cat 1540783812706-4 hello world5、与Kafka 集成
#Flume 可以灵活地与Kafka 集成,Flume侧重数据收集,Kafka侧重数据分发。 Flume可配置source为Kafka,也可配置sink 为Kafka。 配置sink为kafka例子如下 agent.sinks.s1.type = org.apache.flume.sink.kafka.KafkaSink agent.sinks.s1.topic = test #kafka需先创建topic agent.sinks.s1.brokerList = node1:9092,node2:9092,node3:9092 agent.sinks.s1.requiredAcks = 1 agent.sinks.s1.batchSize = 20 agent.sinks.s1.channel = c16、输出日志文件到hdfs
agent.sources = r1 agent.channels = c1 agent.sinks = s1 agent.sources.r1.type = exec agent.sources.r1.command = tail -f /data/log.txt agent.sources.r1.channels = c1 agent.sinks.s1.type = hdfs agent.sinks.s1.channel = c1 agent.sinks.s1.hdfs.path = hdfs://cluster1/flume/events/%y-%m-%d/%H%M agent.sinks.s1.hdfs.filePrefix = events- agent.sinks.s1.hdfs.round = true agent.sinks.s1.hdfs.roundValue = 10 agent.sinks.s1.hdfs.roundUnit = minute agent.sinks.s1.rollInterval = 3 agent.sinks.s1.rollSize = 200 agent.sinks.s1.rollCount = 10 agent.sinks.s1.batchSize = 5 agent.sinks.s1.hdfs.useLocalTimeStamp = true agent.sinks.s1.hdfs.fileType = DataStream agent.channels.c1.type = memory agent.channels.c1.capacity = 100 agent.sinks.s1.channel = c1
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)