Kafka(Go)教程(十一)---Consumer Group & Rebalance

Kafka(Go)教程(十一)---Consumer Group & Rebalance,第1张

来自:指月 https://www.lixueduan.com

原文:https://www.lixueduan.com/post/kafka/11-consumer-group-rebalance/

本文主要讲述了 Kafka 的消费者组(Consumer Group)和 消费者组的 Rebalance 及如何避免无效 Rebalance。

Kakfa 相关代码见 Github

1. 传统消息模型

传统消息模型一般分为消息队列模型和发布订阅模型:

1)消息队列模型的缺陷在于消息一旦被消费,就会从队列中被删除,而且只能被下游的一个 Consumer 消费。这种模型的伸缩性(scalability)很差,因为下游的多个 Consumer 都要“抢”这个共享消息队列的消息。

2)发布 / 订阅模型倒是允许消息被多个 Consumer 消费,但它的问题也是伸缩性不高,因为每个订阅者都必须要订阅主题的所有分区。这种全量订阅的方式既不灵活,也会影响消息的真实投递效果。

Kafka 的 Consumer Group 机制正好避开这两种模型的缺陷,又兼具它们的优点。

Kafka 仅仅使用 Consumer Group 这一种机制,却同时实现了传统消息引擎系统的两大模型:

如果所有实例都属于同一个 Group,那么它实现的就是消息队列模型;如果所有实例分别属于不同的 Group,那么它实现的就是发布 / 订阅模型。 2. Consumer Group

Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制。

组内可以有多个消费者或消费者实例(Consumer Instance),它们共享一个公共的 ID,这个 ID 被称为 Group ID。组内的所有消费者协调在一起来消费订阅主题(Subscribed Topics)的所有分区(Partition)。当然,每个分区只能由同一个消费者组内的一个 Consumer 实例来消费。

Consumer Group 下可以有一个或多个 Consumer 实例。这里的实例可以是一个单独的进程,也可以是同一进程下的线程。 在实际场景中,使用进程更为常见一些。 Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group。Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费。 这个分区当然也可以被其他的 Group 消费。

理想情况下,Consumer 实例的数量应该等于该 Group 订阅主题的分区总数,这样能最大限度地实现高伸缩性。

注意:Consumer Group 中的实例是以 分区 为单位进行消费的,如果实例数大于分区数就会导致有的实例无法消费到任何消息。

假如有 6 个分区,Consumer Group 中却有 8 个实例,那么有两个实例将不会被分配任何分区,它们永远处于空闲状态。

因此,在实际使用过程中一般不推荐设置大于总分区数的 Consumer 实例。

3. Rebalance Rebalance 流程

Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。

比如某个 Group 下有 20 个 Consumer 实例,它订阅了一个具有 100 个分区的 Topic。正常情况下,Kafka 平均会为每个 Consumer 分配 5 个分区。这个分配的过程就叫 Rebalance。

那么 Consumer Group 何时进行 Rebalance 呢?Rebalance 的触发条件有 3 个。

1)组成员数发生变更。比如有新的 Consumer 实例加入组或者离开组,亦或是有 Consumer 实例崩溃被“踢出”组。2)订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。3)订阅主题的分区数发生变更。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。

假设目前某个 Consumer Group 下有两个 Consumer,比如 A 和 B,当第三个成员 C 加入时,Kafka 会触发 Rebalance,并根据默认的分配策略重新为 A、B 和 C 分配分区,如下图所示:

Rebalance 的弊端

在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。这是 Rebalance 为人诟病的一个方面。

其次,目前 Rebalance 的设计是所有 Consumer 实例共同参与,全部重新分配所有分区。

其实更高效的做法是尽量减少分配方案的变动。例如实例 A 之前负责消费分区 1、2、3,那么 Rebalance 之后,如果可能的话,最好还是让实例 A 继续消费分区 1、2、3,而不是被重新分配其他的分区。这样的话,实例 A 连接这些分区所在 Broker 的 TCP 连接就可以继续用,不用重新创建连接其他 Broker 的 Socket 资源。

最后,Rebalance 实在是太慢了。

曾经,有个国外用户的 Group 内有几百个 Consumer 实例,成功 Rebalance 一次要几个小时!这完全是不能忍受的。最悲剧的是,目前社区对此无能为力,至少现在还没有特别好的解决方案。

因此一些大数据框架都使用的 standalone consumer。

避免无效 Rebalance

要避免 Rebalance,还是要从 Rebalance 发生的时机入手:

1)组成员数量发生变化2)订阅主题数量发生变化3)订阅主题的分区数发生变化

后面两个通常都是运维的主动 *** 作,所以它们引发的 Rebalance 大都是不可避免的。所以我们主要说说因为组成员数量变化而引发的 Rebalance 该如何避免。

如果 Consumer Group 下的 Consumer 实例数量发生变化,就一定会引发 Rebalance。这是 Rebalance 发生的最常见的原因。

除了手动启动或停止之外,Consumer 实例可能会被 Coordinator 错误地认为“已停止”从而被“踢出”Group。如果是这个原因导致的 Rebalance,我们就不能不管了。

Coordinator 即协调者,它专门为 Consumer Group 服务,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等。

