kafka-go源码解析四(Writer)

kafka-go源码解析四(Writer),第1张

概要

kafka-go区分同步写与异步写。同步写能严格保证写入顺序,因为在写成功之前它会阻塞(block)应用程序,并返回错误信息。写入完成的时机有三种,通过配置参数RequiredAcks来控制:一是消息发送出去即返回,二是leader收到后即返回,三是所有ISR副本都收到后才返回;越往后,数据的可靠性越高。异步写不用等待返回结果,而是传入一个回调函数(Completion)来接收并处理结果(同步写也支持在返回前回调)。异步写的性能更优,而且在很多场景下(配合一定的额外逻辑)仍能保证数据的可靠性。

为了提升写的性能,无论是同步写还是异步写都是以batch的方式执行的。

写模型

代码

核心类型:Writer、partitionWriter、batchQueue、writeBatch,后文会逐一介绍。

Writer

直接暴露给应用程序使用的类型

类型
// Writer is the type exposed directly to applications for producing messages.
// Messages are fanned out to one partitionWriter per topic/partition, kept in
// the writers map and created lazily by batchMessages.
type Writer struct {
	Addr net.Addr // broker address

	Topic string

	Balancer Balancer // strategy that picks the partition for each message

	MaxAttempts int // maximum number of delivery attempts per batch

	BatchSize int // maximum number of messages written in one batch

	BatchBytes int64 // maximum number of bytes written in one batch

	BatchTimeout time.Duration // maximum time a batch stays open before it is flushed

	ReadTimeout time.Duration
	WriteTimeout time.Duration

	//  RequireNone (0)  the write counts as successful once it has been sent
	//  RequireOne  (1)  return once the partition leader has received it
	//  RequireAll  (-1) wait for acknowledgements from all ISR replicas
	RequiredAcks RequiredAcks

	Async bool // asynchronous mode: WriteMessages returns without waiting for batches

	Completion func(messages []Message, err error) // callback invoked after each batch write finishes

	Compression Compression // compression codec

	Transport RoundTripper // underlying transport used to send requests

	group   sync.WaitGroup
	mutex   sync.Mutex
	closed  bool
	writers map[topicPartition]*partitionWriter // one partitionWriter per topic/partition

	once sync.Once
	*writerStats // statistics
}
核心方法

1 写消息

// WriteMessages assigns every message to a topic/partition and hands them to
// the matching partitionWriters. Unless the Writer is asynchronous, it then
// blocks until every batch that received one of the messages has completed,
// or until ctx is done.
//
// Return values:
//   - io.ErrClosedPipe when the Writer has been closed;
//   - the messageTooLarge error when a single message exceeds the batch byte limit;
//   - any error from topic or partition lookup;
//   - ctx.Err() on cancellation;
//   - a WriteErrors value with one entry per input message when any batch failed.
func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
	// enter/leave bracket the call so the Writer is not torn down mid-write.
	if !w.enter() {
		return io.ErrClosedPipe
	}
	defer w.leave()

	if len(msgs) == 0 {
		return nil
	}

	balancer := w.balancer()
	limit := w.batchBytes()

	// Reject the whole call up front if any single message can never fit in a batch.
	for idx := range msgs {
		if int64(msgs[idx].size()) > limit {
			return messageTooLarge(msgs, idx)
		}
	}

	// Group message indexes by their destination topic/partition.
	assignments := make(map[topicPartition][]int32)
	for idx := range msgs {
		topic, err := w.chooseTopic(msgs[idx])
		if err != nil {
			return err
		}

		numPartitions, err := w.partitions(ctx, topic)
		if err != nil {
			return err
		}

		partition := balancer.Balance(msgs[idx], loadCachedPartitions(numPartitions)...)
		tp := topicPartition{topic: topic, partition: int32(partition)}
		assignments[tp] = append(assignments[tp], int32(idx))
	}

	batches := w.batchMessages(msgs, assignments)
	if w.Async {
		// Fire-and-forget: results are delivered through Completion, if set.
		return nil
	}

	failed := false
	done := ctx.Done()
	for batch := range batches {
		select {
		case <-done:
			return ctx.Err()
		case <-batch.done:
			failed = failed || batch.err != nil
		}
	}
	if !failed {
		return nil
	}

	// Report, per input message, the error of the batch that carried it.
	werr := make(WriteErrors, len(msgs))
	for batch, indexes := range batches {
		for _, idx := range indexes {
			werr[idx] = batch.err
		}
	}
	return werr
}

2 批量写数据

