kafka专题笔记 - 生产者

kafka专题笔记 - 生产者,第1张

kafka专题笔记 - 生产者 Producer实例构建

下面的代码示例,使用sarama包创建一个producer实例

import (
	"fmt"
	"github.com/Shopify/sarama"
)

var (
	brokerList = "localhost:9092"
	topic      = "localTestTopic"
	config     = initConfig()
	producer   = initProducer()
)

// @title: initConfig
// @description: 初始化生产者配置
func initConfig() (config *sarama.Config) {
	config = sarama.NewConfig()
	// 生产者ACK方式:
	// 	WaitForAll = -1: 需要`min.insync.replicas`的ISR-replicas确认
	// 	NoResponse = 0: 不等待响应
	// 	WaitForLocal = 1: waits for only the local commit to succeed before responding. 不太理解什么是local commit????
	config.Producer.RequiredAcks = sarama.WaitForAll
	// 是否等待成功和失败后的响应,这里需要RequiredAcks != NoResponse
	config.Producer.Return.Successes = true
	config.Producer.Errors = true
	return config
}

// @title: initSyncProducer
// @description: 初始化一个同步生产者
func initSyncProducer() (producer sarama.SyncProducer) {
	producer, err := sarama.NewSyncProducer([]string{brokerList}, config)
	if err != nil {
		panic(err)
	}
	return producer
}

kafka的Producer实例是线程安全的,所以可以在多个线程中共享Producer实例,也可以池化生产者实例供其他线程调用

消息的发送

发送消息代码示例:

// @title: NewMessage
// @description: 生成一个message消息
func NewMessage(value string) *sarama.ProducerMessage {
	msg := &sarama.ProducerMessage{}
	msg.Topic = topic
	msg.Value = sarama.StringEncoder(value)
	return msg
}

func testSendMsg() {
	// acks = NoResponse => offset = 0
	partition, offset, err := producer.SendMessage(NewMessage("test message"))
	if err != nil {
		panic(err)
	}
	fmt.Println(partition, offset)
}

发送消息需要初始化一个消息结构体,消息结构体的定义如下:

// 消息的数据结构
type ProducerMessage struct {
	Topic string
	Partition int32

	Key Encoder
	Value Encoder
	
	Headers []RecordHeader
	metadata interface{}
	Timestamp time.Time
}
  1. Topic:: 消息要发往的主题
  2. Partition: 消息要发往的分区号
  3. Key: 指定消息的键,可以用来计算分区号,从而让消息发送指定的分区,同一个key的消息会被划分到同一个分区
  4. Value: 消息的值,即消息主题内容;这里使用Encoder接口,所以在构建消息的时候,需要通过StringEncoder(value)将字符串构建成满足接口的类型,另外还有ByteEncoder
  5. Timestamp: 消息的时间戳
生产者整体架构 架构模型图

拦截器

拦截器分为生产者拦截器和消费者拦截器。拦截器在消息经过序列化和分区之前进行,主要可以对消息做一些过滤或者修改消息内容。拦截器往往不是必需的。

官方拦截器主要实现三个方法:

type ProducerInterceptor interface {
	func onSend(*ProducerMessage)     // 在消息序列化和计算分区执行onSend()方法,对消息进行一些额外处理,如增加消息前缀、统计消息数量等
	func onAcknowledgement(*ProducerMessage) // 在消息被应答之前或者发送失败时调用onAcknowledgement()方法
	func close() // 主要用于在关闭拦截器时执行一些资源的清理工作
}

在golang的sarama包中,拦截器接口只定义了一个onSend()方法,源码位于github.com/Shopify/sarama/interceptors.go

type ProducerInterceptor interface {
	onSend(*ProducerMessage)
}
自定义拦截器示例
// 自定义一个拦截器结果体,实现onSend()方法
type DefinedInterceptor struct{}

