Python集成kafka主要用到python-kafka和pyKafka,先简单介绍下两者的使用方式。
一、pyKafka生产者
import time from pykafka import KafkaClient class PyKafkaTest(object): def __init__(self, host): # 可使用kafka的ip端口,也可使用zookeeper的ip端口 self.client = KafkaClient(zookeeper_hosts=host) def producer_test(self, topic): topic = self.client.topics[topic.encode()] producer = topic.get_producer(sync=True) producer.produce('test'.encode(), partition_key='test'.encode(), timestamp=int(time.time() * 1000)) if __name__ == '__main__': host = 'localhost:2181' kafka_client = KafkaTest(host) topic = 'test-topic' kafka_client .producer_test(topic)
这是pyKafka的生产者简单实例,关于更多的使用,可以参考其它文章。
二、python-kafka生产者
from kafka import KafkaProducer class PythonKafkaTest(object): def __init__(self, host): producer = KafkaProducer(bootstrap_servers=host) def producer_test(self, topic): producer.send(topic, key='test'.encode(), value='test'.encode()) if __name__ == '__main__': host = 'localhost:9092' kafka_client = PythonKafkaTest(host) topic = 'test-topic' kafka_client .producer_test(topic)
这是python-kafka生产者的简单使用,详细使用可参考其它博客。
三、logging集成kafka
本文重点是将logging的日志发送到kafka中,logging的一些相关信息可以参考别的博文,这里主要展示如何将logging日志发送至kafka。
首先新建kafka_loggging_handler.py文件:
import json import logging from datetime import datetime from kafka import KafkaProducer import socket from terminal.conf import config, get_conf_name def get_host_ip(): """ 查询本机ip地址 :return: ip """ try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(('8.8.8.8', 80)) ip = s.getsockname()[0] finally: s.close() return ip # 不放到init里面是因为每次调用KafkaLoggingUtils里面的getLogger,都会初始化一次,就会连接kafka一次,放外面就只会加载一次 # 获取host地址 hosts = 'localhost:9092' # 设置block时间,设置acks为异步 producer = KafkaProducer(bootstrap_servers=hosts, compression_type='gzip',max_block_ms=1000, acks=0) class KafkaLoggingHandler(logging.Handler): def __init__(self): logging.Handler.__init__(self) # 这里是用来确定使用哪一个topic self.config_topic = 'test-topic' # 获取ip self.hostname = get_host_ip() # 重写emit方法 def emit(self, record): # 获取到日志里面需要的数据 logging_dict = getattr(record, '__dict__') # 对日志数据进行进一步处理,添加一些必要的数据 logging_dict['@timestamp'] = str(datetime.now()) logging_dict['HOSTNAME'] = self.hostname # 将数据dump成json字符串 msg = json.dumps(logging_dict) # 发送到kafka,partition_key生成规则,timestamp是创建消息时间,注意需要是bytes类型,因此对字符串 *** 作都是进行encode的处理 try: producer.send(self.config_topic, key='python-logging'.encode('utf-8'), value=msg.encode('utf-8')) except Exception as e: logging.info('发送kafka消息失败', e)
这里需要注意几点:
①需要继承logging.Handler,重写emit方法,在emit方法中发送日志到kafka
②kafka的初始化连接需要放在class外面,否则个调用logging的地方都会进行一次kafka连接的初始化,更不能将kafka连接放在重写的emit方法中,如果放在其中,每次发送数据都会新建连接对象,影响程序性能
接着编写kafka_logging_utils.py,这里主要是定义logging的相关信息,以及在logging中添加1刚才编写的kafka的handler:
import logging from terminal.conf import get_conf_name from terminal.kafkaclient.kafka_logging_handler import KafkaLoggingHandler config_name = get_conf_name() class KafkaLoggingUtils: level = eval('logging.INFO') format = '%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s' datefmt = '%a,%d %b %Y %H:%M:%S' name = __name__ @staticmethod def set_logging_params(params): level, format, datefmt = params if level is not None: KafkaLoggingUtils.level = level if format is not None: KafkaLoggingUtils.format = format if datefmt is not None: KafkaLoggingUtils.datefmt = datefmt @staticmethod def getLogger(name): # logging的一些format配置 logging.basicConfig(level=KafkaLoggingUtils.level, format=KafkaLoggingUtils.format, datefmt=KafkaLoggingUtils.datefmt) logger = logging.getLogger(name) # 将kafkahandler加入logging的handler中 handler = KafkaLoggingHandler() logger.addHandler(hdlr=handler) return logger
这里使用logger.addHandler方法将kafka的handler加入logging中,就能实现logging日志发送至kafka,在需要使用的地方调用:
from kafka_logging_utils import KafkaLoggingUtils logging = KafkaLoggingUtils.getLogger(__name__) logging.info('测试kafka日志')
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)