// batchMessages routes each topic/partition's message indexes to that
// partition's partitionWriter, creating the writer on first use. The whole
// routing runs under w.mutex.
//
// In synchronous mode it returns every batch that received messages, together
// with the indexes of the messages it holds. In asynchronous mode it returns nil.
func (w *Writer) batchMessages(messages []Message, assignments map[topicPartition][]int32) map[*writeBatch][]int32 {
	var result map[*writeBatch][]int32
	if !w.Async {
		result = make(map[*writeBatch][]int32, len(assignments))
	}

	w.mutex.Lock()
	defer w.mutex.Unlock()

	if w.writers == nil {
		w.writers = make(map[topicPartition]*partitionWriter)
	}

	for tp, indexes := range assignments {
		ptw := w.writers[tp]
		if ptw == nil {
			ptw = newPartitionWriter(w, tp)
			w.writers[tp] = ptw
		}
		// Merge the batches this partition touched into the combined result.
		for batch, idxs := range ptw.writeMessages(messages, indexes) {
			result[batch] = idxs
		}
	}

	return result
}
partitionWriter

每个topic/partition对应一个partitionWriter,它为Writer缓存该partition的消息、组装batch,并在后台goroutine中发送。

类型
// partitionWriter collects messages for a single topic/partition into batches
// and writes them from a background goroutine on behalf of its owning Writer.
type partitionWriter struct {
	meta  topicPartition // the topic/partition this writer serves
	queue batchQueue // sealed batches waiting to be written by writeBatches

	mutex     sync.Mutex // protects currBatch; awaitBatch locks it before checking/clearing currBatch
	currBatch *writeBatch // batch currently accepting new messages; nil when none is open

	w *Writer // the Writer that owns this partitionWriter
}
核心方法

1 创建

// newPartitionWriter builds the partitionWriter for key, owned by w, with a
// batch queue of size 10. It starts the writer's writeBatches loop through
// w.spawn before returning.
func newPartitionWriter(w *Writer, key topicPartition) *partitionWriter {
	ptw := new(partitionWriter)
	ptw.meta = key
	ptw.queue = newBatchQueue(10)
	ptw.w = w

	w.spawn(ptw.writeBatches)
	return ptw
}

2 后台goroutine循环写

// writeBatches is the background loop of a partitionWriter. It takes batches
// off the queue one at a time and writes each of them. It exits as soon as the
// queue yields a nil batch.
func (ptw *partitionWriter) writeBatches() {
	for batch := ptw.queue.Get(); batch != nil; batch = ptw.queue.Get() {
		ptw.writeBatch(batch)
	}
}

3 发送batch消息

// writeBatch sends batch to this writer's topic/partition. It makes up to
// ptw.w.maxAttempts() attempts and stops at the first one that succeeds. If
// maxAttempts() returns 0 or less, no attempt is made and err stays nil.
//
// Afterwards it invokes the Writer's Completion callback (when set) with the
// batch's messages and the last error. It then completes the batch, which
// releases synchronous WriteMessages callers waiting on batch.done.
//
// NOTE(review): this is an abridged excerpt. The retry/backoff body is elided
// ("。。。"). `res` and `start` are presumably consumed by the elided code
// (logging/stats); as shown here they are unused.
func (ptw *partitionWriter) writeBatch(batch *writeBatch) {
	var res *ProduceResponse
	var err error
	key := ptw.meta
	for attempt, maxAttempts := 0, ptw.w.maxAttempts(); attempt < maxAttempts; attempt++ {
		if attempt != 0 {
。。。 // retry handling (elided in this excerpt)
		}

		start := time.Now()
		res, err = ptw.w.produce(key, batch) // send the batch

		if err == nil {
			break
		}
	}

	if ptw.w.Completion != nil {
		ptw.w.Completion(batch.msgs, err) // notify the application through its callback
	}

	batch.complete(err) // record the result and close batch.done
}

4 暴露给Writer类型的写消息方法

// writeMessages appends msgs[i] to this partition's current batch for every i
// in indexes, opening a new batch when none is open. A batch is sealed
// (trigger + enqueue) when the next message would overflow it or once it
// becomes full.
//
// For synchronous Writers it returns, for each batch touched, the message
// indexes that batch received, so the caller can wait on it. For asynchronous
// Writers it returns nil.
//
// ptw.mutex is held for the whole call because awaitBatch concurrently
// inspects and clears ptw.currBatch when a batch's timer fires.
func (ptw *partitionWriter) writeMessages(msgs []Message, indexes []int32) map[*writeBatch][]int32 {
	ptw.mutex.Lock()
	defer ptw.mutex.Unlock()

	batchSize := ptw.w.batchSize()
	batchBytes := ptw.w.batchBytes()

	var batches map[*writeBatch][]int32
	if !ptw.w.Async {
		// Writing to a nil map panics, so allocate it whenever results are returned.
		batches = make(map[*writeBatch][]int32, 1)
	}

	for _, i := range indexes {
	assignMessage:
		batch := ptw.currBatch
		if batch == nil { // no open batch: start a new one
			batch = ptw.newWriteBatch()
			ptw.currBatch = batch
		}
		if !batch.add(msgs[i], batchSize, batchBytes) {
			// The message would overflow this batch: seal it and retry with a
			// fresh one. An empty batch always accepts a message, so this
			// retries at most once per message.
			batch.trigger()
			ptw.queue.Put(batch)
			ptw.currBatch = nil
			goto assignMessage
		}

		if batch.full(batchSize, batchBytes) { // reached a limit: seal and enqueue it
			batch.trigger()
			ptw.queue.Put(batch)
			ptw.currBatch = nil
		}

		if !ptw.w.Async { // synchronous callers need to wait on this batch
			batches[batch] = append(batches[batch], i)
		}
	}
	return batches
}