// @title: onSend
// @descriptor: 在消息值中增加前缀pref-
func (di *DefinedInterceptor) onSend(message *sarama.ProducerMessage) {
	value, _ := message.Value.(sarama.StringEncoder)
	message.Value = sarama.StringEncoder(fmt.Sprintf("pref-%s", string(value)))
}

// 将拦截器定义到生产者配置
func testInterceptor() {
	config.Producer.Interceptors = []sarama.ProducerInterceptor{&DefinedInterceptor{}}
}

func TestProducer() {
	// 测试拦截器,在发送消息前,将拦截器加在到生产者配置
	testInterceptor()
	testSendMsg()
}


==============消费结果=================
➜  bin kafka-console-consumer --bootstrap-server localhost:9092 --topic localTestTopic
pref-test message
pref-test message
pref-test message
pref-test message
pref-test message
pref-test message
pref-test message
pref-test message
pref-test message
pref-test message
序列化器

消息必须转换成字节数组才能在网络中传播,从客户端发送到kafka服务器,因此序列化是消息发送前的必经之路。

官方序列化器主要实现三个方法:

func configure()   // 配置当前类???不太明白
func serialize() []byte // 序列化 *** 作
func close() // 关闭序列化器

在golang的sarama包中,无显示的序列化器配置,而是通过消息的接口Ecoder接口实现数据的序列化. 在上述消息key/value的定义中,我们使用了StringEncoder,其实现如下:

type Encoder interface {
	Encode() ([]byte, error)
	Length() int
}

type StringEncoder string

func (s StringEncoder) Encode() ([]byte, error) {
	return []byte(s), nil
}

func (s StringEncoder) Length() int {
	return len(s)
}

同样golang还支持byteEncoder, 定义如下:

type ByteEncoder []byte

func (b ByteEncoder) Encode() ([]byte, error) {
	return b, nil
}

func (b ByteEncoder) Length() int {
	return len(b)
}
分区器

分区器的作用就是计算消息该发往哪个分区,注意,如果消息本身定义了partition字段指定分区,此时不需要分区器参与;其他情况都会经过分区器计算消息被发往的分区

官方定义中,分区器主要实现两个方法:

func partion() int // 计算消息被发往哪个分区,返回分区编号
func close()

golang的sarama包中,partitioner接口定义了分区器:

type Partitioner interface {
	
	Partition(message *ProducerMessage, numPartitions int32) (int32, error)
	RequiresConsistency() bool // sarama中增加这个方法,让接口定义key-> partition的映射关系是否不变
}
自定义分区器示例
// 自定义分区器
type DefinedPartitioner struct{}

// 极端自定义分区器,只发送1号分区
func (dp *DefinedPartitioner) Partition(message *sarama.ProducerMessage, numPartitions int32) (int32, error) {
	return 1, nil
}

func (dp *DefinedPartitioner) RequiresConsistency() bool {
	return false
}

func getPartitioner(topic string) sarama.Partitioner {
	return &DefinedPartitioner{}
}

// 将分区器装配到生产者配置
func testPartitioner() {
	config.Producer.Partitioner = getPartitioner
}

func TestProducer() {
	// 测试拦截器
	testInterceptor()
	// 测试分区器
	testPartitioner()
	testSendMsg()
}

=================生产结果======================
GOROOT=/usr/local/go #gosetup
GOPATH=/Users/liushi/go #gosetup
/usr/local/go/bin/go build -i -o /private/var/folders/zd/b2mys8_11x59vgvfvpt4snb40000gn/T/___go_build_demo . #gosetup
/private/var/folders/zd/b2mys8_11x59vgvfvpt4snb40000gn/T/___go_build_demo
1 23
1 24
1 25
1 26
1 27
1 28
1 29
1 30
1 31
1 32

Process finished with exit code 0

可以看到,生产的消息都发送到了1号分区

消息累加器

