Kafka consumer in Python: confluent-kafka
  • 1. Versions
  • 2. Requirement overview
  • 3. Approach
  • 4. Implementation
    • 4.1 Get the consumed offset of each partition
    • 4.2 Start a consumer
    • 4.3 Assign the specified offsets
    • 4.4 Consume
    • 4.5 Complete code

1. Versions

kafka:2.5.0
python:3.6.1
confluent-kafka:1.5.0
confluent-avro:1.5.0
avro-python3:1.9.1

2. Requirement overview

Precondition: Kafka Connect consumes the Kafka data and writes it into a Hive table.
Upstream maintains a write-status table that tells us when a batch has finished writing. The original process polled that status table, added the configured Kafka Connect delay, and then launched a job to process the Hive table data. But traffic spiked to a peak of 90 MB/s, all of it written into a single partition, so consumption fell behind; the job's start time did not adapt automatically, and the delivered data came up incomplete. The fix is to start a custom Kafka consumer that reads from specified offsets and checks whether the relevant records have actually been consumed. The checking logic itself is out of scope; this post only covers getting the Kafka consumer working.

3. Approach
  1. Use the CMAK REST API to get the currently consumed offset of each partition.
  2. Start a consumer.
  3. Assign the specified partition offsets and consume.
4. Implementation

Since the upstream writer uses the confluent-kafka package and produces in Avro format, the downstream consumer uses the same package.
confluent-kafka API documentation
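As an aside, confluent-kafka also ships an AvroConsumer that performs the schema-registry lookup and Avro decoding for you. A minimal sketch, reusing the placeholder broker, registry, and topic names from this post (here we instead decode the wire format by hand with unpack(), shown later):

from confluent_kafka.avro import AvroConsumer

ac = AvroConsumer({
    'bootstrap.servers': 'host1:9092',
    'group.id': 'test_custom_consumer',
    'schema.registry.url': 'http://kafka-schema-registry:8081',
})
ac.subscribe(['topic_name'])

msg = ac.poll(10)
if msg is not None and not msg.error():
    print(msg.value())  # value is already a decoded dict
ac.close()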

4.1 Get the consumed offset of each partition
import json

import requests
from confluent_kafka import TopicPartition

config = {
    'cluster_name': "cluster_name",
    'consumer_name': "consumer_name",
    'topic_name': "topic_name",
    'host': "http://cmak",
}
base_url = "{host}/api/status/{cluster_name}/{consumer_name}/{topic_name}/KF/topicSummary".format(
    **config)

def get_partition_offset(base_url):
    # Query CMAK's topicSummary endpoint for the consumer group's offsets.
    r = requests.get(base_url, verify=False)
    response = json.loads(r.content)
    return response.get("partitionOffsets", None)

def get_topic_partition(topic, partition_offsets):
    # Build one TopicPartition per partition; the list index is the partition id.
    return [TopicPartition(topic, partition, offset)
            for partition, offset in enumerate(partition_offsets)]

get_partition_offset returns the currently consumed offset of each partition as a list.
get_topic_partition wraps the topic and the partition_offsets list into TopicPartition objects, which are then used to consume from the specified per-partition offsets.
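For reference, the topicSummary endpoint returns JSON roughly shaped like the sketch below; only partitionOffsets is relied on here, and the other field names are assumptions that may vary by CMAK version. partitionOffsets is a plain list whose index is the partition id, which is why get_topic_partition can simply enumerate it:

# Illustrative response shape; only "partitionOffsets" is used above.
sample_response = {
    "totalLag": 57,                             # assumed field
    "partitionOffsets": [105, 98, 300],         # consumed offset per partition (index = partition id)
    "partitionLatestOffsets": [110, 120, 330],  # assumed field: log-end offset per partition
}
# get_topic_partition("topic_name", sample_response["partitionOffsets"]) yields:
# [TopicPartition("topic_name", 0, 105),
#  TopicPartition("topic_name", 1, 98),
#  TopicPartition("topic_name", 2, 300)]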

4.2 Start a consumer
topic = "topic_name"
schema_registry_url = "http://kafka-schema-registry:8081"
kafka_topics = ["topic_name"]
kafka_servers = 'host1:9092,host2:9092,host3:9092'

c = Consumer({
    'bootstrap.servers': kafka_servers,
    'group.id': 'test_custom_consumer'
})
register_client = CachedSchemaRegistryClient(url=schema_registry_url)
c.subscribe(kafka_topics)
partition_offsets = get_partition_offset(base_url)
topic_partition = get_topic_partition(topic, partition_offsets)
4.3 Assign the specified offsets
c.assign(topic_partition)
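assign() replaces the group subscription with a manual assignment that starts fetching at the given offsets. An optional sanity check, as a sketch using the consumer's standard assignment() call:

# Optional: confirm the manual assignment before polling.
for tp in c.assignment():
    print("assigned:", tp.topic, tp.partition, tp.offset)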
4.4 Consume
while True:
    msg = c.poll(10)
    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue
    try:
        # unpack() does the schema-registry lookup and Avro decode, so this is
        # where a deserialization error can surface.
        value = unpack(msg.value())
        key = unpack(msg.key())
    except SerializerError as e:
        print("Message deserialization failed for {}: {}".format(msg, e))
        break
    print('Message Value -', value)
    print('Message Key -', key)
    print('Topic -', msg.topic())
    print('Partition -', msg.partition())
    print('Offset -', msg.offset())
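Note that as written the loop only ever exits on a deserialization error. If the consumer should also shut down cleanly on Ctrl-C or SIGTERM, a minimal sketch (assuming the same consumer c as above) is to poll on a flag so that c.close() always runs and the group rebalances promptly:

import signal

running = True

def stop(signum, frame):
    # Flip the flag so the poll loop exits and c.close() can run.
    global running
    running = False

signal.signal(signal.SIGINT, stop)
signal.signal(signal.SIGTERM, stop)

while running:
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue
    print(msg.topic(), msg.partition(), msg.offset())

c.close()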
4.5 Complete code
import io
import json
import struct

import requests
from avro.io import BinaryDecoder, DatumReader
from confluent_kafka import Consumer, TopicPartition
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer import SerializerError

# First byte of the Confluent wire format (magic byte + 4-byte schema id).
MAGIC_BYTES = 0

topic = "topic_name"
schema_registry_url = "http://kafka-schema-registry:8081"
kafka_topics = ["topic_name"]
kafka_servers = 'host1:9092,host2:9092,host3:9092'

config = {
    'cluster_name': "cluster_name",
    'consumer_name': "consumer_name",
    'topic_name': "topic_name",
    'host': "http://host",
}

base_url = "{host}/api/status/{cluster_name}/{consumer_name}/{topic_name}/KF/topicSummary".format(
    **config)


def get_partition_offset(base_url):
    # Query CMAK's topicSummary endpoint for the consumer group's offsets.
    r = requests.get(base_url, verify=False)
    response = json.loads(r.content)
    return response.get("partitionOffsets", None)


def get_topic_partition(topic, partition_offsets):
    # Build one TopicPartition per partition; the list index is the partition id.
    return [TopicPartition(topic, partition, offset)
            for partition, offset in enumerate(partition_offsets)]


def unpack(payload):
    # Confluent wire format: 1 magic byte (0) + 4-byte big-endian schema id,
    # followed by the Avro-encoded body.
    if payload is None:
        return None
    magic, schema_id = struct.unpack('>bi', payload[:5])
    if magic == MAGIC_BYTES:
        # Fetch the writer schema from the schema registry and decode the body.
        schema = register_client.get_by_id(schema_id)
        reader = DatumReader(schema)
        decoder = BinaryDecoder(io.BytesIO(payload[5:]))
        return reader.read(decoder)
    # Not Avro-framed: fall back to a plain string.
    return payload.decode()


c = Consumer({
    'bootstrap.servers': kafka_servers,
    'group.id': 'test_custom_consumer'
})
register_client = CachedSchemaRegistryClient(url=schema_registry_url)
c.subscribe(kafka_topics)

partition_offsets = get_partition_offset(base_url)
topic_partition = get_topic_partition(topic, partition_offsets)
# assign() overrides the group subscription with a manual assignment that
# starts at the offsets fetched from CMAK.
c.assign(topic_partition)

while True:
    msg = c.poll(10)
    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue
    try:
        value = unpack(msg.value())
        key = unpack(msg.key())
    except SerializerError as e:
        print("Message deserialization failed for {}: {}".format(msg, e))
        break
    print('Message Value -', value)
    print('Message Key -', key)
    print('Topic -', msg.topic())
    print('Partition -', msg.partition())
    print('Offset -', msg.offset())

c.close()

Original article (Chinese): http://outofmemory.cn/langs/893402.html