Golang source code analysis: the sarama Kafka client (Part III: the roles of the client) - 墨天轮
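Understanding the roles a client can play goes a long way toward understanding Kafka and sarama. Each role is covered in turn below.
We use all kinds of clients, yet what they hand back is always a pointer to a Broker. In essence, every Kafka client ends up talking to a broker, so the Broker object wraps the connection to Kafka and stands for the Client. In different scenarios the Client plays different roles, and the role is determined by the metadata.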
func (client *client) InSyncReplicas(topic string, partitionID int32) ([]int32, error) {
	// ...
	metadata := client.cachedMetadata(topic, partitionID)
	// ...
	err := client.RefreshMetadata(topic)
	// ...
}

func (client *client) RefreshMetadata(topics ...string) error {
	// ...
	return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max, deadline)
}

func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error {
	// ...
	broker := client.any()
	// ...
	response, err := broker.GetMetadata(req)
	// inside GetMetadata: err := b.sendAndReceive(request, response)
	// ...
	shouldRetry, err := client.updateMetadata(response, allKnownMetaData)
	// ...
	client.deregisterBroker(broker)
	_ = broker.Close()
	// ...
}
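As the excerpts show, fetching metadata means picking an arbitrary broker (without metadata the client cannot know any broker's identity yet), requesting the metadata from it, and caching the result.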
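The retry behaviour described above can be sketched with a small stand-alone model. This is not sarama code: fakeBroker, fakeClient, and refreshMetadata are hypothetical names, the "metadata" is reduced to a topic-to-partition-count map, and real backoff handling is left out. It only illustrates the idea of asking one broker, dropping it on failure, and trying the next one until the attempts run out.

```go
package main

import "errors"

// fakeBroker is a hypothetical stand-in for a sarama *Broker.
type fakeBroker struct {
	addr    string
	healthy bool
}

// fetchMetadata pretends to ask the broker for cluster metadata.
// The result is simplified to topic name -> partition count.
func (b *fakeBroker) fetchMetadata() (map[string]int32, error) {
	if !b.healthy {
		return nil, errors.New("broker " + b.addr + " unreachable")
	}
	return map[string]int32{"orders": 3}, nil
}

// fakeClient holds the candidate brokers and the cached metadata.
type fakeClient struct {
	brokers  []*fakeBroker
	metadata map[string]int32
}

// refreshMetadata asks one broker at a time. A broker that fails is
// dropped from the candidate list (a simplified "deregister") and the
// next one is tried, until a request succeeds or attempts run out.
func (c *fakeClient) refreshMetadata(attempts int) error {
	for ; attempts > 0; attempts-- {
		if len(c.brokers) == 0 {
			return errors.New("no brokers left to ask")
		}
		b := c.brokers[0]
		md, err := b.fetchMetadata()
		if err == nil {
			c.metadata = md // cache the fresh metadata
			return nil
		}
		c.brokers = c.brokers[1:] // give up on this broker
	}
	return errors.New("out of attempts")
}
```

With one unreachable and one healthy broker, a call such as refreshMetadata(3) would drop the first broker and cache the metadata returned by the second.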
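1. Controller
First comes the Controller. In a Kafka cluster, one broker is elected as the controller, and that controller then coordinates with the other brokers on tasks such as topic creation, partition leader election, and topic deletion.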
func (client *client) Controller() (*Broker, error) {
	// ...
	client.refreshMetadata()
	// ...
	controller := client.cachedController()
	// ...
	_ = controller.Open(client.conf)
	// ...
}

func (client *client) cachedController() *Broker {
	return client.brokers[client.controllerID]
}
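The controller ID comes from the metadata, and the Controller broker is then looked up by that controllerID. As described earlier, creating and deleting topics and partitions goes through the controller; the main call sites are in admin.go.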
func NewClusterAdminFromClient(client Client) (ClusterAdmin, error) {
	// ...
	_, err := client.Controller()
	// ...
}

func (ca *clusterAdmin) Controller() (*Broker, error) {
	return ca.client.Controller()
}

func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
	// ...
	b, err := ca.Controller()
	// ...
}

func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
	controller, err := ca.Controller()
	// ...
}

func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
	// ...
	b, err := ca.Controller()
	// ...
}

func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error {
	// ...
	b, err := ca.Controller()
	// ...
}
2. Coordinator
Every KafkaServer has a GroupCoordinator instance that manages multiple consumer groups. It is mainly responsible for offset management and consumer rebalancing.
func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
	// ...
	_ = coordinator.Open(client.conf)
	// ...
}
Once the metadata has been fetched, the client looks up the coordinator:
func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*FindCoordinatorResponse, error) {
	// ...
	broker := client.any()
	// ...
	request := new(FindCoordinatorRequest)
	request.CoordinatorKey = consumerGroup
	request.CoordinatorType = CoordinatorGroup
	response, err := broker.FindCoordinator(request)
	// ...
}
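The FindCoordinator request asks a broker which node coordinates a given group. As far as I understand the broker side (this is Kafka behaviour, not sarama code), the group ID is hashed the way Java hashes a String, the absolute value is taken modulo the partition count of the internal offsets topic, and the leader of that partition acts as the group's coordinator. The sketch below reproduces only that arithmetic; javaStringHash and coordinatorPartition are hypothetical helper names, and the partition count of 50 mentioned afterwards is the commonly cited default, assumed here rather than taken from the article.

```go
package main

import (
	"math"
	"unicode/utf16"
)

// javaStringHash mimics Java's String.hashCode: a base-31 polynomial
// over UTF-16 code units with 32-bit wraparound.
func javaStringHash(s string) int32 {
	var h int32
	for _, u := range utf16.Encode([]rune(s)) {
		h = 31*h + int32(u) // int32 arithmetic wraps on overflow in Go
	}
	return h
}

// coordinatorPartition maps a group ID to a partition of the offsets
// topic. The minimum int32 value has no positive counterpart, so it is
// mapped to 0 (an assumption about how the broker handles that case).
func coordinatorPartition(groupID string, offsetsPartitions int32) int32 {
	h := javaStringHash(groupID)
	switch {
	case h == math.MinInt32:
		h = 0
	case h < 0:
		h = -h
	}
	return h % offsetsPartitions
}
```

Under these assumptions, coordinatorPartition("hello", 50) yields 22, so the broker leading partition 22 of the offsets topic would coordinate a group named "hello".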
It is used wherever consumer groups are managed, in admin.go:
func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
	// ...
	controller, err := ca.client.Coordinator(group)
	// ...
	response, err := broker.DescribeGroups(&DescribeGroupsRequest{
		Groups: brokerGroups,
	})
	// ...
}

func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) {
	// ...
	return coordinator.FetchOffset(request)
}

func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
	coordinator, err := ca.client.Coordinator(group)
	// ...
	resp, err := coordinator.DeleteGroups(request)
	// ...
}
It is also used when a session is created, in consumer_group.go:
func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
	coordinator, err := c.client.Coordinator(c.groupID)
	// ...
	return c.newSession(ctx, topics, handler, retries)
	// ...
}
It is used when leaving the group as well:
func (c *consumerGroup) leave() error {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.memberID == "" {
		return nil
	}
	coordinator, err := c.client.Coordinator(c.groupID)
	// ...
}
And when keeping the heartbeat alive:
func (s *consumerGroupSession) heartbeatLoop() {
	// ...
	for {
		coordinator, err := s.parent.client.Coordinator(s.parent.groupID)
		// ...
	}
}
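The overall shape of such a heartbeat loop can be sketched without any Kafka dependency. The code below is a simplified model, not sarama's implementation: the function signature, the beat callback (standing in for "find the coordinator and send a heartbeat"), and errCoordinatorLost are all hypothetical. It sends a beat, then waits for either the next tick or a stop signal.

```go
package main

import (
	"errors"
	"time"
)

// errCoordinatorLost is a hypothetical sentinel a beat callback can
// return when the coordinator can no longer be reached.
var errCoordinatorLost = errors.New("coordinator lost")

// heartbeatLoop calls beat once per interval until done is closed or
// beat fails. It reports how many beats succeeded and the error that
// ended the loop, if any.
func heartbeatLoop(done <-chan struct{}, interval time.Duration, beat func() error) (int, error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	sent := 0
	for {
		// In a real client this step would resolve the coordinator
		// broker first and then send the heartbeat request to it.
		if err := beat(); err != nil {
			return sent, err
		}
		sent++

		select {
		case <-ticker.C: // time for the next heartbeat
		case <-done: // session is shutting down
			return sent, nil
		}
	}
}
```

Resolving the coordinator on every iteration, as the sarama excerpt does, means a coordinator change is picked up on the next beat instead of requiring a restart of the loop.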
3. any: an arbitrary broker
func (client *client) any() *Broker {
	// ...
	_ = client.seedBrokers[0].Open(client.conf)
	// ...
	_ = broker.Open(client.conf)
	// ...
}
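The two Open calls in the excerpt suggest two sources of candidates: the configured seed brokers and the brokers already learned from metadata. A minimal sketch of that selection, assuming seeds are preferred when any remain, could look like this; node and registry are hypothetical stand-ins for sarama's Broker and client types.

```go
package main

// node is a hypothetical stand-in for a broker connection.
type node struct {
	addr string
	open bool
}

// Open marks the connection as opened; a real broker would dial here.
func (n *node) Open() { n.open = true }

// registry holds the seed brokers from the configuration and the
// brokers discovered through metadata, keyed by broker ID.
type registry struct {
	seeds   []*node
	brokers map[int32]*node
}

// any returns some usable broker: the first seed if one is left,
// otherwise whichever known broker map iteration yields first.
// It returns nil when no broker is known at all.
func (r *registry) any() *node {
	if len(r.seeds) > 0 {
		r.seeds[0].Open()
		return r.seeds[0]
	}
	for _, b := range r.brokers {
		b.Open()
		return b // map order is unspecified, so this is "any" broker
	}
	return nil
}
```

Because Go does not define map iteration order, the fallback branch really does return an arbitrary broker, which is all a metadata or FindCoordinator request needs.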
4. Leader
All write operations go to the leader partition first and are then replicated to the follower replicas.
func (client *client) Leader(topic string, partitionID int32) (*Broker, error) {
	// ...
	leader, err := client.cachedLeader(topic, partitionID)
	// ...
}

func (client *client) cachedLeader(topic string, partitionID int32) (*Broker, error) {
	// ...
	_ = b.Open(client.conf)
	// ...
}
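A leader lookup of this kind is essentially a cache with a refresh on miss. The sketch below models that pattern under simplifying assumptions: the leader is represented by its address string instead of a *Broker, and partitionKey, leaderCache, newLeaderCache, and errNoLeader are hypothetical names. The refresh function stands in for a metadata request.

```go
package main

import "errors"

// errNoLeader is returned when a partition has no known leader even
// after the metadata has been refreshed.
var errNoLeader = errors.New("no leader for partition")

// partitionKey identifies one partition of one topic.
type partitionKey struct {
	topic     string
	partition int32
}

// leaderCache remembers the leader address per partition. refresh is
// a hypothetical hook returning partition -> leader for a topic.
type leaderCache struct {
	leaders map[partitionKey]string
	refresh func(topic string) (map[int32]string, error)
}

func newLeaderCache(refresh func(string) (map[int32]string, error)) *leaderCache {
	return &leaderCache{leaders: make(map[partitionKey]string), refresh: refresh}
}

// leader answers from the cache when possible. On a miss it refreshes
// the topic's metadata once and looks again.
func (c *leaderCache) leader(topic string, partition int32) (string, error) {
	key := partitionKey{topic, partition}
	if addr, ok := c.leaders[key]; ok {
		return addr, nil
	}
	fresh, err := c.refresh(topic)
	if err != nil {
		return "", err
	}
	for p, addr := range fresh {
		c.leaders[partitionKey{topic, p}] = addr
	}
	if addr, ok := c.leaders[key]; ok {
		return addr, nil
	}
	return "", errNoLeader
}
```

A producer or consumer built on such a cache pays for a metadata round trip only on the first lookup per topic, or after the cached entry has been invalidated.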
It is used in the following places:
admin.go
func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error
async_producer.go
func (pp *partitionProducer) dispatch()
func (pp *partitionProducer) updateLeader() error
func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, kerr KError)
consumer.go
func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
	// ...
	child.broker = c.refBrokerConsumer(leader)
	// ...
	bc = c.newBrokerConsumer(broker)
	// ...
}
func (child *partitionConsumer) preferredBroker() (*Broker, error)
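Those are the four roles of the client and the scenarios in which each is used. Knowing what they mean is important for understanding both Kafka and sarama.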