kafka-go源码解析一(Dialer)

kafka-go源码解析一(Dialer),第1张

概要

git 地址 -- GitHub - segmentio/kafka-go: Kafka library in Go

Dialer类型的主要作用是创建与kafka集群的连接。通过分析它的行为,可知它是建立在底层socket之上的,同时它也是作为上层api reader的一个基础组件。

物理连接上,它考虑了超时控制,安全连接tls,认证sasl,域名转换,重试机制等。

它的另一个重要作用是建立与指定topic partition leader 对应的broker的连接。

源码分析 类型
type Dialer struct {
	ClientID string  // 唯一id

	// 创建底层socket的连接函数,如果不设置使用net包的相应函数
	DialFunc func(ctx context.Context, network string, address string) (net.Conn, error)

	// 超时时间
	Timeout time.Duration

	// 过期时间
	Deadline time.Time

	// 本地地址
	LocalAddr net.Addr

	DualStack bool
	FallbackDelay time.Duration

	// 探活时间间隔
	KeepAlive time.Duration

	// 域名转换器
	Resolver Resolver

	// TLS 安全连接配置,后续再做分析
	TLS *tls.Config

	// SASL 认证机制,后续再做分析
	SASLMechanism sasl.Mechanism

	// 事务性id,后续再做分析
	TransactionalID string
}
核心函数 1、建立和broker的连接,并包装生成kafka connection
func (d *Dialer) connect(ctx context.Context, network, address string, connCfg ConnConfig) (*Conn, error) {
	if d.Timeout != 0 { // 设置context的过期时间
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, d.Timeout)
		defer cancel()
	}

	if !d.Deadline.IsZero() { // 设置context的过期时间
		var cancel context.CancelFunc
		ctx, cancel = context.WithDeadline(ctx, d.Deadline)
		defer cancel()
	}

	c, err := d.dialContext(ctx, network, address) // 建立socket连接,见后
	if err != nil {
		return nil, err
	}

	conn := NewConnWith(c, connCfg) // 根据配置包装成kafka connection

	if d.SASLMechanism != nil { //权限验证
		host, port, err := splitHostPortNumber(address)
		if err != nil {
			return nil, err
		}
		metadata := &sasl.Metadata{
			Host: host,
			Port: port,
		}
		if err := d.authenticateSASL(sasl.WithMetadata(ctx, metadata), conn); err != nil {
			_ = conn.Close()
			return nil, err
		}
	}

	return conn, nil
}

1.1 建立socket连接

func (d *Dialer) dialContext(ctx context.Context, network string, addr string) (net.Conn, error) {
	address, err := lookupHost(ctx, addr, d.Resolver) // 如果提供的是域名,根据域名解析出ip地址
	if err != nil {
		return nil, err
	}

	dial := d.DialFunc
	if dial == nil { // 如果没有设置,使用默认的拨号函数
		dial = (&net.Dialer{
			LocalAddr:     d.LocalAddr,
			DualStack:     d.DualStack,
			FallbackDelay: d.FallbackDelay,
			KeepAlive:     d.KeepAlive,
		}).DialContext
	}

	conn, err := dial(ctx, network, address)
	if err != nil {
		return nil, err
	}

	if d.TLS != nil { // 安全验证
		。。。
	}

	return conn, nil
}
2 建立和指定topic&leader partition所对应的broker的连接

传入的address可以是kafka集群中任意broker的地址,因为所有broker上都有controller metadata的副本,因此可以获取到某一个特定topic partition leader对应的broker地址

func (d *Dialer) DialLeader(ctx context.Context, network string, address string, topic string, partition int) (*Conn, error) {
	p, err := d.LookupPartition(ctx, network, address, topic, partition) // 查询特定partition leader的信息
	if err != nil {
		return nil, err
	}
	return d.DialPartition(ctx, network, address, p) // 连接该broker
}

2.1 查询特定partition leader的信息

func (d *Dialer) LookupPartition(ctx context.Context, network string, address string, topic string, partition int) (Partition, error) {
	c, err := d.DialContext(ctx, network, address) // 连接broker
	if err != nil {
		return Partition{}, err
	}
	defer c.Close()

	brkch := make(chan Partition, 1)
	errch := make(chan error, 1)

	go func() {
		for attempt := 0; true; attempt++ { // 添加重试
			if attempt != 0 {
				if !sleep(ctx, backoff(attempt, 100*time.Millisecond, 10*time.Second)) { // 避免重试对服务器的压力
					errch <- ctx.Err()
					return
				}
			}

			partitions, err := c.ReadPartitions(topic) // 读取topic对应的partition内容,后续分析
			if err != nil {
				if isTemporary(err) { // 可重试的错误
					continue
				}
				errch <- err
				return
			}

			for _, p := range partitions {
				if p.ID == partition { // 找到对应的patition
					brkch <- p
					return
				}
			}
		}

		errch <- UnknownTopicOrPartition
	}()

	var prt Partition
	select {
	case prt = <-brkch:
	case err = <-errch:
	case <-ctx.Done():
		err = ctx.Err()
	}
	return prt, err
}

2.2 建立与partition leader对应broker的连接

func (d *Dialer) DialPartition(ctx context.Context, network string, address string, partition Partition) (*Conn, error) {
	return d.connect(ctx, network, net.JoinHostPort(partition.Leader.Host, strconv.Itoa(partition.Leader.Port)), ConnConfig{
		ClientID:        d.ClientID,
		Topic:           partition.Topic,
		Partition:       partition.ID,
		Broker:          partition.Leader.ID,
		Rack:            partition.Leader.Rack,
		TransactionalID: d.TransactionalID,
	})
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/994069.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存