项目需求不能丢失任何一条消息,网上手动提交offset的代码好像有点小问题,在这里改一下
package main import ( "context" "fmt" sarama "github.com/Shopify/sarama" ) type consumerGroupHandler struct { name string } func (consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { i := 0 for msg := range claim.Messages() { fmt.Println(msg.Offset) sess.MarkMessage(msg, "") i++ if i%15 == 0 { sess.Commit() } } return nil } func main() { fmt.Println("consumer_test") config := sarama.NewConfig() config.Consumer.Return.Errors = false config.Version = sarama.V0_11_0_2 config.Consumer.Offsets.AutoCommit.Enable = false config.Consumer.Offsets.Initial = sarama.OffsetOldest group, err := sarama.NewConsumerGroup([]string{"192.168.1.125:9092"}, "t", config) //sarama.NewConsumerGroupFromClient() if err != nil { panic(err) } defer group.Close() for { handler := consumerGroupHandler{name: "sera"} err := group.Consume(context.Background(), []string{"test"}, handler) if err != nil { fmt.Println(err.Error()) } } }
说明:
1.newconsumergroup第一个参数为broker地址,第二个参数为消费者组的名称,返回值为生成的消费组
根据笔者的理解,消费者和消费者组都只能消费一次,对于broker来说并不关心两者的区别;或者说消费者组就是消费者,只不过内部
开了多线程
2.consume第二个参数为topic名称,第三个参数可能是实例化消费者?
3.实例化中sess.markmessage用于在本地标记该消息已被消费,需要用commit进行提交
参考:https://www.cnblogs.com/wishFreedom/p/15131600.html
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)