golang源码分析:sarama kafka client(part II:消费者)

golang源码分析:sarama kafka client(part II:消费者),第1张

 

golang源码分析:sarama kafka client(part II:消费者) - 墨天轮

这一讲,我们接着介绍下sarama kafka client的消费者的实现,先从例子开始:

package main


import (
  "fmt"
  "log"
  "sync"


  "github.com/Shopify/sarama"
)


// 消费者练习


func main() {
  // 生成消费者 实例
  consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
  if err != nil {
    log.Print(err)
    return
  }
  // 拿到 对应主题下所有分区
  partitionList, err := consumer.Partitions("test")
  if err != nil {
    log.Println(err)
    return
  }


  var wg sync.WaitGroup
  wg.Add(1)
  // 遍历所有分区
  for partition := range partitionList {
    //消费者 消费 对应主题的 具体 分区 指定 主题 分区 offset  return 对应分区的对象
    pc, err := consumer.ConsumePartition("test", int32(partition), sarama.OffsetNewest)
    if err != nil {
      log.Println(err)
      return
    }


    // 运行完毕记得关闭
    defer pc.AsyncClose()


    // 去出对应的 消息
    // 通过异步 拿到 消息
    go func(sarama.PartitionConsumer) {
      defer wg.Done()
      for msg := range pc.Messages() {
        fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
      }
    }(pc)
  }
  wg.Wait()
}

分三个部分:

1,sarama.NewConsumer ,创建一个consumer

2,consumer.ConsumePartition 从指定topic,指定分区消费消息

3, msg := range pc.Messages() 获取消息

如果不需要拿到所有的分区,也可以只指定comsumer group

package main


import (
  "context"
  "fmt"
  "os"
  "os/signal"
  "sync"


  "github.com/Shopify/sarama"
)


type consumerGroupHandler struct {
  name string
}


func (consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  for msg := range claim.Messages() {
    fmt.Printf("%s Message topic:%q partition:%d offset:%d  value:%s\n", h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
    // 手动确认消息
    sess.MarkMessage(msg, "")
  }
  return nil
}


func handleErrors(group *sarama.ConsumerGroup, wg *sync.WaitGroup) {
  wg.Done()
  for err := range (*group).Errors() {
    fmt.Println("ERROR", err)
  }
}


func consume(group *sarama.ConsumerGroup, wg *sync.WaitGroup, name string) {
  fmt.Println(name + "start")
  wg.Done()
  ctx := context.Background()
  for {
    topics := []string{"test"}
    handler := consumerGroupHandler{name: name}
    err := (*group).Consume(ctx, topics, handler)
    fmt.Println("consume group end")
    if err != nil {
      panic(err)
    }
  }
}


func main() {
  var wg sync.WaitGroup
  config := sarama.NewConfig()
  config.Consumer.Return.Errors = false
  config.Version = sarama.V0_10_2_0
  client, err := sarama.NewClient([]string{"localhost:9092"}, config)
  defer client.Close()
  if err != nil {
    panic(err)
  }
  group1, err := sarama.NewConsumerGroupFromClient("c1", client)
  if err != nil {
    panic(err)
  }
  group2, err := sarama.NewConsumerGroupFromClient("c2", client)
  if err != nil {
    panic(err)
  }
  group3, err := sarama.NewConsumerGroupFromClient("c3", client)
  if err != nil {
    panic(err)
  }
  defer group1.Close()
  defer group2.Close()
  defer group3.Close()
  wg.Add(3)
  go consume(&group1, &wg, "c1")
  go consume(&group2, &wg, "c2")
  go consume(&group3, &wg, "c3")
  wg.Wait()
  signals := make(chan os.Signal, 1)
  signal.Notify(signals, os.Interrupt)
  select {
  case <-signals:
  }
}

我们从NewConsumerGroup作为入口开始源码分析:

consumer_group.go

func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error) {
    client, err := NewClient(addrs, config)
    c, err := newConsumerGroup(groupID, client)
}

先创建一个client,然后生成一个consumerGroup 对象:

