项目需求不能丢失任何一条消息,网上手动提交offset的代码好像有点小问题,在这里改一下
package main
import (
"context"
"fmt"
sarama "github.com/Shopify/sarama"
)
type consumerGroupHandler struct {
name string
}
func (consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
i := 0
for msg := range claim.Messages() {
fmt.Println(msg.Offset)
sess.MarkMessage(msg, "")
i++
if i%15 == 0 {
sess.Commit()
}
}
return nil
}
func main() {
fmt.Println("consumer_test")
config := sarama.NewConfig()
config.Consumer.Return.Errors = false
config.Version = sarama.V0_11_0_2
config.Consumer.Offsets.AutoCommit.Enable = false
config.Consumer.Offsets.Initial = sarama.OffsetOldest
group, err := sarama.NewConsumerGroup([]string{"192.168.1.125:9092"}, "t", config)
//sarama.NewConsumerGroupFromClient()
if err != nil {
panic(err)
}
defer group.Close()
for {
handler := consumerGroupHandler{name: "sera"}
err := group.Consume(context.Background(), []string{"test"}, handler)
if err != nil {
fmt.Println(err.Error())
}
}
}
说明:
1.newconsumergroup第一个参数为broker地址,第二个参数为消费者组的名称,返回值为生成的消费组
根据笔者的理解,消费者和消费者组都只能消费一次,对于broker来说并不关心两者的区别;或者说消费者组就是消费者,只不过内部
开了多线程
2.consume第二个参数为topic名称,第三个参数可能是实例化消费者?
3.实例化中sess.markmessage用于在本地标记该消息已被消费,需要用commit进行提交
参考:https://www.cnblogs.com/wishFreedom/p/15131600.html
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)