Golang source code analysis: sarama kafka client (part II: the consumer)
In this article we continue with the consumer side of the sarama kafka client, starting with an example:
package main
import (
	"fmt"
	"log"
	"sync"
	"github.com/Shopify/sarama"
)
// A minimal partition-level consumer example.
func main() {
	// Create a consumer instance.
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		log.Print(err)
		return
	}
	// Get all partitions of the topic.
	partitionList, err := consumer.Partitions("test")
	if err != nil {
		log.Println(err)
		return
	}
	var wg sync.WaitGroup
	// Iterate over all partitions.
	for _, partition := range partitionList {
		// Consume the given topic/partition starting from the given offset;
		// this returns a PartitionConsumer for that partition.
		pc, err := consumer.ConsumePartition("test", partition, sarama.OffsetNewest)
		if err != nil {
			log.Println(err)
			return
		}
		// Remember to close it when we are done.
		defer pc.AsyncClose()
		wg.Add(1)
		// Receive the messages asynchronously.
		go func(pc sarama.PartitionConsumer) {
			defer wg.Done()
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v\n", msg.Partition, msg.Offset, msg.Key, msg.Value)
			}
		}(pc)
	}
	wg.Wait()
}
It has three parts:
1. sarama.NewConsumer creates a consumer.
2. consumer.ConsumePartition consumes messages from the specified topic and partition.
3. msg := range pc.Messages() receives the messages.
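Besides Messages(), a PartitionConsumer also exposes an Errors() channel, which only carries errors when Consumer.Return.Errors is set to true in the config. A minimal sketch of draining it (the helper name drainErrors is invented here for illustration, and the log package from the example above is assumed):
func drainErrors(pc sarama.PartitionConsumer) {
	// Only useful when config.Consumer.Return.Errors = true; otherwise
	// consume errors are just written to sarama's logger.
	go func() {
		for consumeErr := range pc.Errors() {
			log.Println("consume error:", consumeErr)
		}
	}()
}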
If you do not want to deal with every partition yourself, you can also just specify a consumer group:
package main
import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"sync"
	"github.com/Shopify/sarama"
)
type consumerGroupHandler struct {
	name string
}
func (consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		fmt.Printf("%s Message topic:%q partition:%d offset:%d value:%s\n", h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
		// mark the message as consumed
		sess.MarkMessage(msg, "")
	}
	return nil
}
func handleErrors(group sarama.ConsumerGroup, wg *sync.WaitGroup) {
	wg.Done()
	for err := range group.Errors() {
		fmt.Println("ERROR", err)
	}
}
func consume(group sarama.ConsumerGroup, wg *sync.WaitGroup, name string) {
	fmt.Println(name + " start")
	wg.Done()
	ctx := context.Background()
	for {
		topics := []string{"test"}
		handler := consumerGroupHandler{name: name}
		err := group.Consume(ctx, topics, handler)
		fmt.Println("consume group end")
		if err != nil {
			panic(err)
		}
	}
}
func main() {
	var wg sync.WaitGroup
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = false
	config.Version = sarama.V0_10_2_0
	client, err := sarama.NewClient([]string{"localhost:9092"}, config)
	if err != nil {
		panic(err)
	}
	defer client.Close()
	group1, err := sarama.NewConsumerGroupFromClient("c1", client)
	if err != nil {
		panic(err)
	}
	group2, err := sarama.NewConsumerGroupFromClient("c2", client)
	if err != nil {
		panic(err)
	}
	group3, err := sarama.NewConsumerGroupFromClient("c3", client)
	if err != nil {
		panic(err)
	}
	defer group1.Close()
	defer group2.Close()
	defer group3.Close()
	wg.Add(3)
	go consume(group1, &wg, "c1")
	go consume(group2, &wg, "c2")
	go consume(group3, &wg, "c3")
	wg.Wait()
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	<-signals
}
Let's use NewConsumerGroup as the entry point of the source code analysis:
consumer_group.go
func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error) {
client, err := NewClient(addrs, config)
c, err := newConsumerGroup(groupID, client)
}
It first creates a client and then builds the consumerGroup object:
type ConsumerGroup interface {
Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error
// Errors returns a read channel of errors that occurred during the consumer life-cycle.
// By default, errors are logged and not returned over this channel.
// If you want to implement any custom error handling, set your config's
// Consumer.Return.Errors setting to true, and read from this channel.
Errors() <-chan error
// Close stops the ConsumerGroup and detaches any running sessions. It is required to call
// this function before the object passes out of scope, as it will otherwise leak memory.
Close() error
}
type consumerGroup struct {
client Client
config *Config
consumer Consumer
groupID string
memberID string
errors chan error
lock sync.Mutex
closed chan none
closeOnce sync.Once
userData []byte
}
func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) {
consumer, err := NewConsumerFromClient(client)
}
Creating the consumerGroup also creates a consumer object:
consumer.go
func NewConsumerFromClient(client Client) (Consumer, error) {
cli := &nopCloserClient{client}
return newConsumer(cli)
}
func newConsumer(client Client) (Consumer, error) {
}
type consumer struct {
conf *Config
children map[string]map[int32]*partitionConsumer
brokerConsumers map[*Broker]*brokerConsumer
client Client
lock sync.Mutex
}
With the ConsumerGroup created, we can start consuming; the corresponding interface method is Consume:
func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error {
c.client.RefreshMetadata(topics...) // refresh the metadata for these topics
sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
go c.loopCheckPartitionNumbers(topics, sess)
}
RefreshMetadata fetches the corresponding metadata; the code lives in client.go:
func (client *client) RefreshMetadata(topics ...string) error {
return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max, deadline)
}
func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error {
broker = client.any()
req := &MetadataRequest{Topics: topics, AllowAutoTopicCreation: allowAutoTopicCreation}
response, err := broker.GetMetadata(req)
shouldRetry, err := client.updateMetadata(response, allKnownMetaData)
}
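RefreshMetadata is also part of sarama's public Client interface, so user code can trigger the same refresh; a minimal sketch (the function name, topic and broker address are just examples):
func refreshTopicMetadata() {
	client, err := sarama.NewClient([]string{"localhost:9092"}, sarama.NewConfig())
	if err != nil {
		panic(err)
	}
	defer client.Close()
	// force a metadata refresh for the "test" topic, the same call
	// consumerGroup.Consume makes before building a session
	if err := client.RefreshMetadata("test"); err != nil {
		panic(err)
	}
}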
The assignment of a partition to a consumer is called a "claim"; the lifetime of one round of such ConsumerGroupClaims is called a session. A session ends when the ctx is cancelled or when a partition rebalance happens. During a session the client has to keep heartbeating to the coordinator; the original Kafka client has a session.timeout.ms setting for this, and if the client fails to send a heartbeat to the coordinator within that window it is considered dead, which triggers a rebalance.
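sarama exposes the equivalent knobs on its Config; a minimal sketch (the values are only examples, and the time package is assumed to be imported):
func newGroupConfig() *sarama.Config {
	config := sarama.NewConfig()
	config.Version = sarama.V0_10_2_0
	// the member is considered dead if the coordinator sees no heartbeat within this window
	config.Consumer.Group.Session.Timeout = 10 * time.Second
	// how often the heartbeat loop sends heartbeats; must be well below the session timeout
	config.Consumer.Group.Heartbeat.Interval = 3 * time.Second
	return config
}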
func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
coordinator, err := c.client.Coordinator(c.groupID)
join, err := c.joinGroupRequest(coordinator, topics)
groupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}
func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
go sess.heartbeatLoop()
// start consuming
for topic, partitions := range claims {
for _, partition := range partitions {
sess.waitGroup.Add(1)
go func(topic string, partition int32) {
sess.consume(topic, partition)
}(topic, partition)
}
}
}
type consumerGroupSession struct {
parent *consumerGroup
memberID string
generationID int32
handler ConsumerGroupHandler
claims map[string][]int32
offsets *offsetManager
ctx context.Context
cancel func()
waitGroup sync.WaitGroup
releaseOnce sync.Once
hbDying, hbDead chan none
}
This calls the sess.consume(topic, partition) method:
func (s *consumerGroupSession) consume(topic string, partition int32) {
// create new claim
claim, err := newConsumerGroupClaim(s, topic, partition, offset)
s.handler.ConsumeClaim(s, claim)
}
func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) {
pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset)
}
type consumerGroupClaim struct {
topic string
partition int32
offset int64
PartitionConsumer
}
which in turn calls ConsumePartition to consume the corresponding partition:
consumer.go
func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
child := &partitionConsumer
if err := child.chooseStartingOffset(offset); err != nil
if leader, err = c.client.Leader(child.topic, child.partition); err != nil
if err := c.addChild(child); err != nil
c.children[child.topic] = topicChildren
topicChildren[child.partition] = child
go withRecover(child.dispatcher)
go withRecover(child.responseFeeder)
child.broker = c.refBrokerConsumer(leader)
bc := c.brokerConsumers[broker]
bc.refs++
child.broker.input <- child
}
This creates a partitionConsumer object:
type partitionConsumer struct {
highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG
consumer *consumer
conf *Config
broker *brokerConsumer
messages chan *ConsumerMessage
errors chan *ConsumerError
feeder chan *FetchResponse
preferredReadReplica int32
trigger, dying chan none
closeOnce sync.Once
topic string
partition int32
responseResult error
fetchSize int32
offset int64
retries int32
}
It also starts two goroutines, and these two goroutines are the core of the consumer.
1. First, the dispatcher, which mainly maintains the subscription information:
func (child *partitionConsumer) dispatcher()
for range child.trigger
if err := child.dispatch(); err != nil {
child.consumer.unrefBrokerConsumer(child.broker)
child.consumer.removeChild(child)
close(child.feeder)
Now look at the dispatch method called inside the dispatcher goroutine:
func (child *partitionConsumer) dispatch() error
if err := child.consumer.client.RefreshMetadata(child.topic); err != nil
broker, err := child.preferredBroker()
child.broker = child.consumer.refBrokerConsumer(broker)
child.broker.input <- child
It first obtains a brokerConsumer object:
type brokerConsumer struct {
consumer *consumer
broker *Broker
input chan *partitionConsumer
newSubscriptions chan []*partitionConsumer
subscriptions map[*partitionConsumer]none
wait chan none
acks sync.WaitGroup
refs int
}
func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
bc = c.newBrokerConsumer(broker)
}
func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer
go withRecover(bc.subscriptionManager)
go withRecover(bc.subscriptionConsumer)
This starts two more goroutines:
func (bc *brokerConsumer) subscriptionManager(){
case event, ok := <-bc.input:
buffer = append(buffer, event)
case bc.newSubscriptions <- buffer:
buffer = nil
}
Whenever a new subscription request arrives on input it is appended to a buffer, which is then sent out on newSubscriptions. Note that newSubscriptions is not a buffered channel; it is an unbuffered channel whose elements are slices.
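The trick of draining an input channel into a slice and then handing the whole slice over another (unbuffered) channel is a generic Go pattern; here is a self-contained sketch of it with invented names, not sarama's actual code:
// batcher collects items from in and hands whole batches to out.
func batcher(in <-chan int, out chan<- []int) {
	var buffer []int
	for {
		if len(buffer) == 0 {
			// nothing to flush yet: block until something arrives
			v, ok := <-in
			if !ok {
				return
			}
			buffer = append(buffer, v)
			continue
		}
		select {
		case v, ok := <-in:
			if !ok {
				return
			}
			buffer = append(buffer, v)
		case out <- buffer: // the receiver takes the whole slice at once
			buffer = nil
		}
	}
}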
func (bc *brokerConsumer) subscriptionConsumer()
for newSubscriptions := range bc.newSubscriptions {
bc.updateSubscriptions(newSubscriptions)
response, err := bc.fetchNewMessages()
bc.acks.Add(len(bc.subscriptions))
child.feeder <- response
bc.acks.Wait()
bc.handleResponses()
Each time it receives an updated subscription set it calls fetchNewMessages and delivers the response to the subscribed children's feeder channels:
func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
for child := range bc.subscriptions {
request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
}
return bc.broker.Fetch(request)
}
Fetch simply sends the request to the broker and reads back the messages:
func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
response := new(FetchResponse)
err := b.sendAndReceive(request, response)
}
2. Next, the responseFeeder goroutine:
func (child *partitionConsumer) responseFeeder() {
feederLoop:
// the big loop: receive responses fetched from the broker
for response := range child.feeder {
for i, msg := range msgs {
case child.messages <- msg:
child.broker.input <- child
continue feederLoop
}
This is the consumer's main message loop: it keeps taking responses off feeder, pushes the contained messages into messages, and once it is done with a response it puts itself back onto the broker's input channel.
subscriptionManager then picks it up from input again and goes back to kafka to fetch more messages, which closes the full message cycle.
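A heavily simplified sketch of that ping-pong between the fetch side and the feed side (all types and names here are invented for illustration; the real code carries far more state and error handling):
type fakePartition struct {
	feeder   chan []string // batches fetched from the "broker"
	messages chan string   // what the user reads via Messages()
}
// fetchLoop plays the role of subscriptionConsumer: whenever a partition
// re-subscribes on input, fetch a batch and push it into that partition's feeder.
func fetchLoop(input chan *fakePartition) {
	for p := range input {
		batch := []string{"m1", "m2"} // pretend this came from a Fetch request
		p.feeder <- batch
	}
}
// feedLoop plays the role of responseFeeder: forward each message to the
// messages channel, then put the partition back on input so the next fetch happens.
func feedLoop(p *fakePartition, input chan *fakePartition) {
	for batch := range p.feeder {
		for _, msg := range batch {
			p.messages <- msg
		}
		input <- p
	}
}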
Finally, let's look at the Messages interface:
func (child *partitionConsumer) Messages() <-chan *ConsumerMessage {
return child.messages
}
It is trivial: it just returns the messages channel so the already-processed messages can be read from it.
To summarize:
partitionConsumer starts two goroutines, dispatcher and responseFeeder:
1. The dispatcher goroutine tracks broker changes; it is the control/metadata side. It listens on the trigger channel to learn about changes such as the partition leader moving, and the notifications on trigger come from the brokerConsumer object. The final child.broker.input <- child effectively (re)subscribes the partitionConsumer to the brokerConsumer.
2. responseFeeder tracks incoming messages; it is the data side. The child.feeder channel is likewise fed by the brokerConsumer: responseFeeder takes the responses coming from brokerConsumer and forwards the individual messages to the messages channel.
One configuration option worth noting is child.conf.Consumer.MaxProcessingTime, which defaults to 100ms: according to the comments, if a write to the messages channel does not succeed within that time, the consumer stops sending further fetch requests to the broker until the slow consumer catches up.
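For reference, a minimal sketch of where that knob lives (the 200ms value is only an example, and the time package is assumed to be imported):
func newSlowConsumerConfig() *sarama.Config {
	config := sarama.NewConfig()
	// if a message cannot be pushed into the Messages() channel within this
	// window, fetching for that partition pauses until the consumer catches up
	config.Consumer.MaxProcessingTime = 200 * time.Millisecond
	return config
}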