1) Data extraction
Create kafka-hdfs.conf under /export/servers/apache-flume-1.8.0-bin/conf:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
a1.sources.r1.kafka.consumer.group.id = flume_test
a1.sources.r1.kafka.topics = test2

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node01:8020/rawlog/20%y-%m-%d/%H
a1.sinks.k1.hdfs.useLocalTimeStamp = false
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.filePrefix = test
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 12800000000
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.threadsPoolSize = 10
a1.sinks.k1.hdfs.batchSize = 2000

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1500000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
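The source expects the test2 topic to already exist on the brokers. A minimal sketch for creating it, assuming the Kafka installation sits under /export/servers/kafka and a Kafka version whose admin tool still takes --zookeeper (partition and replication counts here are illustrative):

# Create the topic consumed by the Flume source (skip if it already exists);
# the Kafka install path and ZooKeeper quorum are assumptions for this cluster
/export/servers/kafka/bin/kafka-topics.sh --create \
  --zookeeper node01:2181,node02:2181,node03:2181 \
  --topic test2 --partitions 3 --replication-factor 2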
2) Download the Hadoop dependencies
Grab the following jars from the Hadoop cluster and place them in flume/lib (a copy sketch follows the list):
hadoop-common-2.6.3.jar
hadoop-hdfs-2.6.3.jar
commons-configuration-1.6.jar
hadoop-auth-2.6.3.jar
htrace-core-3.0.4.jar
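A minimal way to stage these jars, assuming Hadoop 2.6.3 lives under /export/servers/hadoop-2.6.3 with the standard share/hadoop layout (both paths are assumptions; adjust to your cluster):

# Copy the Hadoop client jars the HDFS sink needs into Flume's lib directory;
# HADOOP_HOME and FLUME_HOME below are assumed install locations
HADOOP_HOME=/export/servers/hadoop-2.6.3
FLUME_HOME=/export/servers/apache-flume-1.8.0-bin
cp $HADOOP_HOME/share/hadoop/common/hadoop-common-2.6.3.jar $FLUME_HOME/lib/
cp $HADOOP_HOME/share/hadoop/hdfs/hadoop-hdfs-2.6.3.jar $FLUME_HOME/lib/
cp $HADOOP_HOME/share/hadoop/common/lib/commons-configuration-1.6.jar $FLUME_HOME/lib/
cp $HADOOP_HOME/share/hadoop/common/lib/hadoop-auth-2.6.3.jar $FLUME_HOME/lib/
cp $HADOOP_HOME/share/hadoop/common/lib/htrace-core-3.0.4.jar $FLUME_HOME/lib/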
3) Start the agent
Start the agent on node01. To run it in the background:

nohup bin/flume-ng agent -n a1 -c conf -f conf/kafka-hdfs.conf >/dev/null 2>&1 &

Or run it in the foreground with console logging, which is handy for debugging:

bin/flume-ng agent -c conf -f conf/kafka-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
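Once the agent is up, a quick end-to-end check is to push a test message into test2 and watch for output files under the sink path. The Kafka install path below is an assumption, and open files keep a .tmp suffix until they roll:

# Send a test event into the topic the agent consumes
echo "hello flume" | /export/servers/kafka/bin/kafka-console-producer.sh \
  --broker-list node01:9092,node02:9092,node03:9092 --topic test2

# List the hour-bucketed raw log directories on HDFS
hdfs dfs -ls -R /rawlog/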