当 Consumer Group 完成 Rebalance 之后,每个 Consumer 实例都会定期地向 Coordinator 发送心跳请求,表明它还存活着。如果某个 Consumer 实例不能及时地发送这些心跳请求,Coordinator 就会认为该 Consumer 已经“死”了,从而将其从 Group 中移除,然后开启新一轮 Rebalance。

Consumer 端有个参数,叫 session.timeout.ms,就是被用来表征此事的。该参数的默认值是 10 秒,即如果 Coordinator 在 10 秒之内没有收到 Group 下某 Consumer 实例的心跳,它就会认为这个 Consumer 实例已经挂了。可以这么说,session.timeout.ms 决定了 Consumer 存活性的时间间隔。

Consumer 还提供了一个允许你控制发送心跳请求频率的参数,就是 heartbeat.interval.ms。这个值设置得越小,Consumer 实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更加快速地知晓当前是否开启 Rebalance,因为,目前 Coordinator 通知各个 Consumer 实例开启 Rebalance 的方法,就是将 REBALANCE_NEEDED 标志封装进心跳请求的响应体中。

Consumer 端还有一个参数,用于控制 Consumer 实际消费能力对 Rebalance 的影响,即 max.poll.interval.ms 参数。它限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 也会开启新一轮 Rebalance。

第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的。因此,你需要仔细地设置 session.timeout.msheartbeat.interval.ms 的值。我在这里给出一些推荐数值,你可以“无脑”地应用在你的生产环境中。

设置 session.timeout.ms = 6s。设置 heartbeat.interval.ms = 2s。要保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。

第二类非必要 Rebalance 是 Consumer 消费时间过长导致的。max.poll.interval.ms 参数值的设置显得尤为关键。如果要避免非预期的 Rebalance,你最好将该参数值设置得大一点,比你的下游最大处理时间稍长一点。

最后 Consumer 端的因为 GC 设置不合理导致程序频发 Full GC 也可能引发非预期 Rebalance。

小结,调整以下 4 个参数以避免无效 Rebalance:

session.timeout.msheartbeat.interval.msmax.poll.interval.msGC 参数 4. Go 客户端
// sarama 库中消费者组为一个接口 sarama.ConsumerGroup 所有实现该接口的类型都能当做消费者组使用。

// MyConsumerGroupHandler 实现 sarama.ConsumerGroup 接口,作为自定义ConsumerGroup
type MyConsumerGroupHandler struct {
	name  string
	count int64
}

// Setup 执行在 获得新 session 后 的第一步, 在 ConsumeClaim() 之前
func (MyConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }

// Cleanup 执行在 session 结束前, 当所有 ConsumeClaim goroutines 都退出时
func (MyConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim 具体的消费逻辑
func (h MyConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		fmt.Printf("[consumer] name:%s topic:%q partition:%d offset:%d\n", h.name, msg.Topic, msg.Partition, msg.Offset)
		// 标记消息已被消费 内部会更新 consumer offset
		sess.MarkMessage(msg, "")
		h.count++
		if h.count%1 == 0 {
			fmt.Printf("name:%s 消费数:%v\n", h.name, h.count)
		}
	}
	return nil
}

func ConsumerGroup(topic, group, name string) {
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	cg, err := sarama.NewConsumerGroup([]string{conf.HOST}, group, config)
	if err != nil {
		log.Fatal("NewConsumerGroup err: ", err)
	}
	defer cg.Close()
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		handler := MyConsumerGroupHandler{name: name}
		for {
			fmt.Println("running: ", name)
			/*
				应该在一个无限循环中不停地调用 Consume()
				因为每次 Rebalance 后需要再次执行 Consume() 来恢复连接
				Consume 开始才发起 Join Group 请求 如果当前消费者加入后成为了 消费者组 leader,则还会进行 Rebalance 过程,从新分配
				组内每个消费组需要消费的 topic 和 partition,最后 Sync Group 后才开始消费
			*/
			err = cg.Consume(ctx, []string{topic}, handler)
			if err != nil {
				log.Println("Consume err: ", err)
			}
			// 如果 context 被 cancel 了,那么退出
			if ctx.Err() != nil {
				return
			}
		}
	}()
	wg.Wait()
}

注意点:

1)需要实现 sarama.ConsumerGroup 接口2)具体消费逻辑在 ConsumeClaim 方法中实现,消费完成后通过sess.MarkMessage标记消息已经被消费3)需要在 for 循环中调用 Consume 方法,因为 Rebalance 的存在会导致连接终端或者被分配到其他 Broker 上去,需要重新建立连接。 5. 小结

Kakfa 相关代码见 Github

Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制。

建议消费者组中的实例数等于 Topic 的分区数。

Rebalance 弊端:

影响 Consumer 端 TPS很慢效率不高

非必要 Rebalance:

因为Consumer没能及时发送心跳请求,导致被踢出”Group而引发的。Consumer消费时间过长导致的。

减少 Rebalance 的 4个参数:

session.timeout.msheartbeat.interval.msmax.poll.interval.msGC 参数 6. 参考

https://github.com/Shopify/sarama

《Kafka 核心技术与实战》

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/995265.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存