kafka客户端sarama使用之手动提交offset

kafka客户端sarama使用之手动提交offset,第1张

kafka客户端sarama使用之手动提交offset

项目需求不能丢失任何一条消息,网上手动提交offset的代码好像有点小问题,在这里改一下

package main

import (
	"context"
	"fmt"

	sarama "github.com/Shopify/sarama"
)

type consumerGroupHandler struct {
	name string
}

func (consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	i := 0
	for msg := range claim.Messages() {
		fmt.Println(msg.Offset)
		sess.MarkMessage(msg, "")
		i++
		if i%15 == 0 {

			sess.Commit()
		}
	}
	return nil
}

func main() {

	fmt.Println("consumer_test")

	config := sarama.NewConfig()
	config.Consumer.Return.Errors = false
	config.Version = sarama.V0_11_0_2
	config.Consumer.Offsets.AutoCommit.Enable = false
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	group, err := sarama.NewConsumerGroup([]string{"192.168.1.125:9092"}, "t", config)
	//sarama.NewConsumerGroupFromClient()
	if err != nil {
		panic(err)
	}
	defer group.Close()

	for {
		handler := consumerGroupHandler{name: "sera"}
		err := group.Consume(context.Background(), []string{"test"}, handler)
		if err != nil {
			fmt.Println(err.Error())
		}
	}
}

说明:
1.newconsumergroup第一个参数为broker地址,第二个参数为消费者组的名称,返回值为生成的消费组
根据笔者的理解,消费者和消费者组都只能消费一次,对于broker来说并不关心两者的区别;或者说消费者组就是消费者,只不过内部
开了多线程
2.consume第二个参数为topic名称,第三个参数可能是实例化消费者?
3.实例化中sess.markmessage用于在本地标记该消息已被消费,需要用commit进行提交

参考:https://www.cnblogs.com/wishFreedom/p/15131600.html

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/4685825.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-07
下一篇 2022-11-07

发表评论

登录后才能评论

评论列表(0条)

保存