#!/usr/bin/env python # -*- coding: utf-8 -*- # @Author: 风过无言花易落 # @Date : 2019/9/26 14:00 # @Desc : kafka生产脚本(已topic作为启动线程数,消息总数会平均分配入topic中) from kafka import KafkaProducer from kafka.errors import KafkaError import json import timeit,time import threading import random import num global numg numg = num.nummsg class Kafka_producer(): ''' 使用kafka的生产模块 ''' def __init__(self, kafkahost, kafkatopic): self.kafkaHost = kafkahost self.kafkatopic = kafkatopic self.producer = KafkaProducer(bootstrap_servers='{kafka_host}'.format( kafka_host=self.kafkaHost )) def sendjsondata(self, params): try: parmas_message = json.dumps(params) producer = self.producer producer.send(self.kafkatopic, parmas_message.encode('utf-8')) # flush(), 强制刷新,扫尾工作,主要是为了,让数据流在管道的传输工程中全部传输过去 # producer.flush() # 在不考虑数据安全的情况可以注掉,影响效率 except KafkaError as e: print(e) def __del__(self): self.producer.close() def cdrbody(): number,imsi = random.choice(numg) plmn = 46001 prov = 74 up_vol = random.randint(1, 157286) # 上行流量 down_vol = random.randint(1, 157286) # 上行流量 nowDate = time.strftime('%Y%m%d', time.localtime(time.time())) nowTime = time.strftime('%H%M%S', time.localtime(time.time())) return number,imsi,plmn,nowDate,nowTime,up_vol,down_vol ,prov def main(host,count, topic, num, random_type): ''' producer mes :return: ''' # 测试生产模块 producer = Kafka_producer(host, topic) # 平均插入的消息数 for i in range(count): msg_body = [] if random_type: enum = random.randint(1,num) else: enum = num for x in range(enum): number, imsi, plmn, nowDate, nowTime, up_vol, down_vol ,prov = cdrbody() cdr = { "CDR": "3,PC1545036215.2052611843.2000.805306365.VISIT,11,0,86{},{},116.79.197.0,,1697867058,1467184619,116.79.197.0,3GWAP,,0,{},,{},{},1,1234569011,{},{},,46001128251697867058,6" .format(number, imsi, plmn, nowDate, nowTime, up_vol, down_vol) } msg_body.append(cdr) params = { "HEADER": { "PROV_CODE": prov, "SERV_TYPE": "GPP", "DATETIME": "20190926145248", "RECORD_NUM": enum }, "MBODY": msg_body } print(params) # with open('mes.log','a') as fobj: # fobj.write(str(params) + 'n') producer.sendjsondata(params) class Creat_Thread(threading.Thread): def __init__(self, host, count, tName, num, random_type): threading.Thread.__init__(self) self.count = count self.tName = tName self.host = host self.num = num self.random_type = random_type def run(self): start = timeit.default_timer() print("Starting-Topic: " + self.tName) main(self.host,self.count, self.tName, self.num, random_type) end = timeit.default_timer() print("Exiting-Topic:" + self.tName, '--Running time: %s Seconds' % (end - start),self.count) if __name__ == '__main__': host = "10.111.150.192:9092,10.111.150.93:9092,10.111.150.94:9092" #*************************************************************************************************************# topic_list = ['YH-XXJF-74'] # topic,多个topic以逗号分隔 random_type = False # 随机类型 True :随机,False;不随机 count_ms = 1 # 消息总数 num = 25 # 消息体中的话单数,当random_type为真时此变量作为单条消息最大随机条数 # *************************************************************************************************************# topic_nu = len(topic_list) # 获取topic总数 count = int(count_ms / topic_nu) # 创建新线程 threads = [] for tName in topic_list: thread = Creat_Thread(host,count, tName, num, random_type) thread.start() threads.append(thread) # 等待所有线程完成 for t in threads: t.join() print("Exiting Main Thread")
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)