Kafka - a simple consumer demo - c++

Kafka - a simple consumer demo - c++,第1张

Kafka - a simple consumer demo - c++ Kafka

如果有 kafka 基础的同学可以不用看前面的废话,可以从第五条 [配置] 开始看起~ 代码在第七条

前言:官网比我这标准多了~ 官网跳转,大家可以先完成quickStart部分kafka单机生产消费 一、概念简介

Kafka 是一个分布式流处理平台,常被用作消息队列中间件。具有横向扩展,容错,高速等优点。

Kafka中的概念
  • record: 由 key , value , timestamp 组成,Kafka 集群会保持所有的消息,直到它们过期,无论消息是否被消费。Kafka 的性能是和数据大小是无关的,所以可以长期保存数据。
  • producer: 生产者用于发布消息。
  • consumer: 消费者用于订阅消息。
  • consumer group: 相同的 group.id 的消费者将视为同一个消费者组。
  • topic: 消息的一种逻辑分组,用于对消息分类。相同主题的消息放在一个队列中。
  • partition: 消息的一种物理分组,一个主题被称为多个分区,每个分区就是一个顺序的、不可变的消息队列,并且可以持续添加。每个分区对应一个逻辑 log,有多个 segment 组成。
  • offset: 分区中的每个消息都有一个唯一的 id,称之为偏移量。它代表已经消费的位置。可以手动或自动提交偏移量。
  • broker: 一台 Kafka 服务器称之为一个 broker
  • replica: 副本只是一个分区的备份。副本从不读取或写入数据,它们用于防止数据丢失
  • leader: Learder 是负责给定分区的所有读取和写入的节点。每个分区都有一个服务器充当 Leader, producer 和 consumer 只跟 Leader 交互
  • follower: 跟随 Leader 指令的节点成为 Follower。如果 Leader 失败,一个 Follower 将自动成为 Leader。 Follower 作为正常的消费者,拉取消息并更新其自己的数据存储。副本中的一个角色,从 Leader 中复制数据。
  • zookeeper: Kafka 代理都是无状态的,所以使用 Zookeeper 管理集群状态。Zookeeper 用于管理和协调 Kafka 代理
二、适用场景
  • MQ - 构造实时流数据管道,它可以在系统或应用之间可靠的获取数据。
  • 流处理 - 构建实时流式应用程序,对这些流数据进行转换或者影响。
三、四个核心 API
  1. Producer API : 允许一个应用程序发布一串流式数据到一个或者多个 Kafka topic。
  2. Consumer API : 允许一个应用程序订阅一个或者多个 topic,并且对发布给他们的流式数据进行处理。
  3. Streams API : 允许一个应用程序作为一个流处理器,消费一个或者多个 topic 产生的输入流,然后生产一个输出流到一个或者多个 topic 中去,在输入输出流之间进行有效的转换。
  4. Connector API : 允许构建并运行可重用的生产者或者消费者,将 Kafka topics 连接到已存在的应用程序或者数据系统。eg:连接到一个 DB,捕捉表的所有
四、Topics 和 log
  1. topic 就是数据主题。kafka 采用多订阅者模式,一个 topic 可以拥有一个或者多个消费者订阅它的数据。
  2. 对于每一个 topic,Kafka 集群都会维持一个分区日志。每个分区都是有序且顺序不可变的记录集,并且不断地追加到结构化的 commit log 文件。分区中的每一个记录都会分配一个 id 号来表示顺序,也就是偏移量 offset,offset 用来唯一的标识分区中的每一条记录。
  3. Kafka 集群保留所有发布的记录 - 无论他们是否已被消费 - 并通过一个可配置的参数 – 保留期限 – 来控制。如果保留策略设置为2天,一条记录发布后两天内,可以随时被消费,两天过后这条记录会被抛弃并释放磁盘空间。 Kafka 的性能和数据大小无关,所以长时间存储数据没有什么问题。
  4. 消费者可以采用任何顺序来消费,生产者对 topic 内容进行增加并不会影响已存在的消费者消费数据。
  5. 日志中的 partition 有以下几个用途 – 当日志大小超过了单台服务器的限制,允许日志进行扩展。每个独立的分区都必须首先与主机的文件限制,不过一个主题可能有多个分区,所有可以处理无限量的数据 – 第二,可以作为并行的单元集。
五、server.properties
############################# Server Basics #############################