消息累加器(RecordAccumulator)主要用于缓存消息以便Sender线程(客户端发送消息线程)可以批量发送消息,进而减少网络传输的资源消耗以提升性能

消息累加器缓存区的大小由buffer.memory参数配置,默认为32MB. 消息累加器为每个partition分区维护一个双端队列,队列的元素为ProducerBatch,一个ProducerBatch可以由一个或者多个Message,指一个消息批次,多个消息组合成一个批次发送,可以有效的减少网络传输次数,提高系统吞吐量。

ProducerBatch的大小和batch.size有关。一个消息Message写入缓存后,会从双端队列尾部找到最新的一个batch(如果没有,则新建一个),然后查看当前message是否可以写入这个Batch,可以则写入,不可以则再新建一个Batch。新建Batch时会查看当前Message的大小和batch.size的大小,如果Message较小,则新建ProducerBatch的大小为batch.size,且该Batch的内存可以复用,如果Message更大,则按Message大小创建新的ProducerBatch,这段Batch的内存不可复用

Sender线程

以上,是Producer线程发送消息的主要过程,消息被发送到消息累加器缓存后,由独立的Sender线程进行消息的拉取和传递。

InFlightRequest

InFlightRequest是一块缓冲区,缓存了已发往kafka cluster但还没收到响应的Request,从而了解每个Node的拥塞程度,并控制客户端与kafka cluster每个连接的最多缓存的请求数(由max.in.flight.requests.per.connection参数控制)

ProducerBatch => Request => KafkaCluster

Sender线程从消息累加器中获取(partition, ProducerBatch)的数据信息后,会对消息进行一系列处理:

  • 第一步,将partition信息转换为node节点信息,知道发送到哪个broker节点
  • 第二步,将ProducerBatch信息封装成Request
  • 第三步,将要发送给kafka cluster的Request先缓存入InFlightRequest缓冲区
  • 第四步,将消息发送kafka cluster
  • 第五步,等待kafka cluster的响应,并删除InFlightRequest的缓冲数据
  • 第六步,删除消息累加器中ProducerBatch对应的消息内容,回收或释放累加器缓存
重要的生产者参数
  1. acks: 指定分区中有多少个副本收到这条消息,生产者才能认为这条消息是发送成功的。重要参数,权衡效率和吞吐能力
  2. max.request.size: 生产者发送消息大小的最大值,默认为1MB。一般消息的上限即为1MB
  3. retries: 生产者在发送消息出错时的最大重试次数,默认为0不重试。
    • 注意,重试只在可重试错误下才会触发,比如网络中断错误,leader副本选举等可以触发重试,但是像消息体过大等重试无法解决的问题,生产者不会重试
  4. retry.backoff.ms: 两次重试之间的时间间隔
  5. compression.type: 消息的压缩方式,默认为空,不压缩。消息压缩是CPU/IO之间的抉择,压缩消息可以降低消息大小,减少网络贷款占用,但是会消耗服务器CPU
  6. linger.ms: 等待填充ProducerBatch的时间。
    • 生产者消息通过ProducerBatch为单位发送,但是如果长时间新消息大小无法填满一个Batch,则会造成消息滞后。因此,linger.ms用于定义一个Batch至多等待的时间,如果该时间内Batch仍没有填满,该Batch也会直接发送
    • linger.ms和TCP中的Nagle算法功能类似
  7. request.timeout.ms: 客户端等待响应的最大时间。如果发送的消息长时间收不到服务端响应,客户端则判定发送失败。
  8. buffer.memory: 消息累加器缓冲区的大小,默认32MB。缓冲区满后,生产者send()会阻塞。此时说明生产速度过剩
  9. max.block.ms: 生产阻塞时等待的最大时长。超过该时间后会触发超时报错
  10. batch.size: 每个ProducerBatch的大小

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/4684189.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-07
下一篇 2022-11-07

发表评论

登录后才能评论

评论列表(0条)

保存