- 1. Start the cluster
- 2. Create a Kafka topic
- 3. Edit the Flume conf
- 4. Clear the log file and run the Python script
1. Start the cluster

Start ZooKeeper, Hadoop (on the master node), and Kafka; Flume is started later in step 3.

# 1. On all three nodes
/usr/zookeeper/zookeeper-3.4.10/bin/zkServer.sh start
/usr/zookeeper/zookeeper-3.4.10/bin/zkServer.sh status

# 2. On the master node, start Hadoop
/usr/hadoop/hadoop-2.7.3/sbin/start-all.sh

# 3. Start Kafka (all three nodes)
cd /usr/kafka/kafka_2.11-2.4.0/
bin/kafka-server-start.sh config/server.properties
At this point ZooKeeper, Hadoop, and Kafka are all running.
For Kafka installation and configuration, refer to: "Kafka集群分布式部署与测试" (Kafka Cluster Distributed Deployment and Testing).
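As a quick sanity check (a rough sketch; the exact process list depends on which roles run on each node), jps on every node should show the daemons that were just started:

jps
# Expected, roughly: QuorumPeerMain (ZooKeeper) and Kafka on every node;
# NameNode / SecondaryNameNode / ResourceManager on master; DataNode / NodeManager on the slaves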
2. Create a Kafka topic
# Create the topic badou_data
kafka-topics.sh --create --topic badou_data --partitions 3 --replication-factor 2 --zookeeper master:2181,slave1:2181,slave2:2181

# Consume from badou_data
./bin/kafka-console-consumer.sh --from-beginning --topic badou_data --bootstrap-server master:9092,slave1:9092,slave2:9092
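Optionally, you can confirm the topic was created with the expected partitions and replicas (run from the Kafka install directory, same ZooKeeper quorum as above):

bin/kafka-topics.sh --describe --topic badou_data --zookeeper master:2181,slave1:2181,slave2:2181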
3. Edit the conf

cd /usr/flume/flume-1.7.0
vi conf/flume_kafka.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /usr/flume/flume-1.7.0/day6/flume_exec_test.txt

# a1.sinks.k1.type = logger
# Use the Kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka broker address and port
a1.sinks.k1.brokerList=master:9092
# Kafka topic to write to
a1.sinks.k1.topic=badou_data
# Serialization
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder

# Use a channel which buffers events in memory
a1.channels.c1.type=memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
With the Kafka sink pointed at the broker address master:9092, start Flume.
cd /usr/flume/flume-1.7.0
./bin/flume-ng agent --conf conf --conf-file ./conf/flume_kafka.conf -name a1 -Dflume.root.logger=INFO,console
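To verify the pipeline end to end, a quick smoke test (an illustrative check, assuming the Flume agent above and the console consumer from step 2 are both running) is to append a line to the tailed file and watch it appear in the consumer:

echo 'flume smoke test' >> /usr/flume/flume-1.7.0/day6/flume_exec_test.txt
# the line should show up in the kafka-console-consumer output within a few seconds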
4. Clear the log file and run the Python script
cd /usr/flume/flume-1.7.0/day6
echo '' > flume_exec_test.txt
Run python flume_data_write.py to simulate backend logs being written to the log file.
import random
import time
import pandas as pd
import json

writeFileName = "/usr/flume/flume-1.7.0/day6/flume_exec_test.txt"
cols = ["order_id", "user_id", "eval_set", "order_number", "order_dow", "hour", "day"]

# Load the orders data and fill missing values
df1 = pd.read_csv('/root/day3/orders.csv')
df1.columns = cols
df = df1.fillna(0)

# Append each row as one JSON line to the file that Flume is tailing
with open(writeFileName, 'a+') as wf:
    for idx, row in df.iterrows():
        d = {}
        for col in cols:
            d[col] = row[col]
        js = json.dumps(d)
        wf.write(js + '\n')
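While the script runs, you can confirm records are landing in the tailed file (a simple check; the path is the one configured in flume_kafka.conf):

tail -f /usr/flume/flume-1.7.0/day6/flume_exec_test.txt
# or: wc -l /usr/flume/flume-1.7.0/day6/flume_exec_test.txt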
We can then watch the records the Python script writes to flume_exec_test.txt flow through Flume into Kafka in a continuous stream, where the console consumer started in step 2 prints them out.
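To double-check how many records reached the topic, one option (a sketch; run from the Kafka install directory, broker addresses as above) is to print the latest offset of each partition:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list master:9092,slave1:9092,slave2:9092 --topic badou_data --time -1
# the sum of the per-partition offsets is the total number of messages written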