# 用于服务的 broker id。如果没设置,将生成一个唯一的 broker id。为了避免 zk 生成的 id 和用户配置的 id 相冲突,生成的 id 将在 
# reserved.broker.max.id 的值的基础上加1.
broker.id=0 # kafka 核心配置之一. DEFAULT: broker.id = -1.

############################# Socket Server Settings #############################

# 服务器用于从接受网络请求并发送网络响应的线程数
num.network.threads=3

# 服务器用于请求处理的线程数,可能包括磁盘I/O
num.io.threads=8

# 服务端用来处理 socket 链接的 SO_SNDBUFF 缓冲大小(int)。如果值为-1,则使用系统默认值 102400.
socket.send.buffer.bytes=102400

# 服务端用来处理 socket 链接的 SO_RCVBUFF 缓冲大小(int)。如果值为-1,则使用系统默认值 102400.
socket.receive.buffer.bytes=102400

# socket 请求的最大大小,为了防止OOM,不能大于 Java heap 的大小
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma seperated list of directories under which to store log files
# 一个以逗号分隔的目录列表,用于存储日志文件。
# 如果未设置将使用 log.dir 的配置
log.dirs=/tmp/kafka-logs # kafka 的核心配置之一 (string 类型)。DEFAULT: null
# log.dir = /tmp/kafka-logs 保存日志的目录,对 log.dirs 的补充。 string类型,DEFAULT: /tmp/kafka-logs

# 每个主题的默认日志分区数。更多的分区允许更大的并行,但也会导致跨代理的更多文件!
# DEFAULT: 1 (int 类型)
num.partitions=1

# 每个数据目录,用于启动时日志恢复和关闭时刷新的线程数。
# This value is recommended to be increased for installations with data dirs located in RAID array.
# 官方翻译 - 对于数据目录位于 RAID 阵列的安装,建议增加此值。
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# 对于开发测试以外的任何事,建议使用大于1的任何值来保证可用性,例如3

# offset topic 的副本数 (设置的越大,可用性越高)。内部 topic 将创建失败,直到集群大小满足此副本要求
# DEFAULT: 3 (short 类型)
offsets.topic.replication.factor=1

# 事务 topic 的副本数 (设置的越大,可用性越高)。内部 topic 将创建失败,直到集群大小满足此要求
# DEFAULT: 3 (short 类型)
transaction.state.log.replication.factor=1

# 覆盖事务 topic 的 min.insync.replicas 配置
# 至于 min.insync.replicas 到底是什么,我现在还不理解,而且默认配置文件没有,先暂且不管吧~
# DEFAULT: 2 (int 类型)
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################
# 日志刷新策略 - 默认配置文件没写就按照 kafka 的默认配置来吧~
# 消息会立即刷新到文件系统但默认情况下只有 fsync() 进行同步
# 下面是需要权衡的几点,下面的配置控制将数据刷新到磁盘:
# 1. 持久性 - 如果不使用副本,未刷新的数据可能会丢失
# 2. 延迟性 - 当有大量的数据要刷新时,非常大的刷新间隔可能会导致延迟峰值
# 3. 吞吐量 - flush通常是大开销的 *** 作,一个小的flush间隔可能会导致过多的seek
# 在强制将数据刷新到磁盘之前要接受的消息数
# log.flush.interval.messages=10000
############################# Log Retention Policy #############################

# 以下配置控制 log segment 的处理。该策略可以设置为在一段时间后或者累积给定大小后删除段。
# 满足这些条件的任何一个,就会删除一个 segment。删除总是从日志的末尾开始。

# 日志删除的时间阈值 - 小时 为单位。 DEFAULT: 168 (int 类型)
log.retention.hours=168

# 基于大小的日志保留策略。除非剩余的段低于 log.retention.bytes,否则将从日志中修剪段。功能独立于 log.retention.hours。
#log.retention.bytes=1073741824

# 单个日志文件的最大大小
log.segment.bytes=1073741824

# 日志检查时间间隔,查看是否可以根据保留策略进行保留 - 毫秒 为单位。
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# 每一个都会对应一个zk,是一个逗号分隔的字符串。
# e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# 将可选的 chroot 字符串附加到 url 以制定所有 kafka znode 的根目录。
zookeeper.connect=localhost:2181  # kafka的核心配置之一。

# 与 ZK server 建立连接的超时时间,没有配置就使用 zookeeper.session.timeout.ms
# zookeeper.session.timeout.ms = 6000 (int 类型)
zookeeper.connection.timeout.ms=6000

############################# Group Coordinator Settings #############################

