http://notes.stephenholiday.com/Kafka.pdf
首先我们介绍 Kafka 的基本概念。
topic: 把特定类型的消息流称为 topicproducer: 生产者能把消息发到 topicbroker:被发布的消息会保存在一些服务器,这些服务器称为 brokersconsumer: 消费者能从 broker 订阅一个或多个 topic,并且能从 broker 拉取并处理消息
消息传送就是这么简单,Kafka API 会反映这一简单性。下面展示了生产者发送消息的伪代码。一则消息被定义为一串字节。用户可以自定义编码方式,将消息转成字节数组然后发送出去。而且支持在一次请求中发送多个消息。
Sample producer code:
producer = new Producer(…); message = new Message(“test message str”.getBytes()); set = new MessageSet(message); producer.send(“topic1”, set);
Sample consumer code:
streams[] = Consumer.createMessageStreams(“topic1”, 1) for (message : streams[0]) { bytes = message.payload(); // do something with the bytes }
消费者基于 topic 创建若干个消息流,发送到 topic 的消息最终被到这些消息流。
关于分发的细节将会在 3.2 节介绍。 每个消息流提供了迭代器,消费者通过迭代器来获取并处理每条消息的内容。迭代的过程是不会终止的。如果流里面没有消息,轮询就被阻塞,直到新的消息到达。我们支持集群消费,多个消费者共享一个 topic 的消息,也支持广播消费,每个消费者都消费 topic 的全量消息。
( We support both the point-to-point delivery model in which multiple consumers jointly consume a single copy of all messages in a topic, as well as the publish/subscribe model in which multiple consumers each retrieve its own copy of a topic. )
图一展示了 Kafka 的架构。Kafka 天生是分布式的,一个 kafka 集群拥有多个 brokers 。为了负载均衡,每个 topic 分为多个分区,每个 broker 存储一到多个这些分区。多个 producers 和 consumers 同时发送消息和拉取消息。
在 3.1 节,我们会介绍一个 broker 上的某个分区,分享一些提高分区性能 (partition efficient) 的设计思路。在 3.2 节,我们阐述多个 producers 和 consumers 在分布式环境下如何与 brokers 交互。在 3.3 节,我们讨论 kafka 的消息传递的可靠性。
3.1 单个分区的性能我们做了一些决定来提高分区的性能。
简单存储(Simple storage):kafka 使用了非常简单的存储机制。topic 下的每个分区对应一份逻辑日志。一份日志在物理上用多个文件段来实现,每个文件的大小基本相同,比如 1 G。每当生产者发布一条消息到分区,broker 只是把消息追加到最后一个文件段的末尾。为了更好的性能,当消息达到一定数量,或者经过一定时间才会保存到磁盘。一则消息只有当刷盘之后才会暴露给消费者。
与大多数消息系统不同,kafka 的消息没有使用 message id 来标识,而是利用在 log 文件中的逻辑偏移量(offset)来定位。避免维护一个复杂的、随机访问的、用 message id 查找 message 实际地址的映射机制。注意我们的 message id 是递增的但不一定连续。为了计算下一条消息的id,我们需要记录当前消息的长度。从现在开始,我们说的 message id 和 offsets 是等价的。(译者注:kafka 的 offset 相当于 message id,一般说 offset 更多一些)。
一个消费者实例通常从分区顺序消费消息。如果消费者提交了某条消息的 offset(acknowledges a particular message offset),意味着在此 offset 之前的消息都被该消费者收到了。消费者向 brokers 发起拉取请求,获取一份可供消费的数据。每个拉取请求包含消费者当前的消费位移,以及可接受的字节数。每个 broker 在内存维护了一个排序列表,记录每个文件段第一条消息的位移。broker 通过遍历排序列表,在文件段上找到指定的消息,然后返回给消费者。消费者收到消息之后,计算出下一个待消费消息的偏移值,然后用于下一次拉取请求。图2 展示了 Kafka 日志的模式以及内存的排序列表,每个小格显示了消息位移:
我们非常关注 Kafka 的数据吞吐量。前文介绍了 producers 能够在一次请求中,提交多条消息。虽然终端的 consumers 一次只遍历一个消息流,但它在一次拉取请求,通常拉取特定大小的消息,一般是几百个kb。
另外一个比较不寻常的决定是,我们不把消息存储在 Kafka 应用层的内存,而是依赖文件系统的 page cache。带来的一个巨大的好处,在于避免维护两份缓存——消息只会存在 page cache。还有另外一个好处在于,即使 Kafka 崩溃重启,page cache 的缓存依然有效。又由于 Kafka 不会在自己的进程中持有消息内容,在基于虚拟机实现的程序上,只需要付出很少的 GC 开销。最后,producers 和 consumers 都是持续不断地访问文件段,通常 consumers 的进度会稍稍落后 producers ,因此 *** 作系统的高速缓存得到了高效利用(只需要写缓存和提前读)。我们看到,即使数据量线性增长到 TB 级别, producers 和 consumers 的表现依然强劲。
此外,我们还优化了 consumers 的网络访问。Kafka 是一个多消费者的系统,并且一条消息可以被不同的消费者分别消费。一个典型的从本地文件发送数据到远程套接字(remote socket)包括如下步骤:
- 在 *** 作系统中读取文件到 page cache从 page cache 拷贝数据到应用程序的缓存从应用程序缓存拷贝到另一个内核缓冲区发送内核缓冲区到 socket
这里包括4次数据拷贝和2次系统调用。在 Linux 和其它类 Unix 系统,存在一个 sendfile API ,可以直接从文件通道传输到 socket 通道。这避免了步骤2和3的两次数据拷贝,和一次系统调用。Kafka 利用了这个 API 高效地把日志段的字节数据,从 broker 发送到 consumers。
无状态的 broker(Stateless broker):与大多数其它消息系统不同,consumer 的消费状态并不是由 broker 维护。这样的设计极大地降低了 broker 的复杂度和负载。然而,这导致删除一条消息是比较难的,因为 broker 不知道它是否所有的 consumer 都消费了。Kafka 制定了一个基于时间的保留策略来解决该问题。如果一则消息保存在 broker 的时间超过一定周期,就会被删除,通常是 7 天。这种办法在实践中很有效。大多数 consumers,包括那些离线系统,都按天、按小时或者实时消费消息。Kafka 的性能不会随着数据量变大而下降,这一特性允许设置一个灵活的消息保留策略。
这个设计还带来另一个附加收益。一个 consumer 可以轻易把 offset 向前重置,并消费老的消息。这个特性似乎与队列的通常概念有所冲突,却是一个对于大多数消费者而言都很重要的特性。举例来说,当 consumer 的消费逻辑内部发生 error,修好 error 之后,它可以重新消费特定的消息。这对数据仓库来说很重要。另一个例子,consumers 可以对消息定期的持久化到磁盘里,或者一个全文搜索引擎。如果 consumers 挂了,未刷到磁盘的数据可能会丢。在这种场景,可以检查未被刷盘的数据的最小的 offset,并在 consumers 重启之后从该 offset 开始消费。我们还注意到,在拉模式下重置消费者,会比在推模式下重置消费者更容易实现。(译者注:consumers是从 broker 拉取消息,而不是 broker 推送)
3.2 分布式一致性(Distributed Coordination)我们现在描述了 producers 和 consumers 在分布式场景下如何工作。每个 producer 能发送消息到一个随机选择的分区,或者通过分区 key 和分区函数决定的分区。我们接下来讨论 consumers 是怎么和 brokers 交互的。
Kafka 有消费组(consumer group)的概念。每个消费组包含一个或者多个 consumer,能够消费一个或者多个 topic,每条消息只会投递给消费组中的其中一个 consumer。不同的消费组之间独立地消费全量的消息,并且相互之前不需要协调机制。(译者注:意味着不同消费组之间的消费进度互不影响)。相同消费组内的 consumer 可以是不同的进程或者不同的机器。我们的目标是把 brokers 的消息均匀地分发到不同的 consumers,避免在协调方面有过多的开销(without introducing too much coordination overhead)。
我们第一个决定是使分区(partition)成为 topic 下的最小并行单元。这意味着在任意给定时间,一个分区内的所有消息只会被每个消费组内的一个 consumer 消费(译者注,不同 consumer 不会竞争消费某个分区的消息,避免了加锁和等待)。如果是让组内多个 consumer 同时消费某分区,它们需要协调谁来消费、消费哪些消息,必须引入锁机制和维护消费状态。通常而言,在我们的设计里,只有在再平衡(rebalance)的时候才需要协调消费进程,而这不是一个频繁 *** 作。为了使负载真正均衡,我们要求 topic 下的分区数比消费组内的 consumers 个数多。通过对 topic 进行分区很容易达到这一点。
第二个决定是我们不采用中心主节点,而是采用分布式的方式,让 consumer 相互协调。引入主节点会使系统复杂度增加,因为需要考虑主节点挂掉的情况。为了使协调机制更好地运行,我们引入了高可用的分布式一致性服务 Zookeeper 。Zookeeper 拥有一个非常简单的文件系统。可以创建一个 path,设置 path 的值,读取 path 的值,删除一个 path,并且列举 path 下的子路径。它还可以做一些更有趣的事情,
(a) 可以在 path 上注册监听器, 并且当 path 或者 path 子路径的值发生变化时,通知监听者。
(b) 一个path可以是瞬时的(相对的有持久的),意思是当创建这个 path 的客服端关闭后,Zookeeper server 会删除 path。
(c ) Zookeeper 会复制数据给多个 servers,使得数据高可靠和高可用。
Kafka 使用 Zookeeper 处理以下任务
(1) 探测 brokers 和 consumers 的增加和移除。
(2) 当上述事件发生时,在每个 consumer 之间触发一个 rebalance 过程
(3) 维护各分区的消费关系,跟踪每个分区的消费位移。
特别地,当每个 broker 或者 consumer 启动, Zookeeper 会保存 brokers 或者 consumer 的注册表。
broker 注册表包含 broker 的主机和端口,以及 topic 和 分区 的集合信息。消费注册表包含 consumer 所属的消费组,以及它所订阅的 topic。ownership registry:每个消费组在 Zookeeper 中关联一个关系表和 offset 注册表。关系表为每个分区单独建立一个 path ,path 的值是当前消费该分区的 consumer (我们用一个术语来描述就是,consumer 掌管该分区)。offset registry :offset 注册表为每个被订阅的分区,记录了最后一条被消费的消息的 offset。
在 Zookeeper 创建的 path ,对于 broker 注册表、consumer 注册表、以及关系表来说是瞬时的,而对于 offset 注册表来说是持久的。如果 broker 挂了,上面的所有分区都自动从 broker 注册表中被移除。如果 consumer 挂了,consumer 注册表的注册关系将被删除,关系表的分区订阅关系也被移除。每个消费者在 broker 注册表 和消费者注册表上,注册了 Zookeeper 监听器,每当 broker 集合或者消费组变化时,都会被通知。
当 consumer 启动,或者通过监听器被通知到 broker 和其它 consumer 的变化时,consumers 执行一次 rebalance 过程,决定它应该消费哪些分区。这个过程由算法1 来描述。
通过从 Zookeeper 读取 broker 和 consumer 注册表,计算得到 topic 列表 (T),topic 的分区(P t),以及订阅 topic 的订阅者 (C t)。把 partitions 按照一定的算法,分配给不同的 consumer。consumer 分配到 partition 之后,在 ownership 注册表把自己标记为 partition 的 owner。最后,consumer 启动消费线程,从自己的 partition 拉取消息,拉取的位置记录在 offset registry。当 consumers 从 partition 拉取消息之后,会周期性地更新 offset registry 的消费 offset。
当某个 consumer group 中存在多个 consumer ,每个都会被通知关于 broker 和 consumer 的变更。然后,对不同的 consumer 通知的时机有所不同。因此可能出现某个 consumer 尝试去掌管一个仍被其它 consumer 控制的 partition 。当这种场景发生时,前者 consumer 会释放自己当前拥有的所有 partitions,等待一小会儿再重试 rebalance 的过程。(译者注:避免死锁)。实际场景中,经过少数几次重试 rebalance 之后就会稳定下来。
当某个 consumer group 刚被创建出来,offset registry 里不存在该消费组的 offset 。这种情况下,consumer 要么从分区的最小 offset 或者最大 offset 开始消费,取决于消费者通过 API 设置的配置。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)