需要从Kafka读取数据的应用程序使用KafkaConsumer来订阅Kafka主题并从这些主题接收消息。 从Kafka读取数据与从其他消息系统读取数据有点不同,其中涉及一些独特的概念和想法。 如果不先理解这些概念,就很难理解如何使用消费者API。 我们将首先解释一些重要的概念,然后通过一些示例展示使用消费者API实现不同需求的应用程序的不同方式。
4.1 Kafka消费者概念要理解如何从Kafka读取数据,首先需要了解它的消费者和消费者组。 下面几节将介绍这些概念。
消费者及消费者组
假设有一个应用程序需要读取来自Kafka主题的消息,运行一些验证,并将结果写入另一个数据存储。 在这种情况下,应用程序将创建一个消费者对象,订阅适当的主题,并开始接收消息、验证消息并写出结果。 这样可能会正常运行,但如果生产者向主题写入消息的速度超过了应用程序验证消息的速度,该怎么办? 如果仅限于单个消费者读取和处理数据,则应用程序可能会越来越落后,无法跟上传入消息的速度。 显然,需要根据主题来扩大消费。 就像多个生产者可以写入同一个主题一样,我们需要允许多个消费者从同一个主题读取数据,并在他们之间分割数据。
Kafka消费者通常是消费者组的一部分。 当多个消费者订阅了一个主题并属于同一个消费者组时,组中的每个消费者将接收来自主题中不同分区子集的消息。
让我们以主题T1为例,它有四个分区。 现在假设我们创建了一个新的消费者C1,它是G1组中的唯一消费者,并使用它订阅主题T1。 消费者C1将获得来自所有四个T1分区的所有消息。 见图4 - 1。
图4-1 一个消费者组消费四个分区中的消息
如果我们将另一个消费者C2添加到组G1中,每个消费者将只从两个分区获得消息。来自分区0和2的消息可能会转到C1,而来自分区1和3的消息则转到消费者C2。如图4 - 2所示。
图4-2 四个分区划分给组中的两个消费者
如果G1有四个消费者,那么每个消费者都将从单个分区读取消息。如图4-3所示。
图4-3 一组四个消费者,每个消费者一个分区
如果我们在单个主题的组中添加的消费者数量超过了分区的数量,那么一些消费者将处于空闲状态,根本不会收到任何消息。见图4 - 4。
图4-4 一个组中的消费者多于分区意味着有消费者处于空闲状态
我们从Kafka主题扩展数据消费的主要方式是将更多的消费者添加到一个消费者组中。 Kafka消费者通常会做一些高延迟的 *** 作,比如写数据库或者对数据进行耗时的计算。 在这些情况下,单个消费者不可能跟上数据流入主题的速率,通过添加更多的消费者,让每个消费者只拥有分区和消息的一个子集来分担负载,这是我们的主要扩展方法。 这是创建带有大量分区的主题的一个很好的理由——它允许在负载增加时添加更多的消费者。 请记住,在一个主题中添加超过分区数量的消费者是没有意义的——一些消费者将是空闲的。 第二章有关于如何在一个主题中选择分区数量的一些建议。
除了为了扩展单个应用程序而添加消费者外,通常还会有多个应用程序需要从相同的主题读取数据。 事实上,Kafka的主要设计目标之一是让Kafka主题产生的数据在整个组织的许多用例中可用。 在这些情况下,我们希望每个应用程序获得所有消息,而不仅仅是一个子集。 要确保应用程序获得主题中的所有消息,请确保应用程序拥有自己的消费者组。 与许多传统的消息系统不同,Kafka可以扩展到大量的消费者和消费者组,而不会降低性能。
在前面的示例中,如果我们添加一个带有单个消费者的新消费者组(G2),这个消费者将获得主题T1中的所有消息,而不依赖于G1正在做什么。 G2可以有多个消费者,在这种情况下,它们将各自获得分区的一个子集,就像我们在G1中展示的那样,但作为一个整体,G2仍然会获得所有消息,而不考虑其他消费者组。 见图4-5。
图4-5 添加一个新的消费者组,两个组都接收所有消息
综上所述,为每个需要消费来自一个或多个主题的所有消息的应用程序创建一个新的消费者组。 将消费者添加到现有消费者组中,以扩展从主题读取和处理消息的速度,组中的每个消费者将只获得消息的一个子集。
消费者组和分区再平衡
正如我们在前一节中看到的,消费者组中的消费者共享他们订阅的主题中的分区的所有权。 当我们向组中添加一个新的消费者时,它将开始使用之前由另一个消费者消费的分区中的消息。 当消费者关闭或崩溃时,同样的事情也会发生; 它离开组,它使用的分区将由剩余的一个消费者消费。 当消费者组所消费的主题被修改时(例如,如果管理员添加了新的分区),也会将分区重新分配给消费者。
将分区所有权从一个消费者转移到另一个消费者称为再平衡。 再平衡很重要,因为它为消费者组提供了高可用性和可伸缩性(允许我们轻松安全地添加和删除消费者),但在正常情况下,它们可能是不受欢迎的。
根据消费者组使用的分区分配策略,有两种类型的再平衡:
-
Eager rebalances(迫切再平衡)
在迫切再平衡过程中,所有的消费者停止消费,放弃他们对拥有分区的所有权,重新加入消费者组,并获得一个全新的分区分配。 从本质上讲,这是整个消费者组停止消费的一个短暂窗口。 窗口的长度取决于消费者组的大小以及几个配置参数。 图4-6显示了迫切再平衡有两个不同的阶段:首先,所有的消费者放弃他们的分区分配;其次,当他们都完成了这个任务并重新加入组后,他们获得新的分区分配并可以继续消费。
图4-6 迫切再平衡会收回所有分区、暂停消费并重新分配它们
-
Cooperative rebalances(协作再平衡)
协作再平衡(也称为增量再平衡)通常只涉及将分区的一小部分从一个消费者重新分配给另一个消费者,并允许消费者继续处理所有未被重新分配的分区的记录。 这是通过在两个或多个阶段进行再平衡来实现的。 首先,消费者组leader通知所有消费者,他们将失去其分区子集的所有权,然后消费者停止从这些分区消费,并放弃他们在这些分区中的所有权。 在第二个阶段中,消费者组负责人将这些孤立的分区分配给它们的新所有者。 这种增量方法可能需要几次迭代,直到实现稳定的分区分配,但它避免了使用迫切再平衡方法时发生的完全“停止”的不可用性。 这在大型消费者组中尤其重要,因为重新平衡可能需要相当长的时间。 图4-7显示了协作再平衡是如何递增的,并且只涉及消费者和分区的一个子集。
图4-7 协作再平衡只暂停将被重新分配的分区子集的消费
消费者通过将心跳发送到指定为组协调器(group coordinator)的Kafka broker,以此来维护消费者组的成员资格和分配给他们的分区的所有权(对于不同的消费者组,这个broker可能是不同的)。心跳由消费者的后台线程发送,只要消费者以固定的间隔发送心跳,就认为它是活的。
如果消费者停止发送心跳的时间足够长,它的会话将超时,组协调器将认为它已死并触发再平衡。如果消费者崩溃并停止处理消息,则组协调器将在没有心跳的情况下花费几秒钟来决定消费者死亡并触发再平衡。在这几秒钟内,死掉的消费者者所拥有的分区不会处理任何消息。当正常关闭一个消费者时,消费者将通知组协调器它正在离开,并且组协调器将立即触发重新平衡,从而减少处理中的差距。在本章的后面,我们将讨论控制心跳频率、会话超时和其他可用于微调消费者行为的配置参数的配置项。
将分区分配给消费者的过程是如何工作的?
当消费者希望加入一个组时,它向组协调器发送一个JoinGroup请求。第一个加入小组的消费者成为小组组长(leader)。leader从组协调器接收组中所有消费者的列表(包括所有最近发送了心跳,被认为是活的消费者),并负责将分区的子集分配给每个消费者。它使用PartitionAssignor的实现来决定哪些分区应该由哪个消费者处理。
Kafka有很少的内置分区分配策略,我们将在配置部分更深入地讨论。在决定分区分配之后,消费者组长将分配列表发送给GroupCoordinator, GroupCoordinator将此信息发送给所有消费者。每个消费者只看到自己的任务——leader是唯一拥有组中消费者及其任务完整列表的client process。每次再平衡发生时,这个过程都会重复。
静态组成员
缺省情况下,一个消费者作为其消费组成员的身份是暂时的。当消费者离开一个消费者组时,分配给消费者的分区将被撤销,当消费者重新加入时,它将通过rebalance协议被分配一个新的成员ID和一组新的分区。
正常情况下都是这样的,除非使用唯一的group.instance.id配置消费者,这使消费者成为组的静态成员。当消费者第一次以组的静态成员身份加入消费者组时,会按照该组正在使用的分区分配策略为其分配一组分区。但是,当这个消费者关闭时,它不会自动离开组----它一直是组的成员,直到会话超时。当消费者重新加入组时,将使用其静态标识识别它,并在不触发再平衡的情况下重新分配它先前持有的相同分区。为组的每个成员缓存分配的组协调器不需要触发再平衡,但可以将缓存分配发送给重新加入的静态成员。
如果两个消费者使用相同的group.instance.id加入同一个组,第二个消费者将得到一个错误,说具有此ID的消费者已经存在。
当应用程序维护由分配给每个消费者的分区填充的本地状态或缓存时,静态组成员非常有用。当重新创建这个缓存非常耗时时,不希望每次消费者重新启动时都发生这个过程。另一方面,重要的是要记住,当消费者重新启动时,每个消费者拥有的分区不会被重新分配。在一段时间内,没有消费者将使用来自这些分区的消息,当消费者最终重新启动时,它将滞后于这些分区中的最新消息。您应该确信,拥有这些分区的消费者能够在重启后赶上延迟时间。
需要注意的是,消费者组的静态成员在关闭时不会主动离开该组,检测他们何时“真正离开”取决于session.timeout.ms配置。 可以将其设置得足够高,以避免在简单应用程序重新启动时触发重新平衡,但又要设置得足够低,以允许在出现更长的停机时间时自动重新分配其分区,以避免在处理这些分区时出现较大的延迟。
4.2 创建一个Kafka消费者开始消费记录的第一步是创建一个KafkaConsumer实例。创建一个KafkaConsumer与创建一个KafkaProducer非常相似----您将创建一个Java Properties实例,其中包含要传递给消费者的属性。我们将在本章后面深入讨论所有的属性。首先,我们只需要使用三个必需属性:bootstrap.servers, key.deserializer和value.deserializer。
第一个属性,bootstrap.servers是到Kafka集群的连接字符串。 它的使用方式与KafkaProducer中完全相同(参见第3章关于它是如何定义的细节)。 另外两个属性,key.deserializer和value.deserializer类似于为生产者定义的序列化器,但不是指定将Java对象转换为字节数组的类,而是指定可以接受字节数组并将其转换为Java对象的类。
还有第四个属性,它不是严格强制的,但却很常用。这个属性是group.id,它指定了KafkaConsumer实例所属的消费者组。虽然可以创建不属于任何消费者组的消费者,但这是不常见的,因此对于本章的大部分内容,我们将假设消费者是一个组的一部分。
下面的代码片段展示了如何创建一个KafkaConsumer:
Properties props = new Properties(); props.put("bootstrap.servers", "broker1:9092,broker2:9092"); props.put("group.id", "CountryCounter"); props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumerconsumer = new KafkaConsumer (props);
如果你已经阅读了第3章关于创建生产者的内容,应该对这里所看到的大部分内容都很熟悉。我们假设消费的记录使用String对象作为记录的键和值。这里唯一的新属性是group.Id,消费者所属的用户组名称。
4.3 订阅主题(topic)创建消费者之后,下一步是订阅一个或多个主题。subscribe()方法以主题列表作为参数,所以使用起来非常简单:
//在这里,我们只创建了一个包含单个元素的列表:主题名称customerCountries。 consumer.subscribe(Collections.singletonList("customerCountries"));
还可以使用正则表达式调用subscribe。正则表达式可以匹配多个主题名称,如果创建了一个具有匹配名称的新主题,则几乎立即会发生再平衡,消费者将开始从新主题消费。这对于需要从多个主题消费并能够处理主题将包含的不同类型数据的应用程序非常有用。在Kafka和另一个系统或流处理应用程序之间复制数据的应用程序中,使用正则表达式订阅多个主题是最常用的。
例如,要订阅所有test主题,我们可以调用:
consumer.subscribe(Pattern.compile("test.*"));
4.4 轮询(Poll)循环警告
如果Kafka集群有大量的分区,可能有30,000或更多,您应该知道订阅的主题过滤是在客户端完成的。 这意味着,当通过正则表达式而不是通过显式列表订阅一个主题子集时,消费者将定期从broker请求所有主题及其分区的列表。 然后,客户端将使用此列表来检测应该包含在其订阅中的新主题并订阅它们。 当主题列表很大且有许多消费者时,主题和分区列表的大小是很大的,并且正则表达式订阅在broker、客户端和网络上有很大的开销。 在某些情况下,主题元数据(topic metadata)使用的带宽大于用于发送数据的带宽。 这也意味着,为了使用正则表达式订阅,客户端需要权限来描述集群中的所有主题——也就是说,在整个集群上授予完整的describe权限。
消费者API的核心是一个简单的循环,用于轮询服务器以获取更多数据。消费者的主要代码类似如下:
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)