# 以下配置指定 GroupCoordinator 延迟初始消费者重新平衡的时间,以毫秒为单位。
# 随着新成员加入组,重新平衡将进一步延迟 group.initial.rebalance.delay.ms 的值,最多可达 max.poll.interval.ms
# 默认值是3秒
# 这里使用0的原因是,它可以为开发和测试提供更好的开箱即用体验。但是在生产环境中,默认值3秒更为合适!
# 这有助于避免在应用程序启动期间进行不必要的且可能代价高昂的重新平衡。
group.initial.rebalance.delay.ms=0
六、关于 consumer.properties, log4j.properties, producer.properties

这里参考 kafka 官网的配置文档,默认配置都比较简单,需要什么取什么~

七、一个简单的 c++ - consumer 消费者实例

血与泪的教训:别在 mac 本地开发!clang环境需要装12G的Xcode!

# 使用 gcc 也行
yum install clang
# 有版本输出就说明安装ok
clang -v 
# 如果报错:/usr/bin/ld: cannot find -ldstdc++ 就装上
yum install -y libstdc++*

# 安装 kafka c++ 客户端
yum install librdkafka-devel

现在环境就ok了。开始编写代码。

#include 
#include "librdkafka/rdkafkacpp.h"
#include 
#include 
#include 
using namespace std;

void msg_consume(RdKafka::Message* msg) {
        std::cout << "msg::topic_name: " << msg->topic_name().c_str() << endl;
        if (msg->err() == RdKafka::ERR_NO_ERROR) {
                std::cout << "Read msg at offset " << msg->offset() << std::endl;
                if (msg->key()) {
                        std::cout << "Key: " << *msg->key() << std::endl;
                }
                printf("%.*sn", static_cast(msg->len()), static_cast(msg->payload()));
        } else if (msg->err() == RdKafka::ERR__TIMED_OUT) {
                printf("error[%s]n", "ERROR__TIMED_OUT");
        } else {
                printf("error[%s]n", "other");
        }
}

int main() {
        std::string err_string;
        int32_t partition = RdKafka::Topic::PARTITION_UA;
  			// 分区 0
        partition = 0;

  			// 代理地址,也就是 kafka 运行端口
        std::string broker_list = "localhost:9092";
        RdKafka::Conf* global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
        RdKafka::Conf* topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

        int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
  			// 这里配置字段都是约定好的,配置文件我会贴在附录里
        global_conf->set("metadata.broker.list", broker_list, err_string);

  			// 创建消费者
        RdKafka::Consumer* consumer = RdKafka::Consumer::create(global_conf, err_string);
        if (!consumer) {
                printf("failed to create consumer, %sn", err_string.c_str());
                return -1;
        }
        printf("create consumer %s n", consumer->name().c_str());

  			// 创建topic
        std::string topic_name = "kafka-test-wayne";
        RdKafka::Topic* topic = RdKafka::Topic::create(consumer, topic_name, topic_conf, err_string);
  			if (!topic) {
                printf("try create topic[%s] failed, %sn", topic_name.c_str(), err_string.c_str());
                return -1;
        }
        printf("create topic[%s] successd. n", topic_name.c_str());
				
  			// 在 topic 下 partition 区 start offset 处 开始消费
        RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset);
        if (resp != RdKafka::ERR_NO_ERROR) {
                printf("Failed to start consumer: %sn", RdKafka::err2str(resp).c_str());
                return -1;
        }

  			// 一直消费。control + z 结束
        while (true) {
                RdKafka::Message* msg = consumer->consume(topic, partition, 2000);
                printf("topic[%s], partition[%d], start consumen", topic_name.c_str(), partition);
                msg_consume(msg);
                delete msg;
        }

        consumer->stop(topic, partition);
        consumer->poll(1000);

        delete topic;
        delete consumer;

        return 0;
}

按照 quickStart 中的方法,先开启 zk 和 kafka。
服务端要配置服务器监听端口,在 server.properties 中修改

# 加上这个配置 这样就能监听到9092端口
listeners=PLAINTEXT://:9092

然后开启 kafka 的生产者控制台
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-test-wayne 生产几条数据

编译c++代码,开始消费
clang++ consumer.cpp -std=c++11 -lrdkafka++ 默认会生成 a.out 文件,执行这个文件 ./a.out

可以看到已经成功消费~。

八、附录
  1. c++ kafka 客户端:https://github.com/edenhill/librdkafka
  2. kafka 官网:https://kafka.apachecn.org/
  3. librdkafka配置文件:https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
  4. 后续代码我会放在 https://github.com/shishc9/kafka-demo

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5634748.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存