启动 zookeeper-server
启动 kafka server
创建 topic
""" Top Traffic Simulator """ from time import sleep, time, ctime from random import random, randint, choice from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='localhost:9092') TOPIC = 'toll' VEHICLE_TYPES = ("car", "car", "car", "car", "car", "car", "car", "car", "car", "car", "car", "truck", "truck", "truck", "truck", "van", "van") for _ in range(100000): vehicle_id = randint(10000, 10000000) vehicle_type = choice(VEHICLE_TYPES) now = ctime(time()) plaza_id = randint(4000, 4010) message = f"{now},{vehicle_id},{vehicle_type},{plaza_id}" message = bytearray(message.encode("utf-8")) print(f"A {vehicle_type} has passed by the toll plaza {plaza_id} at {now}.") producer.send(TOPIC, message) sleep(random() * 2)
output:
# 3. The consumer subscribes to the topic and stores the messages in MySQL.
"""Streaming data consumer.

Reads messages from the Kafka topic ``toll`` and inserts each one into the
``livetolldata`` table of the ``tolldata`` MySQL database.

Each message is expected to be a UTF-8 encoded line with four
comma-separated fields: timestamp, vehicle id, vehicle type, plaza id. The
timestamp arrives in ``'%a %b %d %H:%M:%S %Y'`` form and is rewritten as
``'%Y-%m-%d %H:%M:%S'`` before insertion.
"""
from contextlib import closing
from datetime import datetime

from kafka import KafkaConsumer
import mysql.connector

TOPIC = 'toll'
DATAbase = 'tolldata'
USERNAME = 'root'
# NOTE(review): this credential is committed in source - consider loading it
# from the environment or a secrets store instead.
PASSWORD = 'Mjk1NDEtdTMzMjk5'

# NOTE(review): assumes livetolldata has exactly these four columns, in this
# order - confirm against the table schema.
INSERT_SQL = "insert into livetolldata values(%s,%s,%s,%s)"

print("Connecting to the database")
try:
    connection = mysql.connector.connect(
        host='localhost',
        database=DATAbase,
        user=USERNAME,
        password=PASSWORD,
    )
except mysql.connector.Error as err:
    print("Could not connect to database. Please check credentials")
    print(f"Reason: {err}")
    # Without a connection nothing below can work; stop here instead of
    # failing later with a NameError on the undefined `connection`.
    raise SystemExit(1)
print("Connected to database")

# closing() guarantees the cursor, then the connection, are released even
# when the endless consume loop is interrupted or raises.
with closing(connection), closing(connection.cursor()) as cursor:
    print("Connecting to Kafka")
    with closing(KafkaConsumer(TOPIC)) as consumer:
        print("Connected to Kafka")
        print(f"Reading messages from the topic {TOPIC}")

        for msg in consumer:
            try:
                # Extract information from the Kafka record.
                message = msg.value.decode("utf-8")
                timestamp, vehicle_id, vehicle_type, plaza_id = message.split(",")
                # Transform the date format to suit the database schema.
                dateobj = datetime.strptime(timestamp, '%a %b %d %H:%M:%S %Y')
            except ValueError:
                # Covers bad UTF-8, a wrong field count and an unparsable
                # timestamp; one bad record must not stop the consumer.
                print(f"Skipping malformed message: {msg.value!r}")
                continue
            timestamp = dateobj.strftime("%Y-%m-%d %H:%M:%S")

            # Load the record into the database table.
            cursor.execute(INSERT_SQL, (timestamp, vehicle_id, vehicle_type, plaza_id))
            connection.commit()
            # Report success only after the row has actually been committed.
            print(f"A {vehicle_type} was inserted into the database")
output:
检查mysql是否正确存入:
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)