golang源码分析:sarama kafka client(part III:client的角色)

golang源码分析:sarama kafka client(part III:client的角色),第1张

golang源码分析:sarama kafka client(part III:client的角色)

golang源码分析:sarama kafka client(part III:client的角色) - 墨天轮

理解client的角色对我们理解kafka和sarama非常有帮助。下面将一一详细介绍:

        我们用到了各种各样的client,返回的对象都是一个Broker的指针,本质上讲,我们通过kafka client 最终都是和broker通信,所以用Broker对象封装和kafka的连接,表示Client。不同场景下,Client有不同的角色,角色是通过元数据来确定的。

func (client *client) InSyncReplicas(topic string, partitionID int32) ([]int32, error) 
  metadata:= client.cachedmetadata(topic, partitionID)
  err := client.Refreshmetadata(topic)
func (client *client) Refreshmetadata(topics ...string) error
  return client.tryRefreshmetadata(topics, client.conf.metadata.Retry.Max, deadline)
func (client *client) tryRefreshmetadata(topics []string, attemptsRemaining int, deadline time.Time) error 
    broker := client.any()
    response, err := broker.Getmetadata(req)
    err := b.sendAndReceive(request, response)
    shouldRetry, err := client.updatemetadata(response, allKnownmetaData)
  client.deregisterBroker(broker)
   _ = broker.Close()

可以看到,获取元数据的过程是随机选择一个broker(没有元数据不知道身份),然后获取元数据,存储下来。

1,Controller

        首先是Controller,Kafka集群中,首先会选举出一个broker作为controller,然后该controller负责跟其他broker进行协调topic创建,partition主副本选举,topic删除等事务

func (client *client) Controller() (*Broker, error){
       client.refreshmetadata()
       controller := client.cachedController()
       _ = controller.Open(client.conf)
}
func (client *client) cachedController() *Broker {
    return client.brokers[client.controllerID]
 }

通过元数据,获取controllerId,然后通过controllerID找到Controller,和前面介绍的一样,topic和partation相关的增删 *** 作会用到controller,主要在admin.go中

func NewClusterAdminFromClient(client Client) (ClusterAdmin, error)
    _, err := client.Controller()
func (ca *clusterAdmin) Controller() (*Broker, error)
    return ca.client.Controller()
func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
    b, err := ca.Controller()
func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*Topicmetadata, err error)
    controller, err := ca.Controller()
func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
    b, err := ca.Controller()
func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error
    b, err := ca.Controller()

2,Coordinator

每个KafkaServer都有一个GroupCoordinator实例,管理多个消费者组,主要用于offset位移管理和Consumer Rebalance。

func (client *client) Coordinator(consumerGroup string) (*Broker, error) 
  _ = coordinator.Open(client.conf)

获取元数据后,就会获取Coordinator

func (client *client) getConsumermetadata(consumerGroup string, attemptsRemaining int) (*FindCoordinatorResponse, error) 
    broker := client.any();
    request := new(FindCoordinatorRequest)
    request.CoordinatorKey = consumerGroup
    request.CoordinatorType = CoordinatorGroup
  response, err := broker.FindCoordinator(request)

调整消费组的地方会用到,admin.go

func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error)
    controller, err := ca.client.Coordinator(group)
    response, err := broker.DescribeGroups(&DescribeGroupsRequest{
      Groups: brokerGroups,
    })
func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
    return coordinator.FetchOffset(request)
func (ca *clusterAdmin) DeleteConsumerGroup(group string) error
    coordinator, err := ca.client.Coordinator(group)
    resp, err := coordinator.DeleteGroups(request)

创建session的时候也会用到consumer_group.go

func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
  coordinator, err := c.client.Coordinator(c.groupID)
  return c.newSession(ctx, topics, handler, retries)
}

离开也会用到

func (c *consumerGroup) leave() error {
  c.lock.Lock()
  defer c.lock.Unlock()
  if c.memberID == "" {
    return nil
  }


  coordinator, err := c.client.Coordinator(c.groupID)

心跳维持

func (s *consumerGroupSession) heartbeatLoop() {
  for {
    coordinator, err := s.parent.client.Coordinator(s.parent.groupID)
    }
}

3,any 任意broker

func (client *client) any() *Broker
  _ = client.seedBrokers[0].Open(client.conf)
  _ = broker.Open(client.conf)

4,Leader

写相关的 *** 作都是先写到leader partation,然后同步到副本分区。

func (client *client) Leader(topic string, partitionID int32) (*Broker, error)
    leader, err := client.cachedLeader(topic, partitionID)
func (client *client) cachedLeader(topic string, partitionID int32) (*Broker, error)
  _ = b.Open(client.conf)

使用的地方如下:

admin.go

func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error  

async_producer.go

func (pp *partitionProducer) dispatch() 
func (pp *partitionProducer) updateLeader() error
func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, kerr KError) 

consumer.go

func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) 
    child.broker = c.refBrokerConsumer(leader)
      bc = c.newBrokerConsumer(broker)
func (child *partitionConsumer) preferredBroker() (*Broker, error)  

上面就是client的四个角色和应用场景,理解他们的含义对理解kafka和sarama有重要的意义。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5682118.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存