5 创建一个batch

// newWriteBatch opens a fresh batch whose timer is set to the Writer's batch
// timeout. It also spawns awaitBatch to watch that batch until it is sealed or
// the timer fires.
func (ptw *partitionWriter) newWriteBatch() *writeBatch {
	b := newWriteBatch(time.Now(), ptw.w.batchTimeout())
	watch := func() { ptw.awaitBatch(b) }
	ptw.w.spawn(watch)
	return b
}

6 等待batch结束

// awaitBatch waits for one of the two ways a batch can end.
//
// Sealed path: writeMessages sealed the batch because it filled up, which
// closes the ready channel. The timer is then stopped so it does not linger.
//
// Expiry path: the batch's timer fired first. The batch is enqueued for
// writing and detached, but only if it is still this writer's current batch.
// If writeMessages already sealed and enqueued it, nothing is done.
func (ptw *partitionWriter) awaitBatch(batch *writeBatch) {
	select {
	case <-batch.ready:
		batch.timer.Stop()
	case <-batch.timer.C:
		ptw.mutex.Lock()
		defer ptw.mutex.Unlock()
		if ptw.currBatch != batch {
			return
		}
		ptw.queue.Put(batch)
		ptw.currBatch = nil
	}
}
batchQueue

即writeBatch的队列,提供创建、添加(Put)、获取(Get)、关闭等方法,代码相对简单,这里不作介绍。

writeBatch 类型
// writeBatch is a group of messages for one topic/partition that is sent with
// a single produce call.
type writeBatch struct {
	time  time.Time // creation time
	msgs  []Message // messages in the batch
	size  int // number of messages
	bytes int64 // total size of the messages, as reported by Message.size()
	ready chan struct{} // closed by trigger() when writeMessages seals the batch (not closed on timer expiry)
	done  chan struct{} // closed by complete() once the write has finished
	timer *time.Timer // fires after the batch timeout so a partially filled batch still gets flushed
	err   error // result of the write, set by complete() before done is closed
}
核心方法

1 新建一个writeBatch

// newWriteBatch returns an empty batch stamped with now. Its ready and done
// channels are open, and its timer fires after timeout.
func newWriteBatch(now time.Time, timeout time.Duration) *writeBatch {
	b := new(writeBatch)
	b.time = now
	b.ready = make(chan struct{})
	b.done = make(chan struct{})
	b.timer = time.NewTimer(timeout)
	return b
}

2 添加一条消息

// add appends msg to the batch and returns true. The one exception: when the
// batch already holds at least one message and msg would push its byte total
// past maxBytes, the batch is left unchanged and false is returned.
//
// An empty batch always accepts a message. maxSize is only used as the initial
// capacity of the message slice.
func (b *writeBatch) add(msg Message, maxSize int, maxBytes int64) bool {
	n := int64(msg.size())
	if b.size > 0 && b.bytes+n > maxBytes {
		return false
	}

	if cap(b.msgs) == 0 {
		b.msgs = make([]Message, 0, maxSize)
	}
	b.msgs, b.size, b.bytes = append(b.msgs, msg), b.size+1, b.bytes+n
	return true
}

3 判断batch是否写满

// full reports whether the batch has reached its message-count limit (maxSize)
// or its byte limit (maxBytes).
func (b *writeBatch) full(maxSize int, maxBytes int64) bool {
	if b.size >= maxSize {
		return true
	}
	return b.bytes >= maxBytes
}

4 标记batch已就绪(关闭ready channel,通知awaitBatch停止计时器;放入queue由调用方完成)

// trigger marks the batch as sealed by closing its ready channel, which wakes
// awaitBatch. It must be called at most once per batch, because closing a
// closed channel panics.
func (b *writeBatch) trigger() {
	ready := b.ready
	close(ready)
}

5 完成batch写后的通知

// complete stores the write result and then closes done, releasing everyone
// waiting on the batch. The result is stored before the close so that a
// waiter which observes done also observes err.
func (b *writeBatch) complete(result error) {
	finished := b.done
	b.err = result
	close(finished)
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/994691.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存