Python多进程写Kafka

Python多进程写Kafka,第1张

Python多进程写Kafka
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author: 风过无言花易落
# @Date  : 2020/11/02 14:00
# @Desc  : kafka生产脚本
from confluent_kafka import Producer
import json
import timeit,time,os
from faker import Faker
import multiprocessing as mp
import num

global numg
numg = num.nummsg
f = Faker(locale='zh-CN')
class Kafka_producer():
    '''
    使用kafka的生产模块
    '''
    def __init__(self):
        self.kafkatopic = 'TRANSFER_TOPIC'  # topic
        conf = {
                "bootstrap.servers": "10.121.114.115:32117,10.121.114.194:31019,10.121.114.120:31117",
                "security.protocol": "SASL_PLAINTEXT",
                "sasl.mechanisms":"SCRAM-SHA-256",
                "sasl.username":"fk-test",
                "sasl.password":"1qaz@WSX"
                          }
        # print (conf)
        self.p = Producer(**conf)

def main(p_name, count):
    start = timeit.default_timer()
    '''
    producer mes
    :return:
    '''
    processId = os.getpid()
    print('进程ID',processId)
    # 测试生产模块
    producer = Kafka_producer()
    # 平均插入的消息数
    for i in range(count):
        cdr = {
            "siteId": 1,
            "activityId": "csjgtest",
            "action_id": "1",
            "action_type": "1",
            "url": "http://www.baidu.com/s?wd=werkwle&rsv_spt=1&rsv_iqid=0xb5918fde0000b83b&issp=1&f=8&rsv_bp=1&rsv_idx=2&ie=utf-8&tn=baiduhome_pg&rsv_enter=1&rsv_dl=tb&rsv_sug3=8&rsv_sug1=3&rsv_sug7=101&rsv_sug2=0&rsv_btype=i&prefixsug=werkwle&rsp=5&inputT=744&rsv_sug4=1083",
            "uid": f.phone_number(),
            "ip": f.ipv4(),
            "cookie": 0
        }
        # print(p_name,cdr)
        parmas_message = json.dumps(params)
        self.p.poll(0)
        self.p.produce(self.kafkatopic, parmas_message.encode('utf-8'), callback=delivery_report)
        # flush(), 强制刷新,扫尾工作,主要是为了,让数据流在管道的传输工程中全部传输过去
        self.p.flush()  # 在不考虑数据安全的情况可以注掉,影响效率
    end = timeit.default_timer() - start
    return [processId,end,count]

def delivery_report(err, msg):
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

if __name__ == '__main__':
    print('父进程', os.getpid())
    #*************************************************************************************************************#
    count_ms = 10000 # 消息总数
    # *************************************************************************************************************#
    processes = 5
    count = int(count_ms / processes)
    result = []
    # 创建进程
    pool = mp.Pool(processes = processes) # processes_num 线程池数
    for p_name in range(processes):
        result.append(pool.apply_async(func = main, args = (p_name, count)))
    pool.close()
    pool.join()
    for res in result:
        process, rtime,coun_nu = res.get()
        print("process(%s) done. --Running time: %s Seconds"%(process,rtime),coun_nu)

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5676452.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存