精华总结:依靠主题分区来类似分库分表的方式提高性能,用 副本主从 同步+ ISR(偏移量和HW) 来保证消息队列的可靠性,消费者提交 消费位移 来保证消息不丢失和重复消费等,用ZK来处理 服务发现 ,负载均衡,选举,集群管理,消费位移记录(以被推荐记录于kafka主题内)等。
HW之前的消息才能被消费者拉取,理解为都同步备份完了,才算生产者消息提交成功,对消费者可见。这种ISR机制影响了性能但是保证了可靠性,保证消息不丢失。消费位移提交,默认的是自动提交,异常下消息会重复消费会丢失,但可以参数配置手动提交,自行在业务处理完再提交。消费者拉的方式自主获取消费,便于消费者自行控制消费速率。默认分区规则是哈希一致性方式。
相比 Redis消息队列 本身的可靠性就不如,被消费者拉取完就认为消费完了,消息丢失,所以一般需要自行维护ack机制。
Kafka的消息是保存或缓存在磁盘上的,一般认为在磁盘上读写数据是会降低性能的,因为寻址会比较消耗时间,但是实际上,Kafka的特性之一就是高吞吐率。即使是普通的服务器, Kafka也可以轻松支持每秒百万级的写入请求 ,超过了大部分的消息中间件,这种特性也使得Kafka在日志处理等海量数据场景广泛应用。 Kafka速度的秘诀在于 ,它把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络IO损耗,通过mmap提高I/O速度,写入数据的时候由于单个Partion是末尾添加所以速度最优;读取数据的时候配合sendfile直接暴力输出。
一个典型的 Kafka 体系架构包括若干 Producer(消息生产者),若干 broker(作为 Kafka 节点的服务器),若干 Consumer(Group),以及一个 ZooKeeper 集群。Kafka通过 ZooKeeper 管理集群配置、选举 Leader 以及在 consumer group 发生变化时进行 Rebalance(即消费者负载均衡,在下一课介绍)。Producer 使用 push(推)模式将消息发布到 broker,Consumer 使用 pull(拉)模式从 broker 订阅并消费消息。
Kafka 节点的 broker涉及 Topic、Partition 两个重要概念
在 Kafka 架构中,有几个术语:
Producer :生产者,即消息发送者,push 消息到 Kafka 集群中的 broker(就是 server)中;
Broker :Kafka 集群由多个 Kafka 实例(server) 组成,每个实例构成一个 broker,说白了就是服务器;
Topic :producer 向 kafka 集群 push 的消息会被归于某一类别,即Topic,这本质上只是一个逻辑概念,面向的对象是 producer 和 consumer,producer 只需要关注将消息 push 到哪一个 Topic 中,而 consumer 只需要关心自己订阅了哪个 Topic;
Partition :每一个 Topic 又被分为多个 Partitions,即物理分区;出于负载均衡的考虑,同一个 Topic 的 Partitions 分别存储于 Kafka 集群的多个 broker 上;而为了提高可靠性,这些 Partitions 可以由 Kafka 机制中的 replicas 来设置备份的数量;如上面的框架图所示,每个 partition 都存在两个备份;
Consumer :消费者,从 Kafka 集群的 broker 中 pull 消息、消费消息;
Consumer group :high-level consumer API 中,每个 consumer 都属于一个 consumer-group,每条消息只能被 consumer-group 中的一个 Consumer 消费,但可以被多个 consumer-group 消费;
replicas :partition 的副本,保障 partition 的高可用;
leader :replicas 中的一个角色, producer 和 consumer 只跟 leader 交互;
follower :replicas 中的一个角色,从 leader 中复制数据,作为副本,一旦 leader 挂掉,会从它的 followers 中选举出一个新的 leader 继续提供服务;
controller :Kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover;
ZooKeeper :Kafka 通过 ZooKeeper 来存储集群的 meta 信息等,文中将详述。
一个 topic 可以认为是一类消息,每个 topic 将被分成多个 partition,每个 partition 在存储层面是 append log 文件。任何发布到此 partition 的消息都会被追加到log文件的尾部,每条消息在文件中的位置称为 offset(偏移量),offset 为一个 long 型的数字,它唯一标记一条消息。 Kafka 机制中,producer push 来的消息是追加(append)到 partition 中的,这是一种顺序写磁盘的机制,效率远高于随机写内存,如下示意图:
Kafka 中 topic 的每个 partition 有一个预写式的日志文件,虽然 partition 可以继续细分为若干个 segment 文件,但是对于上层应用来说,仍然可以将 partition 看成最小的存储单元(一个有多个 segment 文件拼接的 “巨型” 文件),每个 partition 都由一些列有序的、不可变的消息组成,这些消息被连续的追加到 partition 中。
上图中有两个新名词:HW 和 LEO。这里先介绍下 LEO,LogEndOffset 的缩写,表示每个 partition 的 log 最后一条 Message 的位置。HW 是 HighWatermark 的缩写,是指 consumer 能够看到的此 partition 的位置,这个涉及到多副本的概念,这里先提及一下,下文再详述。
言归正传,为了提高消息的可靠性,Kafka 每个 topic 的 partition 有 N 个副本(replicas),其中 N(大于等于 1)是 topic 的复制因子(replica fator)的个数。Kafka 通过多副本机制实现故障自动转移,当 Kafka 集群中出现 broker 失效时,副本机制可保证服务可用。对于任何一个 partition,它的 N 个 replicas 中,其中一个 replica 为 leader,其他都为 follower,leader 负责处理 partition 的所有读写请求,follower 则负责被动地去复制 leader 上的数据。如下图所示,Kafka 集群中有 4 个 broker,某 topic 有 3 个 partition,且复制因子即副本个数也为 3:
如果 leader 所在的 broker 发生故障或宕机,对应 partition 将因无 leader 而不能处理客户端请求,这时副本的作用就体现出来了:一个新 leader 将从 follower 中被选举出来并继续处理客户端的请求。
上一节中讲到了同步副本队列 ISR(In-Sync Replicas)。虽然副本极大的增强了可用性,但是副本数量对 Kafka 的吞吐率有一定影响。默认情况下 Kafka 的 replica 数量为 1,即每个 partition 都只有唯一的 leader,无 follower,没有容灾能力。为了确保消息的可靠性,生产环境中,通常将其值(由 broker 的参数 offsetstopicreplicationfactor 指定)大小设置为大于 1,比如 3。 所有的副本(replicas)统称为 Assigned Replicas,即 AR。ISR 是 AR 中的一个子集,由 leader 维护 ISR 列表,follower 从 leader 同步数据有一些延迟(由参数 replicalagtimemaxms 设置超时阈值),超过阈值的 follower 将被剔除出 ISR, 存入 OSR(Outof-Sync Replicas)列表,新加入的 follower 也会先存放在 OSR 中。AR=ISR+OSR。
上面一节还涉及到一个概念,即 HW。HW 俗称高水位,HighWatermark 的缩写,取一个 partition 对应的 ISR 中最小的 LEO 作为 HW,consumer 最多只能消费到 HW 所在的位置。另外每个 replica 都有 HW,leader 和 follower 各自负责更新自己的 HW 的状态。对于 leader 新写入的消息,consumer 不能立刻消费,leader 会等待该消息被所有 ISR 中的 replicas 同步后更新 HW,此时消息才能被 consumer 消费。这样就保证了如果 leader 所在的 broker 失效,该消息仍然可以从新选举的 leader 中获取。对于来自内部 broker 的读取请求,没有 HW 的限制。
下图详细的说明了当 producer 生产消息至 broker 后,ISR 以及 HW 和 LEO 的流转过程:
由此可见,Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的 follower 都复制完,这条消息才会被 commit,这种复制方式受限于复制最慢的 follower,会极大的影响吞吐率。而异步复制方式下,follower 异步的从 leader 复制数据,数据只要被 leader 写入 log 就被认为已经 commit,这种情况下如果 follower 都还没有复制完,落后于 leader 时,突然 leader 宕机,则会丢失数据,降低可靠性。而 Kafka 使用 ISR 的策略则在可靠性和吞吐率方面取得了较好的平衡。
Kafka 的 ISR 的管理最终都会反馈到 ZooKeeper 节点上,具体位置为:
/brokers/topics/[topic]/partitions/[partition]/state
目前,有两个地方会对这个 ZooKeeper 的节点进行维护。
Controller 来维护:Kafka 集群中的其中一个 Broker 会被选举为 Controller,主要负责 Partition 管理和副本状态管理,也会执行类似于重分配 partition 之类的管理任务。在符合某些特定条件下,Controller 下的 LeaderSelector 会选举新的 leader,ISR 和新的 leader_epoch 及 controller_epoch 写入 ZooKeeper 的相关节点中。同时发起 LeaderAndIsrRequest 通知所有的 replicas。
leader 来维护:leader 有单独的线程定期检测 ISR 中 follower 是否脱离 ISR,如果发现 ISR 变化,则会将新的 ISR 的信息返回到 ZooKeeper 的相关节点中。
考虑这样一种场景:acks=-1,部分 ISR 副本完成同步,此时leader挂掉,如下图所示:follower1 同步了消息 4、5,follower2 同步了消息 4,与此同时 follower2 被选举为 leader,那么此时 follower1 中的多出的消息 5 该做如何处理呢?
类似于木桶原理,水位取决于最低那块短板。
如上图,某个 topic 的某 partition 有三个副本,分别为 A、B、C。A 作为 leader 肯定是 LEO 最高,B 紧随其后,C 机器由于配置比较低,网络比较差,故而同步最慢。这个时候 A 机器宕机,这时候如果 B 成为 leader,假如没有 HW,在 A 重新恢复之后会做同步(makeFollower) *** 作,在宕机时 log 文件之后直接做追加 *** 作,而假如 B 的 LEO 已经达到了 A 的 LEO,会产生数据不一致的情况,所以使用 HW 来避免这种情况。 A 在做同步 *** 作的时候,先将 log 文件截断到之前自己的 HW 的位置,即 3,之后再从 B 中拉取消息进行同步。
如果失败的 follower 恢复过来,它首先将自己的 log 文件截断到上次 checkpointed 时刻的 HW 的位置,之后再从 leader 中同步消息。leader 挂掉会重新选举,新的 leader 会发送 “指令” 让其余的 follower 截断至自身的 HW 的位置然后再拉取新的消息。
当 ISR 中的个副本的 LEO 不一致时,如果此时 leader 挂掉,选举新的 leader 时并不是按照 LEO 的高低进行选举,而是按照 ISR 中的顺序选举。
在 consumer 对指定消息 partition 的消息进行消费的过程中,需要定时地将 partition 消息的 消费进度 Offset 记录到 ZooKeeper上 ,以便在该 consumer 进行重启或者其它 consumer 重新接管该消息分区的消息消费权后,能够从之前的进度开始继续进行消息消费。Offset 在 ZooKeeper 中由一个专门节点进行记录,其节点路径为:
#节点内容就是Offset的值。/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
PS:Kafka 已推荐将 consumer 的 Offset 信息保存在 Kafka 内部的 topic 中,即:
__consumer_offsets(/brokers/topics/__consumer_offsets)
并且默认提供了 kafka_consumer_groupssh 脚本供用户查看consumer 信息(命令:sh kafka-consumer-groupssh –bootstrap-server –describe –group )。在当前版本中,offset 存储方式要么存储在本地文件中,要么存储在 broker 端,具体的存储方式取决 offsetstoremethod 的配置,默认是存储在 broker 端。
在基于 Kafka 的分布式消息队列中,ZooKeeper 的作用有:broker 注册、topic 注册、producer 和 consumer 负载均衡、维护 partition 与 consumer 的关系、记录消息消费的进度以及 consumer 注册等。
参考原文:
再谈基于 Kafka 和 ZooKeeper 的分布式消息队列原理Kafka是由LinkedIn设计的一个高吞吐量、分布式、基于发布订阅模式的消息系统,使用Scala编写,它以可水平扩展、可靠性、异步通信和高吞吐率等特性而被广泛使用。目前越来越多的开源分布式处理系统都支持与Kafka集成,其中Spark Streaming作为后端流引擎配合Kafka作为前端消息系统正成为当前流处理系统的主流架构之一。
然而,当下越来越多的安全漏洞、数据泄露等问题的爆发,安全正成为系统选型不得不考虑的问题,Kafka由于其安全机制的匮乏,也导致其在数据敏感行业的部署存在严重的安全隐患。本文将围绕Kafka,先介绍其整体架构和关键概念,再深入分析其架构之中存在的安全问题,最后分享下Transwarp在Kafka安全性上所做的工作及其使用方法。
Kafka架构与安全
首先,我们来了解下有关Kafka的几个基本概念:
Topic:Kafka把接收的消息按种类划分,每个种类都称之为Topic,由唯一的Topic Name标识。
Producer:向Topic发布消息的进程称为Producer。
Consumer:从Topic订阅消息的进程称为Consumer。
Broker:Kafka集群包含一个或多个服务器,这种服务器被称为Broker。
Kafka的整体架构如下图所示,典型的Kafka集群包含一组发布消息的Producer,一组管理Topic的Broker,和一组订阅消息的Consumer。Topic可以有多个分区,每个分区只存储于一个Broker。Producer可以按照一定的策略将消息划分给指定的分区,如简单的轮询各个分区或者按照特定字段的Hash值指定分区。Broker需要通过ZooKeeper记录集群的所有Broker、选举分区的Leader,记录Consumer的消费消息的偏移量,以及在Consumer Group发生变化时进行relalance Broker接收和发送消息是被动的:由Producer主动发送消息,Consumer主动拉取消息。
然而,分析Kafka框架,我们会发现以下严重的安全问题:
1网络中的任何一台主机,都可以通过启动Broker进程而加入Kafka集群,能够接收Producer的消息,能够篡改消息并发送给Consumer。
2网络中的任何一台主机,都可以启动恶意的Producer/Consumer连接到Broker,发送非法消息或拉取隐私消息数据。
3Broker不支持连接到启用Kerberos认证的ZooKeeper集群,没有对存放在ZooKeeper上的数据设置权限。任意用户都能够直接访问ZooKeeper集群,对这些数据进行修改或删除。
4Kafka中的Topic不支持设置访问控制列表,任意连接到Kafka集群的Consumer(或Producer)都能对任意Topic读取(或发送)消息。
随着Kafka应用场景越来越广泛,特别是一些数据隐私程度较高的领域(如道路交通的视频监控),上述安全问题的存在犹如一颗定时炸d,一旦内网被黑客入侵或者内部出现恶意用户,所有的隐私数据(如车辆出行记录)都能够轻易地被窃取,而无需攻破Broker所在的服务器。
Kafka安全设计
基于上述分析,Transwarp从以下两个方面增强Kafka的安全性:
身份认证(Authentication):设计并实现了基于Kerberos和基于IP的两种身份认证机制。前者为强身份认证,相比于后者具有更好的安全性,后者适用于IP地址可信的网络环境,相比于前者部署更为简便。
权限控制(Authorization):设计并实现了Topic级别的权限模型。Topic的权限分为READ(从Topic拉取数据)、WRITE(向Topic中生产数据)、CREATE(创建Topic)和DELETE(删除Topic)。
基于Kerberos的身份机制如下图所示:
Broker启动时,需要使用配置文件中的身份和密钥文件向KDC(Kerberos服务器)认证,认证通过则加入Kafka集群,否则报错退出。
Producer(或Consumer)启动后需要经过如下步骤与Broker建立安全的Socket连接:
1Producer向KDC认证身份,通过则得到TGT(票证请求票证),否则报错退出
2Producer使用TGT向KDC请求Kafka服务,KDC验证TGT并向Producer返回SessionKey(会话密钥)和ServiceTicket(服务票证)
3Producer使用SessionKey和ServiceTicket与Broker建立连接,Broker使用自身的密钥解密ServiceTicket,获得与Producer通信的SessionKey,然后使用SessionKey验证Producer的身份,通过则建立连接,否则拒绝连接。
ZooKeeper需要启用Kerberos认证模式,保证Broker或Consumer与其的连接是安全的。
Topic的访问控制列表(ACL)存储于ZooKeeper中,存储节点的路径为/acl/<topic>/<user>,节点数据为R(ead)、W(rite)、C(reate)、D(elete)权限的集合,如/acl/transaction/jack节点的数据为RW,则表示用户jack能够对transaction这个topic进行读和写。
另外,kafka为特权用户,只有kafka用户能够赋予/取消权限。因此,ACL相关的ZooKeeper节点权限为kafka具有所有权限,其他用户不具有任何权限。
构建安全的Kafka服务
首先,我们为Broker启用Kerberos认证模式,配置文件为/etc/kafka/conf/serverproperties,安全相关的参数如下所示:
其中,authentication参数表示认证模式,可选配置项为simple, kerberos和ipaddress,默认为simple。当认证模式为kerberos时,需要额外配置账户属性principal和对应的密钥文件路径keytab
认证模式为ipaddress时,Producer和Consumer创建时不需要做任何改变。而认证模式为kerberos时,需要预先创建好相应的principal和keytab,并使用API进行登录,样例代码如下所示:
public class SecureProducer extends Thread {
private final kafkajavaapiproducerProducer<Integer, String> producer;
private final String topic;
private final Properties props = new Properties();
public SecureProducer(String topic) {
AuthenticationManagersetAuthMethod(“kerberos”);
AuthenticationManagerlogin(“producer1″, “/etc/producer1keytab”);
propsput(“serializerclass”, “kafkaserializerStringEncoder”);
propsput(“metadatabrokerlist”,
“172161190:9092,172161192:9092,172161193:9092″);
// Use random partitioner Don’t need the key type Just set it to Integer
// The message is of type String
producer = new kafkajavaapiproducerProducer<Integer, String>(
new ProducerConfig(props));
thistopic = topic;
}
Topic权限管理
Topic的权限管理主要是通过AuthorizationManager这个类来完成的,其类结构如下图所示:
其中,resetPermission(user, Permissions, topic) 为重置user对topic的权限。
grant(user, Permissions, topic) 为赋予user对topic权限。
revoke(user, Permissions, topic) 为取消user对topic权限。
isPermitted(user, Permissions, topic) 为检查user对topic是否具有指定权限。
调用grant或revoke进行权限设置完成后,需要commit命令提交修改到ZooKeeper
Kerberos模式下,AuthorizationManager需要先使用AuthenticationManagerlogin方法登录,与ZooKeeper建立安全的连接,再进行权限设置。示例代码如下所示:
public class AuthzTest {
public static void main(String[] args) {
Properties props = new Properties();
propssetProperty(“authentication”, “kerberos”);
propssetProperty(“zookeeperconnect”, “172162116:2181,172162117:2181,172162118:2181″);
propssetProperty(“principal”, “kafka/host1@TDH”);
propssetProperty(“keytab”, “/usr/lib/kafka/config/kafkakeytab”);
ZKConfig config = new ZKConfig(props);
AuthenticationManagersetAuthMethod(configauthentication());
AuthenticationManagerlogin(configprincipal(), configkeytab());
AuthorizationManager authzManager = new AuthorizationManager(config);
// reset permission READ and WRITE to ip 17216187 on topic test
authzManagerresetPermission(“17216187″,
new Permissions(PermissionsREAD, PermissionsWRITE), “test”);
// grant permission WRITE to ip 17216187 on topic test
authzManagergrant(“17216187″, new Permissions(PermissionsCREATE), “test”);
// revoke permission READ from ip 17216187 on topic test
authzManagerrevoke(“17216187″, new Permissions(PermissionsREAD), “test”);
// commit the permission settings
authzManagercommit();
authzManagerclose();
}
}
ipaddress认证模式下,取消和赋予权限的 *** 作如下所示:
public class AuthzTest {
public static void main(String[] args) {
Properties props = new Properties();
propssetProperty(“authentication”, “ipaddress”);
propssetProperty(“zookeeperconnect”,
“17216187:2181,17216188:2181,17216189:2181″);
ZKConfig config = new ZKConfig(props);
// new authorization manager
AuthorizationManager authzManager = new AuthorizationManager(config);
// reset permission READ and WRITE to ip 17216187 on topic test
authzManagerresetPermission(“17216187″,
new Permissions(PermissionsREAD, PermissionsWRITE), “test”);
// grant permission WRITE to ip 17216187 on topic test
authzManagergrant(“17216187″, new Permissions(PermissionsCREATE), “test”);
// revoke permission READ from ip 17216187 on topic test
authzManagerrevoke(“17216187″, new Permissions(PermissionsREAD), “test”);
// commit the permission settings
authzManagercommit();
authzManagerclose();
}
}
总结与展望
本文通过介绍Kafka现有架构,深入挖掘其中存在的安全问题,并给出Transwarp在Kafka安全上所做的工作及其使用方式。然而,纵观Hadoop & Spark生态系统,安全功能还存在很多问题,各组件的权限系统独立混乱,缺少集中易用的账户管理系统。某些组件的权限管理还很不成熟,如Spark的调度器缺少用户的概念,不能限制具体用户使用资源的多少。Transwarp基于开源版本,在安全方面已有相当多的积累,并持续改进开发,致力于为企业用户提供一个易用、高效、安全和稳定的基础数据平台。
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。kafka 是一个高性能的消息队列,也是一个分布式流处理平台。
kafka中文网
kafka官网
Producer :Producer即生产者,消息的产生者,是消息的入口。
kafka cluster :
Broker :Broker是kafka实例,每个服务器上有一个或多个kafka的实例,姑且认为每个broker对应一台服务器。一个集群由多个broker组成,集群内的broker都有一个不重复的编号,如图中的broker-0、broker-1等……
Topic :消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。
Partition :Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。 同一个topic在不同的分区的数据是不重复的 ,partition的表现形式就是一个一个的文件夹!
Replication : 每一个分区都有多个副本 ,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
Message :每一条发送的消息主体。
Consumer :消费者,即消息的消费方,是消息的出口。
Consumer Group :将多个消费组成一个消费者组。在kafka的设计中 同一个分区的数据只能被同一消费者组中的某一个消费者消费 。Partition 的分配问题,即确定哪个 Partition 由哪个 Consumer 来消费。Kafka 有两种分配策略,一个是 RoundRobin,一个是 Range,默认为Range。
一个消费者组内也可以订阅多个topic
多个消费组可以订阅同一个topic 。
Zookeeper :kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。
使用brew进行安装,非常方便。
ZooKeeper是一个分布式的,开放源码的 分布式应用程序协调服务 ,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。
kafka是基于zookeeper的,启动kafka之前,需要先启动zookeeper
查看启动是否成功
启动kafka
查看启动是否成功
查看topic列表
新起一个终端,作为生产者,用于发送消息,每一行算一条消息,将消息发送到kafka服务器
新起一个终端作为消费者,接收消息
服务关闭的顺序是先kafka,然后zookeeper
再过半小时,你就能明白kafka的工作原理了
Kafka架构原理,也就这么回事!
入门请参照: >建议从头阅读:
银行系统中的消息分发利器Kafka(一)
银行系统中的消息分发利器Kafka(二)
6、Partition
上次我们说到,Kafka可以存储数据,而且数据按照Topic进行分类。
这些存储的数据可能会很大,这可能会给Kafka的Broker带来很大的存储压力。
一个好的解决办法就是把这些数据拆成一个或多个Partition:
然后,把这多个Partition分发到不同的服务器上。
Kafka是一个分布式系统,所以对数据文件的Partition进行分布式管理是很方便的。
随之,另外一个问题来了,我们要把数据分成多少个Partition呢?
在每一个Partition 中,第一个消息的Offset就是0,第二个就是1,以此类推。另外,Offset并不是一个全局的ID,它只作用于所属的Partition。所以,在同一个Partition中,不会有相同的Offset。
结合上面的知识,我们可以知道,如果要在Kafka中定位一个消息信息,就是先找到Topic,然后找到Partition,最后找到Offset。
8、Consumer Group
先把前面的场景复习一下。
首先我们有很多节点的数据要收集,于是我们通过Kafka来实现:
然后我们为每一个节点创建一个Producer:
这时你会发现,处理压力跑到Conumser那里了,于是我们就需要一个Consumer Group了。
Kafka的几个重要的概念就介绍完了。后面我会逐步深入的介绍Kafka的一些细节,欢迎关注~
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)