#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author: 风过无言花易落
# @Date  : 2020/11/02 14:00
# @Desc  : Kafka producer (load-generation) script: a pool of worker
#          processes each publishes a batch of synthetic JSON events to one
#          topic and reports how long it took.
from confluent_kafka import Producer
import json
import timeit
import time
import os
from faker import Faker
import multiprocessing as mp
import num

# Value read from the project-local ``num`` module; kept for compatibility
# (nothing in this script reads it).
numg = num.nummsg

# Shared Faker instance used to generate zh-CN phone numbers and IPv4 strings.
# NOTE(review): with the fork start method every worker inherits the same
# Faker random state, so workers probably emit identical uid/ip sequences —
# confirm whether that matters for this test.
f = Faker(locale='zh-CN')


class Kafka_producer:
    '''Hold a confluent_kafka ``Producer`` together with its target topic.'''

    def __init__(self, topic='TRANSFER_TOPIC', conf=None):
        '''Create the underlying producer.

        :param topic: topic that messages are published to
            (stored as ``self.kafkatopic``).
        :param conf: optional dict of librdkafka settings merged over the
            built-in defaults below, e.g. to target another cluster.
        '''
        self.kafkatopic = topic
        # SECURITY(review): SASL credentials are hard-coded here; prefer
        # passing them through ``conf`` (or the environment) instead of
        # committing them to source control.
        settings = {
            "bootstrap.servers": "10.121.114.115:32117,10.121.114.194:31019,10.121.114.120:31117",
            "security.protocol": "SASL_PLAINTEXT",
            "sasl.mechanisms": "SCRAM-SHA-256",
            "sasl.username": "fk-test",
            "sasl.password": "1qaz@WSX",
        }
        if conf:
            settings.update(conf)
        self.p = Producer(**settings)


def _build_message():
    '''Return one synthetic event record (a JSON-serializable dict).'''
    return {
        "siteId": 1,
        "activityId": "csjgtest",
        "action_id": "1",
        "action_type": "1",
        "url": "http://www.baidu.com/s?wd=werkwle&rsv_spt=1&rsv_iqid=0xb5918fde0000b83b&issp=1&f=8&rsv_bp=1&rsv_idx=2&ie=utf-8&tn=baiduhome_pg&rsv_enter=1&rsv_dl=tb&rsv_sug3=8&rsv_sug1=3&rsv_sug7=101&rsv_sug2=0&rsv_btype=i&prefixsug=werkwle&rsp=5&inputT=744&rsv_sug4=1083",
        "uid": f.phone_number(),
        "ip": f.ipv4(),
        "cookie": 0,
    }


def main(p_name, count):
    '''Produce ``count`` JSON messages from this worker process.

    :param p_name: worker index assigned by the parent process (not used
        beyond identifying the task).
    :param count: number of messages this worker publishes.
    :return: ``[process_id, elapsed_seconds, count]``.
    '''
    start = timeit.default_timer()
    processId = os.getpid()
    print('进程ID', processId)

    producer = Kafka_producer()
    p = producer.p
    for _ in range(count):
        payload = json.dumps(_build_message()).encode('utf-8')
        # Serve delivery callbacks for messages produced earlier.
        p.poll(0)
        while True:
            try:
                p.produce(producer.kafkatopic, payload, callback=delivery_report)
                break
            except BufferError:
                # Local producer queue is full: let in-flight messages drain,
                # then retry instead of crashing the worker.
                p.poll(1)

    # Wait until every queued message has been delivered (or has failed) so
    # nothing is lost when the worker exits and the timing covers delivery.
    p.flush()
    end = timeit.default_timer() - start
    return [processId, end, count]


def delivery_report(err, msg):
    '''Delivery callback that prints the outcome of one produced message.

    Called by confluent_kafka from within ``poll()`` / ``flush()``.
    '''
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


if __name__ == '__main__':
    print('父进程', os.getpid())
    # ***********************************************************************
    count_ms = 10000  # total number of messages across all workers
    # ***********************************************************************
    processes = 5  # number of worker processes in the pool

    # Split the total across workers; the first ``extra`` workers take one
    # extra message so the grand total is exactly ``count_ms`` (truncating
    # division would silently drop the remainder).
    base, extra = divmod(count_ms, processes)

    result = []
    pool = mp.Pool(processes=processes)
    for p_name in range(processes):
        count = base + (1 if p_name < extra else 0)
        result.append(pool.apply_async(func=main, args=(p_name, count)))
    pool.close()
    pool.join()

    for res in result:
        process, rtime, coun_nu = res.get()
        print("process(%s) done. --Running time: %s Seconds" % (process, rtime), coun_nu)
# 欢迎分享,转载请注明来源:内存溢出 (web-page footer: sharing welcome; credit the source "内存溢出" when reposting)
# 评论列表(0条) (web-page footer: comment list, 0 comments)