kafka-go区分同步写与异步写。同步写能严格保证写入顺序,因为在写入完成之前它会阻塞(block)应用程序,并返回错误信息。写入完成的时机有三种:1. 消息发送出去即返回;2. leader收到后返回;3. 所有ISR副本收到后返回。越往后,数据的可靠性越高,这三种方式通过配置参数RequiredAcks来控制。异步写不等待返回结果,而是通过传入的回调函数(Completion)接收并处理写入结果(同步写同样会在返回前调用该回调)。异步写的性能更好,而且在很多场景下(配合一定的额外逻辑)仍能保证数据的可靠性。
为了提升写的性能,无论是同步写还是异步写都是以batch的方式执行的。
写模型
代码核心类型:Writer、partitionWriter、batchQueue、writeBatch,后文会逐一介绍。
Writer
直接暴露给应用程序使用的类型。
类型type Writer struct {
Addr net.Addr // broker地址
Topic string
Balancer Balancer // 消息分发(partition)策略
MaxAttempts int // 投递最大重试次数
BatchSize int // 一次batch写入的最多消息条数
BatchBytes int64 // 一次batch写入的最大数据量
BatchTimeout time.Duration // 一次batch写入的最大间隔时间
ReadTimeout time.Duration
WriteTimeout time.Duration
// RequireNone (0) 发送出去就认为成功
// RequireOne (1) leader接收就返回
// RequireAll (-1) 等待所有ISR的返回结果
RequiredAcks RequiredAcks
Async bool // 异步写
Completion func(messages []Message, err error) // 回调函数
Compression Compression // 压缩方式
Transport RoundTripper // 底层数据传输类型
group sync.WaitGroup
mutex sync.Mutex
closed bool
writers map[topicPartition]*partitionWriter // 一个Writer会对应多个partition writer,它们和partition一一对应
once sync.Once
*writerStats // 状态记录
}
核心方法
1 写消息
// WriteMessages writes msgs to Kafka.
//
// Each message is assigned to a topic/partition by the writer's balancer and
// appended to that partition's current batch. With Async set, the call returns
// as soon as the messages are batched. Otherwise it blocks until every batch
// holding one of msgs has finished. It returns ctx.Err() if ctx is done first.
// If any batch failed, it returns a WriteErrors with one entry per message
// (nil for messages whose batch succeeded).
//
// It fails with io.ErrClosedPipe when the writer is closed. If any single
// message is larger than the batch byte limit, it returns the error built by
// messageTooLarge before anything is queued.
func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
	// enter reports false once the writer can no longer accept writes; the
	// matching leave runs when this call returns.
	if !w.enter() {
		return io.ErrClosedPipe
	}
	defer w.leave()

	if len(msgs) == 0 {
		return nil
	}

	balancer := w.balancer()
	limit := w.batchBytes()

	// A message bigger than a whole batch could never be sent, so reject the
	// entire call up front.
	for idx := range msgs {
		if size := int64(msgs[idx].size()); size > limit {
			return messageTooLarge(msgs, idx)
		}
	}

	// Group message indexes (not copies of the messages) by destination.
	assignments := make(map[topicPartition][]int32)
	for idx, msg := range msgs {
		topic, err := w.chooseTopic(msg)
		if err != nil {
			return err
		}
		numPartitions, err := w.partitions(ctx, topic)
		if err != nil {
			return err
		}
		partition := balancer.Balance(msg, loadCachedPartitions(numPartitions)...)
		dest := topicPartition{topic: topic, partition: int32(partition)}
		assignments[dest] = append(assignments[dest], int32(idx))
	}

	batches := w.batchMessages(msgs, assignments)
	if w.Async {
		return nil
	}

	// Wait for every batch, giving up early if the caller cancels.
	cancelled := ctx.Done()
	anyFailed := false
	for batch := range batches {
		select {
		case <-cancelled:
			return ctx.Err()
		case <-batch.done:
			anyFailed = anyFailed || batch.err != nil
		}
	}
	if !anyFailed {
		return nil
	}

	// Report each batch's outcome at the index of every message it carried.
	errs := make(WriteErrors, len(msgs))
	for batch, idxs := range batches {
		for _, idx := range idxs {
			errs[idx] = batch.err
		}
	}
	return errs
}
2 将消息分配到各partition的batch中
// batchMessages hands the messages listed in assignments to the partitionWriter
// of their topic/partition, creating partition writers on first use, while
// holding w.mutex.
//
// In sync mode it returns, for every batch that received messages, the indexes
// (into messages) that went into it, so the caller can wait on those batches.
// In async mode the result is nil. That path relies on writeMessages returning
// no batches in async mode, because assigning into the nil map would panic.
func (w *Writer) batchMessages(messages []Message, assignments map[topicPartition][]int32) map[*writeBatch][]int32 {
	w.mutex.Lock()
	defer w.mutex.Unlock()

	var pending map[*writeBatch][]int32
	if !w.Async {
		pending = make(map[*writeBatch][]int32, len(assignments))
	}

	if w.writers == nil {
		w.writers = make(map[topicPartition]*partitionWriter)
	}

	for tp, idxs := range assignments {
		ptw := w.writers[tp]
		if ptw == nil {
			ptw = newPartitionWriter(w, tp)
			w.writers[tp] = ptw
		}
		for batch, batchIdxs := range ptw.writeMessages(messages, idxs) {
			pending[batch] = batchIdxs
		}
	}
	return pending
}
partitionWriter
为Writer提供按partition分批写消息的方法,每个topic/partition对应一个partitionWriter。
类型type partitionWriter struct {
meta topicPartition
queue batchQueue // 已经写满放入队列的batch
mutex sync.Mutex
currBatch *writeBatch // 当前正在使用的batch,同时是和算法相关的一个指针
w *Writer // 拥有该partitionWriter的Writer实例
}
核心方法
1 创建
// newPartitionWriter creates the partitionWriter for partition key of w and
// starts its writeBatches loop through w.spawn before returning it.
func newPartitionWriter(w *Writer, key topicPartition) *partitionWriter {
	ptw := new(partitionWriter)
	ptw.meta = key
	ptw.queue = newBatchQueue(10) // NOTE(review): 10 is presumably an initial queue capacity; see batchQueue
	ptw.w = w

	w.spawn(ptw.writeBatches)
	return ptw
}
2 后台goroutine循环写
// writeBatches is the partition's background loop, started by
// newPartitionWriter through w.spawn. It writes batches one at a time in the
// order queue.Get returns them, and exits once Get returns nil.
func (ptw *partitionWriter) writeBatches() {
	for batch := ptw.queue.Get(); batch != nil; batch = ptw.queue.Get() {
		ptw.writeBatch(batch)
	}
}
3 发送batch消息
// writeBatch writes batch to this partition with ptw.w.produce. It makes up to
// ptw.w.maxAttempts() attempts and stops at the first success. The error of the
// last attempt (nil on success) is then passed to the Writer's Completion
// callback, if one is set, and recorded with batch.complete. Completion
// has returned by the time a synchronous WriteMessages call sees batch.done
// closed, because complete runs after the callback.
func (ptw *partitionWriter) writeBatch(batch *writeBatch) {
var res *ProduceResponse
var err error // error of the most recent attempt
key := ptw.meta
// NOTE(review): if maxAttempts() returned 0 the loop would not run, and the
// batch would be completed with a nil error without ever being sent.
// Presumably maxAttempts() substitutes a positive default -- confirm.
for attempt, maxAttempts := 0, ptw.w.maxAttempts(); attempt < maxAttempts; attempt++ {
if attempt != 0 {
。。。 // retry handling before re-sending; elided in this excerpt
}
// NOTE(review): start and res are never read in this excerpt (the elided
// code presumably uses them), so as written Go rejects them as unused.
start := time.Now()
res, err = ptw.w.produce(key, batch) // send the batch
if err == nil {
break
}
}
if ptw.w.Completion != nil {
ptw.w.Completion(batch.msgs, err) // notify the application of the outcome
}
batch.complete(err) // record the result and release waiters on batch.done
}
4 暴露给Writer类型的写消息方法
// writeMessages appends msgs[i], for each i in indexes, to this partition's
// current batch. It creates batches on demand and seals them (trigger +
// queue.Put) as soon as they are full or cannot take the next message.
//
// When the writer is synchronous it returns, for every batch that received
// messages, the indexes (into msgs) of those messages so the caller can wait on
// batch.done. When Async is set it returns nil.
func (ptw *partitionWriter) writeMessages(msgs []Message, indexes []int32) map[*writeBatch][]int32 {
	// awaitBatch inspects and detaches currBatch under ptw.mutex when a batch
	// times out, so this method must hold the same lock while it uses currBatch.
	ptw.mutex.Lock()
	defer ptw.mutex.Unlock()

	// NOTE(review): batchSize() is assumed to be the Writer accessor analogous
	// to batchBytes(); it is not shown in this excerpt.
	batchSize := ptw.w.batchSize()
	batchBytes := ptw.w.batchBytes()

	// Only synchronous callers need the batch -> indexes mapping. It must be
	// allocated before use: assigning into a nil map panics.
	var batches map[*writeBatch][]int32
	if !ptw.w.Async {
		batches = make(map[*writeBatch][]int32, 1)
	}

	for _, i := range indexes {
	assignMessage:
		batch := ptw.currBatch
		if batch == nil { // no open batch: start a new one
			batch = ptw.newWriteBatch()
			ptw.currBatch = batch
		}
		if !batch.add(msgs[i], batchSize, batchBytes) {
			// The message would overflow the (non-empty) current batch, so
			// seal it and retry with a fresh batch. add always accepts a
			// message into an empty batch, so this retries at most once.
			batch.trigger()
			ptw.queue.Put(batch)
			ptw.currBatch = nil
			goto assignMessage
		}
		if batch.full(batchSize, batchBytes) {
			// Seal the batch right away so the next message starts a new one.
			batch.trigger()
			ptw.queue.Put(batch)
			ptw.currBatch = nil
		}
		if !ptw.w.Async {
			batches[batch] = append(batches[batch], i)
		}
	}
	return batches
}
5 创建一个batch
// newWriteBatch creates an empty batch stamped with the current time. Its timer
// is armed with the writer's batch timeout. It then starts awaitBatch for the
// batch through w.spawn, so a partially filled batch is eventually flushed.
func (ptw *partitionWriter) newWriteBatch() *writeBatch {
	now := time.Now()
	b := newWriteBatch(now, ptw.w.batchTimeout())

	watch := func() { ptw.awaitBatch(b) }
	ptw.w.spawn(watch)

	return b
}
6 等待batch结束
// awaitBatch watches a single batch until it is finalized. A batch ends in one
// of two ways:
//   - it is sealed by writeMessages (which closes batch.ready) because it is
//     full or cannot take the next message; or
//   - its timer fires first.
//
// On timeout the batch is queued and detached only if it is still the
// partition's current batch. Otherwise it has already been sealed and queued.
func (ptw *partitionWriter) awaitBatch(batch *writeBatch) {
	select {
	case <-batch.ready:
		// Already sealed and queued; stop the timer that is no longer needed.
		batch.timer.Stop()
	case <-batch.timer.C:
		ptw.mutex.Lock()
		stillCurrent := ptw.currBatch == batch
		if stillCurrent {
			ptw.queue.Put(batch)
			ptw.currBatch = nil
		}
		ptw.mutex.Unlock()
	}
}
batchQueue
writeBatch的队列,提供创建、添加(Put)、获取(Get)、关闭等方法。代码相对简单,这里不作介绍。
writeBatch
类型
// writeBatch is a group of messages for one partition that is written with a
// single produce call per attempt.
type writeBatch struct {
	time  time.Time     // when the batch was created
	msgs  []Message     // messages in the batch
	size  int           // number of messages
	bytes int64         // total size of the messages (sum of Message.size)
	ready chan struct{} // closed by trigger when the batch is sealed because it is full or cannot take the next message
	done  chan struct{} // closed by complete once the write has finished
	timer *time.Timer   // fires after the writer's batch timeout
	err   error         // outcome of the write; meaningful only after done is closed
}
核心方法
1 新建一个writeBatch
// newWriteBatch returns an empty batch created at now. Its ready and done
// channels are fresh (unbuffered, open), and its timer fires after timeout.
func newWriteBatch(now time.Time, timeout time.Duration) *writeBatch {
	b := &writeBatch{time: now}
	b.ready = make(chan struct{})
	b.done = make(chan struct{})
	b.timer = time.NewTimer(timeout)
	return b
}
2 添加一条消息
// add appends msg to the batch and reports true. The exception is when the
// batch is non-empty and msg would push it past maxBytes: then add leaves the
// batch untouched and reports false. An empty batch always accepts msg.
// maxSize only pre-sizes the message slice on first use; the count limit is
// checked separately by full.
func (b *writeBatch) add(msg Message, maxSize int, maxBytes int64) bool {
	n := int64(msg.size())
	overflows := b.bytes+n > maxBytes
	if overflows && b.size > 0 {
		return false
	}

	if cap(b.msgs) == 0 {
		b.msgs = make([]Message, 0, maxSize)
	}
	b.msgs = append(b.msgs, msg)
	b.size, b.bytes = b.size+1, b.bytes+n
	return true
}
3 判断batch是否写满
// full reports whether the batch has reached the message-count limit maxSize
// or the byte limit maxBytes.
func (b *writeBatch) full(maxSize int, maxBytes int64) bool {
	if b.size >= maxSize {
		return true
	}
	return b.bytes >= maxBytes
}
4 触发batch:关闭ready通道,标记batch已封口(随后由调用方将其放入queue排队,等待写入)
// trigger marks the batch as sealed by closing its ready channel, which wakes
// the batch's awaitBatch goroutine. Call it at most once per batch: closing
// an already-closed channel panics.
func (b *writeBatch) trigger() {
	ready := b.ready
	close(ready)
}
5 完成batch写后的通知
// complete records err as the outcome of writing the batch and then closes
// done, releasing anyone waiting on it. The deferred close runs after err is
// stored, so waiters that see done closed also see err. Call it at most once
// per batch.
func (b *writeBatch) complete(err error) {
	defer close(b.done)
	b.err = err
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)