git 地址 -- GitHub - segmentio/kafka-go: Kafka library in Go
Dialer类型的主要作用是创建与kafka集群的连接。通过分析它的行为,可知它是建立在底层socket之上的,同时它也是作为上层api reader的一个基础组件。
物理连接上,它考虑了超时控制,安全连接tls,认证sasl,域名转换,重试机制等。
它的另一个重要作用是建立与指定topic partition leader 对应的broker的连接。
源码分析 类型type Dialer struct {
ClientID string // 唯一id
// 创建底层socket的连接函数,如果不设置使用net包的相应函数
DialFunc func(ctx context.Context, network string, address string) (net.Conn, error)
// 超时时间
Timeout time.Duration
// 过期时间
Deadline time.Time
// 本地地址
LocalAddr net.Addr
DualStack bool
FallbackDelay time.Duration
// 探活时间间隔
KeepAlive time.Duration
// 域名转换器
Resolver Resolver
// TLS 安全连接配置,后续再做分析
TLS *tls.Config
// SASL 认证机制,后续再做分析
SASLMechanism sasl.Mechanism
// 事务性id,后续再做分析
TransactionalID string
}
核心函数
1、建立和broker的连接,并包装生成kafka connection
func (d *Dialer) connect(ctx context.Context, network, address string, connCfg ConnConfig) (*Conn, error) {
if d.Timeout != 0 { // 设置context的过期时间
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, d.Timeout)
defer cancel()
}
if !d.Deadline.IsZero() { // 设置context的过期时间
var cancel context.CancelFunc
ctx, cancel = context.WithDeadline(ctx, d.Deadline)
defer cancel()
}
c, err := d.dialContext(ctx, network, address) // 建立socket连接,见后
if err != nil {
return nil, err
}
conn := NewConnWith(c, connCfg) // 根据配置包装成kafka connection
if d.SASLMechanism != nil { //权限验证
host, port, err := splitHostPortNumber(address)
if err != nil {
return nil, err
}
metadata := &sasl.Metadata{
Host: host,
Port: port,
}
if err := d.authenticateSASL(sasl.WithMetadata(ctx, metadata), conn); err != nil {
_ = conn.Close()
return nil, err
}
}
return conn, nil
}
1.1 建立socket连接
func (d *Dialer) dialContext(ctx context.Context, network string, addr string) (net.Conn, error) {
address, err := lookupHost(ctx, addr, d.Resolver) // 如果提供的是域名,根据域名解析出ip地址
if err != nil {
return nil, err
}
dial := d.DialFunc
if dial == nil { // 如果没有设置,使用默认的拨号函数
dial = (&net.Dialer{
LocalAddr: d.LocalAddr,
DualStack: d.DualStack,
FallbackDelay: d.FallbackDelay,
KeepAlive: d.KeepAlive,
}).DialContext
}
conn, err := dial(ctx, network, address)
if err != nil {
return nil, err
}
if d.TLS != nil { // 安全验证
。。。
}
return conn, nil
}
2 建立和指定topic&leader partition所对应的broker的连接
传入的address可以是kafka集群中任意broker的地址,因为所有broker上都有controller metadata的副本,因此可以获取到某一个特定topic partition leader对应的broker地址
func (d *Dialer) DialLeader(ctx context.Context, network string, address string, topic string, partition int) (*Conn, error) {
p, err := d.LookupPartition(ctx, network, address, topic, partition) // 查询特定partition leader的信息
if err != nil {
return nil, err
}
return d.DialPartition(ctx, network, address, p) // 连接该broker
}
2.1 查询特定partition leader的信息
func (d *Dialer) LookupPartition(ctx context.Context, network string, address string, topic string, partition int) (Partition, error) {
c, err := d.DialContext(ctx, network, address) // 连接broker
if err != nil {
return Partition{}, err
}
defer c.Close()
brkch := make(chan Partition, 1)
errch := make(chan error, 1)
go func() {
for attempt := 0; true; attempt++ { // 添加重试
if attempt != 0 {
if !sleep(ctx, backoff(attempt, 100*time.Millisecond, 10*time.Second)) { // 避免重试对服务器的压力
errch <- ctx.Err()
return
}
}
partitions, err := c.ReadPartitions(topic) // 读取topic对应的partition内容,后续分析
if err != nil {
if isTemporary(err) { // 可重试的错误
continue
}
errch <- err
return
}
for _, p := range partitions {
if p.ID == partition { // 找到对应的patition
brkch <- p
return
}
}
}
errch <- UnknownTopicOrPartition
}()
var prt Partition
select {
case prt = <-brkch:
case err = <-errch:
case <-ctx.Done():
err = ctx.Err()
}
return prt, err
}
2.2 建立与partition leader对应broker的连接
func (d *Dialer) DialPartition(ctx context.Context, network string, address string, partition Partition) (*Conn, error) {
return d.connect(ctx, network, net.JoinHostPort(partition.Leader.Host, strconv.Itoa(partition.Leader.Port)), ConnConfig{
ClientID: d.ClientID,
Topic: partition.Topic,
Partition: partition.ID,
Broker: partition.Leader.ID,
Rack: partition.Leader.Rack,
TransactionalID: d.TransactionalID,
})
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)