下面的代码示例,使用sarama包创建一个producer实例
import ( "fmt" "github.com/Shopify/sarama" ) var ( brokerList = "localhost:9092" topic = "localTestTopic" config = initConfig() producer = initProducer() ) // @title: initConfig // @description: 初始化生产者配置 func initConfig() (config *sarama.Config) { config = sarama.NewConfig() // 生产者ACK方式: // WaitForAll = -1: 需要`min.insync.replicas`的ISR-replicas确认 // NoResponse = 0: 不等待响应 // WaitForLocal = 1: waits for only the local commit to succeed before responding. 不太理解什么是local commit???? config.Producer.RequiredAcks = sarama.WaitForAll // 是否等待成功和失败后的响应,这里需要RequiredAcks != NoResponse config.Producer.Return.Successes = true config.Producer.Errors = true return config } // @title: initSyncProducer // @description: 初始化一个同步生产者 func initSyncProducer() (producer sarama.SyncProducer) { producer, err := sarama.NewSyncProducer([]string{brokerList}, config) if err != nil { panic(err) } return producer }
kafka的Producer实例是线程安全的,所以可以在多个线程中共享Producer实例,也可以池化生产者实例供其他线程调用
消息的发送发送消息代码示例:
// @title: NewMessage // @description: 生成一个message消息 func NewMessage(value string) *sarama.ProducerMessage { msg := &sarama.ProducerMessage{} msg.Topic = topic msg.Value = sarama.StringEncoder(value) return msg } func testSendMsg() { // acks = NoResponse => offset = 0 partition, offset, err := producer.SendMessage(NewMessage("test message")) if err != nil { panic(err) } fmt.Println(partition, offset) }
发送消息需要初始化一个消息结构体,消息结构体的定义如下:
// 消息的数据结构 type ProducerMessage struct { Topic string Partition int32 Key Encoder Value Encoder Headers []RecordHeader metadata interface{} Timestamp time.Time }
- Topic:: 消息要发往的主题
- Partition: 消息要发往的分区号
- Key: 指定消息的键,可以用来计算分区号,从而让消息发送指定的分区,同一个key的消息会被划分到同一个分区
- Value: 消息的值,即消息主题内容;这里使用Encoder接口,所以在构建消息的时候,需要通过StringEncoder(value)将字符串构建成满足接口的类型,另外还有ByteEncoder
- Timestamp: 消息的时间戳
拦截器分为生产者拦截器和消费者拦截器。拦截器在消息经过序列化和分区之前进行,主要可以对消息做一些过滤或者修改消息内容。拦截器往往不是必需的。
官方拦截器主要实现三个方法:
type ProducerInterceptor interface { func onSend(*ProducerMessage) // 在消息序列化和计算分区执行onSend()方法,对消息进行一些额外处理,如增加消息前缀、统计消息数量等 func onAcknowledgement(*ProducerMessage) // 在消息被应答之前或者发送失败时调用onAcknowledgement()方法 func close() // 主要用于在关闭拦截器时执行一些资源的清理工作 }
在golang的sarama包中,拦截器接口只定义了一个onSend()方法,源码位于github.com/Shopify/sarama/interceptors.go
type ProducerInterceptor interface { onSend(*ProducerMessage) }自定义拦截器示例
// 自定义一个拦截器结果体,实现onSend()方法 type DefinedInterceptor struct{} // @title: onSend // @descriptor: 在消息值中增加前缀pref- func (di *DefinedInterceptor) onSend(message *sarama.ProducerMessage) { value, _ := message.Value.(sarama.StringEncoder) message.Value = sarama.StringEncoder(fmt.Sprintf("pref-%s", string(value))) } // 将拦截器定义到生产者配置 func testInterceptor() { config.Producer.Interceptors = []sarama.ProducerInterceptor{&DefinedInterceptor{}} } func TestProducer() { // 测试拦截器,在发送消息前,将拦截器加在到生产者配置 testInterceptor() testSendMsg() } ==============消费结果================= ➜ bin kafka-console-consumer --bootstrap-server localhost:9092 --topic localTestTopic pref-test message pref-test message pref-test message pref-test message pref-test message pref-test message pref-test message pref-test message pref-test message pref-test message序列化器
消息必须转换成字节数组才能在网络中传播,从客户端发送到kafka服务器,因此序列化是消息发送前的必经之路。
官方序列化器主要实现三个方法:
func configure() // 配置当前类???不太明白 func serialize() []byte // 序列化 *** 作 func close() // 关闭序列化器
在golang的sarama包中,无显示的序列化器配置,而是通过消息的接口Ecoder接口实现数据的序列化. 在上述消息key/value的定义中,我们使用了StringEncoder,其实现如下:
type Encoder interface { Encode() ([]byte, error) Length() int } type StringEncoder string func (s StringEncoder) Encode() ([]byte, error) { return []byte(s), nil } func (s StringEncoder) Length() int { return len(s) }
同样golang还支持byteEncoder, 定义如下:
type ByteEncoder []byte func (b ByteEncoder) Encode() ([]byte, error) { return b, nil } func (b ByteEncoder) Length() int { return len(b) }分区器
分区器的作用就是计算消息该发往哪个分区,注意,如果消息本身定义了partition字段指定分区,此时不需要分区器参与;其他情况都会经过分区器计算消息被发往的分区
官方定义中,分区器主要实现两个方法:
func partion() int // 计算消息被发往哪个分区,返回分区编号 func close()
golang的sarama包中,partitioner接口定义了分区器:
type Partitioner interface { Partition(message *ProducerMessage, numPartitions int32) (int32, error) RequiresConsistency() bool // sarama中增加这个方法,让接口定义key-> partition的映射关系是否不变 }自定义分区器示例
// 自定义分区器 type DefinedPartitioner struct{} // 极端自定义分区器,只发送1号分区 func (dp *DefinedPartitioner) Partition(message *sarama.ProducerMessage, numPartitions int32) (int32, error) { return 1, nil } func (dp *DefinedPartitioner) RequiresConsistency() bool { return false } func getPartitioner(topic string) sarama.Partitioner { return &DefinedPartitioner{} } // 将分区器装配到生产者配置 func testPartitioner() { config.Producer.Partitioner = getPartitioner } func TestProducer() { // 测试拦截器 testInterceptor() // 测试分区器 testPartitioner() testSendMsg() } =================生产结果====================== GOROOT=/usr/local/go #gosetup GOPATH=/Users/liushi/go #gosetup /usr/local/go/bin/go build -i -o /private/var/folders/zd/b2mys8_11x59vgvfvpt4snb40000gn/T/___go_build_demo . #gosetup /private/var/folders/zd/b2mys8_11x59vgvfvpt4snb40000gn/T/___go_build_demo 1 23 1 24 1 25 1 26 1 27 1 28 1 29 1 30 1 31 1 32 Process finished with exit code 0
可以看到,生产的消息都发送到了1号分区
消息累加器消息累加器(RecordAccumulator)主要用于缓存消息以便Sender线程(客户端发送消息线程)可以批量发送消息,进而减少网络传输的资源消耗以提升性能
消息累加器缓存区的大小由buffer.memory参数配置,默认为32MB. 消息累加器为每个partition分区维护一个双端队列,队列的元素为ProducerBatch,一个ProducerBatch可以由一个或者多个Message,指一个消息批次,多个消息组合成一个批次发送,可以有效的减少网络传输次数,提高系统吞吐量。
ProducerBatch的大小和batch.size有关。一个消息Message写入缓存后,会从双端队列尾部找到最新的一个batch(如果没有,则新建一个),然后查看当前message是否可以写入这个Batch,可以则写入,不可以则再新建一个Batch。新建Batch时会查看当前Message的大小和batch.size的大小,如果Message较小,则新建ProducerBatch的大小为batch.size,且该Batch的内存可以复用,如果Message更大,则按Message大小创建新的ProducerBatch,这段Batch的内存不可复用
Sender线程以上,是Producer线程发送消息的主要过程,消息被发送到消息累加器缓存后,由独立的Sender线程进行消息的拉取和传递。
InFlightRequestInFlightRequest是一块缓冲区,缓存了已发往kafka cluster但还没收到响应的Request,从而了解每个Node的拥塞程度,并控制客户端与kafka cluster每个连接的最多缓存的请求数(由max.in.flight.requests.per.connection参数控制)
ProducerBatch => Request => KafkaClusterSender线程从消息累加器中获取(partition, ProducerBatch)的数据信息后,会对消息进行一系列处理:
- 第一步,将partition信息转换为node节点信息,知道发送到哪个broker节点
- 第二步,将ProducerBatch信息封装成Request
- 第三步,将要发送给kafka cluster的Request先缓存入InFlightRequest缓冲区
- 第四步,将消息发送kafka cluster
- 第五步,等待kafka cluster的响应,并删除InFlightRequest的缓冲数据
- 第六步,删除消息累加器中ProducerBatch对应的消息内容,回收或释放累加器缓存
- acks: 指定分区中有多少个副本收到这条消息,生产者才能认为这条消息是发送成功的。重要参数,权衡效率和吞吐能力
- max.request.size: 生产者发送消息大小的最大值,默认为1MB。一般消息的上限即为1MB
- retries: 生产者在发送消息出错时的最大重试次数,默认为0不重试。
- 注意,重试只在可重试错误下才会触发,比如网络中断错误,leader副本选举等可以触发重试,但是像消息体过大等重试无法解决的问题,生产者不会重试
- retry.backoff.ms: 两次重试之间的时间间隔
- compression.type: 消息的压缩方式,默认为空,不压缩。消息压缩是CPU/IO之间的抉择,压缩消息可以降低消息大小,减少网络贷款占用,但是会消耗服务器CPU
- linger.ms: 等待填充ProducerBatch的时间。
- 生产者消息通过ProducerBatch为单位发送,但是如果长时间新消息大小无法填满一个Batch,则会造成消息滞后。因此,linger.ms用于定义一个Batch至多等待的时间,如果该时间内Batch仍没有填满,该Batch也会直接发送
- linger.ms和TCP中的Nagle算法功能类似
- request.timeout.ms: 客户端等待响应的最大时间。如果发送的消息长时间收不到服务端响应,客户端则判定发送失败。
- buffer.memory: 消息累加器缓冲区的大小,默认32MB。缓冲区满后,生产者send()会阻塞。此时说明生产速度过剩
- max.block.ms: 生产阻塞时等待的最大时长。超过该时间后会触发超时报错
- batch.size: 每个ProducerBatch的大小
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)