type ConsumerGroup interface {
  Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error
  // Errors returns a read channel of errors that occurred during the consumer life-cycle.
  // By default, errors are logged and not returned over this channel.
  // If you want to implement any custom error handling, set your config's
  // Consumer.Return.Errors setting to true, and read from this channel.
  Errors() <-chan error
  // Close stops the ConsumerGroup and detaches any running sessions. It is required to call
  // this function before the object passes out of scope, as it will otherwise leak memory.
  Close() error
}

type consumerGroup struct {
  client Client


  config   *Config
  consumer Consumer
  groupID  string
  memberID string
  errors   chan error


  lock      sync.Mutex
  closed    chan none
  closeOnce sync.Once


  userData []byte
}
func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) {
  consumer, err := NewConsumerFromClient(client) 
}

创建consumerGroup的同时会创建consumer对象:

consumer.go

func NewConsumerFromClient(client Client) (Consumer, error) {
  cli := &nopCloserClient{client}
  return newConsumer(cli)
}
func newConsumer(client Client) (Consumer, error) {
}
type consumer struct {
  conf            *Config
  children        map[string]map[int32]*partitionConsumer
  brokerConsumers map[*Broker]*brokerConsumer
  client          Client
  lock            sync.Mutex
}

创建完ConsumerGroup后我们就开始消费了,对应的接口是Consume

func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error {
   c.client.RefreshMetadata(topics...)//加载元数据
   sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
   go c.loopCheckPartitionNumbers(topics, sess)
}

RefreshMetadata用于获取对应元数据信息,代码在client.go

func (client *client) RefreshMetadata(topics ...string) error {
  return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max, deadline)
}
func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error {
    broker = client.any()
    req := &MetadataRequest{Topics: topics, AllowAutoTopicCreation: allowAutoTopicCreation}
    response, err := broker.GetMetadata(req)
    shouldRetry, err := client.updateMetadata(response, allKnownMetaData)
}

        每个 partition 与 consumer 的分配关系称作一个 “claim”;一组 ConsumerGroupClain 这一轮的生命周期称作一个 session。session 的退出发生在 ctx 退出,或者 partition rebalance。session 要求客户端与 coordinator 保持一定的心跳,原版 kafka 客户端为此有一条 session.timeout.ms 的配置,客户端需要在时间范围内对 coordinator 发送心跳,不然将视为该客户端退出而出发 Rebalance。

func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
    coordinator, err := c.client.Coordinator(c.groupID)
    join, err := c.joinGroupRequest(coordinator, topics)
    groupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
  return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}

func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
    offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
    go sess.heartbeatLoop() 
      // start consuming
    for topic, partitions := range claims {
      for _, partition := range partitions {
        sess.waitGroup.Add(1)
        go func(topic string, partition int32) {
        sess.consume(topic, partition)
      }(topic, partition)
    }
  }
}
type consumerGroupSession struct {
  parent       *consumerGroup
  memberID     string
  generationID int32
  handler      ConsumerGroupHandler


  claims  map[string][]int32
  offsets *offsetManager
  ctx     context.Context
  cancel  func()


  waitGroup       sync.WaitGroup
  releaseOnce     sync.Once
  hbDying, hbDead chan none
}

调用了 sess.consume(topic, partition) 这个接口:

func (s *consumerGroupSession) consume(topic string, partition int32) {
  // create new claim
  claim, err := newConsumerGroupClaim(s, topic, partition, offset)
   s.handler.ConsumeClaim(s, claim)
}
func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) {
  pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset)
}
type consumerGroupClaim struct {
  topic     string
  partition int32
  offset    int64
  PartitionConsumer
}

调用了ConsumePartition消费对应的partition

consumer.go

func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
    child := &partitionConsumer
    if err := child.chooseStartingOffset(offset); err != nil
      if leader, err = c.client.Leader(child.topic, child.partition); err != nil
      if err := c.addChild(child); err != nil
        c.children[child.topic] = topicChildren
        topicChildren[child.partition] = child
      go withRecover(child.dispatcher)
      go withRecover(child.responseFeeder)
      child.broker = c.refBrokerConsumer(leader)
        bc := c.brokerConsumers[broker]
        bc.refs++
      child.broker.input <- child
}

创建了一个partitionConsumer对象:

type partitionConsumer struct {
  highWaterMarkOffset int64  must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG


  consumer *consumer
  conf     *Config
  broker   *brokerConsumer
  messages chan *ConsumerMessage
  errors   chan *ConsumerError
  feeder   chan *FetchResponse


  preferredReadReplica int32


  trigger, dying chan none
  closeOnce      sync.Once
  topic          string
  partition      int32
  responseResult error
  fetchSize      int32
  offset         int64
  retries        int32
}

同时起了两个协程,这两个协程是核心

1,先看dispatcher,主要是维护订阅者信息

    func (child *partitionConsumer) dispatcher() 
      for range child.trigger
        if err := child.dispatch(); err != nil {
        child.consumer.unrefBrokerConsumer(child.broker)
        child.consumer.removeChild(child)
        close(child.feeder

看下dispatcher协程里的dispatch方法

 func (child *partitionConsumer) dispatch() error 
      if err := child.consumer.client.RefreshMetadata(child.topic); err != nil 
      broker, err := child.preferredBroker()
      child.broker = child.consumer.refBrokerConsumer(broker)
      child.broker.input <- child

先获得一个brokerConsumer 对象:

type brokerConsumer struct {
  consumer         *consumer
  broker           *Broker
  input            chan *partitionConsumer
  newSubscriptions chan []*partitionConsumer
  subscriptions    map[*partitionConsumer]none
  wait             chan none
  acks             sync.WaitGroup
  refs             int
}
func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
   bc = c.newBrokerConsumer(broker)
}
func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer 
      go withRecover(bc.subscriptionManager)
      go withRecover(bc.subscriptionConsumer)

起了两个协程:

func (bc *brokerConsumer) subscriptionManager(){
      case event, ok := <-bc.input:
         buffer = append(buffer, event)
      case bc.newSubscriptions <- buffer
         buffer = nil
 }

input里面有新的订阅请求,会appende到newSubscriptions 里面,不是带缓冲的channel,是一个chnel,里面是个slice

func (bc *brokerConsumer) subscriptionConsumer() 
      for newSubscriptions := range bc.newSubscriptions {
          bc.updateSubscriptions(newSubscriptions)
          response, err := bc.fetchNewMessages()
          bc.acks.Add(len(bc.subscriptions))
          child.feeder <- response
          bc.acks.Wait()
          bc.handleResponses()

每次收到消费者变换的消息后,都会调用fetchNewMessages,然后放到feeder里面

func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
     for child := range bc.subscriptions {
        request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
     }
     return bc.broker.Fetch(request)
}

Fetch就是请求broker,获取消息

func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
  response := new(FetchResponse)
  err := b.sendAndReceive(request, response)
}

2,接着看下responseFeeder协程

 func (child *partitionConsumer) responseFeeder() {
   feederLoop:
      从broker获取消息的大循环
      for response := range child.feeder
           for i, msg := range msgs {
               case child.messages <- msg:
                   child.broker.input <- child
                   continue feederLoop
 }

这是整个consumer的消息大循环,不断从feeder里面消费消息,放到messages里面,处理完毕以后将自己放回broker的input里面。

subscriptionManager会从input里面把它取出来,然后取kafka拉取消息,完成了完整的消息循环

最后看下Messages接口

func (child *partitionConsumer) Messages() <-chan *ConsumerMessage {
  return child.messages
}

很简单,就是把处理好的消息从messages这个chanel里面取出来。

总结下:

    partitonConsumer 会启动 dispatcher 和 responseFeeder 两个 goroutine:

      1,dispatcher goroutine 用于跟踪 broker 的变化,偏元信息性质的控制侧,dispatcher 这个 goroutine 用于发现 broker 的变化。它会侦听 dispatcher.trigger 这个 channel 的通知,来发现 Partition 的 Leader 变化。而 trigger 这个 channel 的更新来自 brokerConsumer 对象。

最后 child.broker.input<- child 这一句,相当于使 partitionConsumer 加入 brokerConsumer 的订阅。

2, responseFeeder 用于跟踪消息的到来,偏数据侧。

child.feed 这个 channel 也是来自 brokerConsumer。大约是处理来自 brokerConsumer 的消息,转发给 messages chan。

值得留意有一个配置项目 child.conf.Consumer.MaxProcessingTime,默认值为 100ms,看注释它的意思是如果朝 messages chan 写入超过 100ms 仍未成功,则停止再向 Broker 发送 fetch 请求。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/993839.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存