日常业务开发很重要、很常用的一章
提纲:如何使用Kafka生产者;如何创建KafkaProducer、ProducerRecords;如何将记录发给Kafka;如何处理从Kafka返回的错误;一些配置项;不同的分区方法、序列化器,以及它们的自定义。
Kafka提供了生产者的API。
ProducerRecord → 序列化器→分区器 → 批次→发送→broker→broker响应
【3.2 创建Kafka生产者】
bootstrap.servers
连接的broker地址,格式 host:port,多个用英文逗号分隔。只要配一个broker,生产者就能根据这个线索找到其他broker。建议至少配俩,搞个备用。
key.serializer
前面说过,broker接受的键值都是字节数组,所以java对象要转换。
value.serializer
值的序列化,必须要指定序列化器
【ProducerRecord 】含义: 发送给Kafka Broker的key/value 值对;
这玩意儿就是被发的消息全貌,叫生产者记录
内部数据结构:
-- Topic (名字)
-- PartitionID ( 可选)
-- Key( 可选 )
-- Value
生产者记录(简称PR)的发送逻辑:
<1> 若指定Partition ID,则PR被发送至指定Partition
<2> 若未指定Partition ID,但指定了Key, PR会按照hasy(key)发送至对应Partition
<3> 若既未指定Partition ID也没指定Key,PR会按照round-robin模式发送到每个Partition
<4> 若同时指定了Partition ID和Key, PR只会发送到指定的Partition (Key不起作用,代码逻辑决定)
生产者记录(PR)的实现:
针对发送逻辑,提供三种构造函数形参:
-- ProducerRecord(topic, partition, key, value)
-- ProducerRecord(topic, key, value)
-- ProducerRecord(topic, value)
【发送消息 3种方式】
发送并忘记
—— 只管发,不管到了没有
同步发送
—— send()以后,返回一个Future对象,用get()等待返回的Recordmetadata()对象,对象里有offset;若返回不允许重试的异常or超过重试次数,就会抛异常。
比如无主no leader异常可以通过为分区选首领解决,所以是可重试异常;
连接异常,通过重新再次建立连接解决,是可重试异常;
消息过大,不重试
异步发送
—— send(),指定一个回调函数,服务器响应时调用这个函数;
发消息更快,
注意看第4点,send()的时候,传了一个回调的对象
【3.4 生产者的配置】
从第37页开始,写了12个配置项,用到的时候再看
【3.5 序列化器】
Kafka提供了默认的字符串、整型、字节数组的序列化器,
【3.5.1 自定义序列化器】——不建议使用
这里说怎么自定义开发序列化器。
讲道理,序列化器,这个词儿真拗口。
有几个序列化框架,Avro(强烈推荐)、Thrift、Protobuf、或自定义。
自定义的话,就比如,把入参模型的A字段序列化成4字节整数、B字段序列化成4字节整数……
如果模型变更,序列化方法就得重写。
【3.5.2 使用Avro序列化】
Apache Avro,一种与编程语言无关的序列化格式。数据被序列化成二进制文件(常用)/JSON文件。他的scheme通过JSON来描述。
后面没看懂。
【3.6 分区】
没有指定key的时候,消息会被随机的发到topic的各个分区上。分区器来干这个活,它用轮询Round Robin算法将消息均衡的各个分区上。
指定key的时候,且使用默认分区器,Kafka会对key进行散列,用的是Kafka自己的散列算法,所以不受Java版本的影响。然后把消息映射到特定分区,同一个key总是被映射到同一个分区上。
获取的是所有的分区,不只是可用的分区,所以如果有的分区挂了,就会发生错误。
分区数量不变时,key和partition之间的映射就不变。如果分区增加了,可能用户0123的旧数据在分区34,新记录写到分区99了。
可以搞自定义分区策略,比如把key为Banana的消息固定写入最后个分区。我当前没有看到这样使用的场景。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)