3、Kafka生产者-向Kafka写入数据

3、Kafka生产者-向Kafka写入数据,第1张

发送消息的主要步骤

格式:每个消息是一个 ProducerRecord 对象, 必须指定 所属的 Topic和Value , 还可以指定Partition及Key

1:序列化 ProducerRecord

2:分区: 如指定Partition,不做任何事情;否则,Partitioner 根据key得到Partition 。生产者向哪个Partition发送

3:消息添加到相应 bach中 ,独立线程将batch 发到Broker上

4:broker收到消息响应 。 成功回RecordMetaData对象 ,包含了Topic信息、Patition信息、消息在Partition中的Offset信息; 失败返回错误

有序场景:不建议retries  0。可max.in.flight.requests.per.connection  1, 影响生产者吞吐量,但保证有序          ps: 同partition消息有序

三个 必选 的属性:

(1) bootstrap.servers ,broker地址清单

(2) key.serializer: 实现org.apache.kafka.common.serialization.Serializer接口的类,key序列化成字节数组。注意: 必须被设置,即使没指定key

(3)value.serializer, value序列化成字节数组

同步发送消息

异步发送消息

(1)acks: 指定多少partition副本收到消息,生产者才会认为写成功

        0,不需等待服务器的响应,吞吐量高,如broker没有收到,生产者不知道

        1,leader partition收到消息,一个即成功

        all,所有partition都收到,才成功,leader和follower共同应答

(2)buffer.memory, 生产者内 缓存区域大小

(3)compression.type ,默认不压缩,设置成snappy、gzip或lz4对发送给broker压缩

(4)retries, 重发消息的次数

(5)batch.size, 发送同一partition消息会先存储在batch中,该参数指定一个batch内存大小,单位byte。不一定填满才发送

(6)linger.ms ,批次时间,batch被填满或者linger.ms达到上限,就把batch中的消息发送出去

(7)max.in.flight.requests.per.connection, 生产者在收到服务器响应之前可以发送的消息个数

创建ProducerRecord时,必须 指定序列化器 ,推荐序列化框架Avro、Thrift、ProtoBuf等

用 Avro 之前,先定义schema(通常用 JSON 写)

(1)创建一个类代表客户,作为消息的value

(2)定义schema

(3)生成Avro对象发送到Kafka

ProducerRecord包含Topic、value,key默认null,ey的两个作用:1)附加信息    2)被写到Topic的哪个partition

key  null ,默认partitioner, RoundRobin均衡分布

key不空,hash进行散列 ,不改变partition数量(永远不加),key和partition映射不变。

自定义paritioner 需实现Partitioner接口

实时同步Hbase WAL日志到kafka,笔者这边使用场景有以下两个:

Hbase提供了跨集群的数据同步方式Replication,可通过自定义Replication Endpoint,把消息写入kafka,先来了解Hbase Replication集群之间进行复制同步的过程,整体数据复制流程如下图:

通过以上Hbase Replication的复制过程,可理解,可通过自定义ReplicationEndpoint把entry解析发送到kafka,即可实现实时解析WAL日志推送到消息系统

Hbase默认对应的RepliactionEndpoint实现是HBaseInterClusterReplicationEndpoint,其中封装replicationWALEntry通过RPC发送到Peer集群,对应方法replicateEntries,可参考该类自定义一个KafkaInterClusterReplicationEndpoint类,改写replicateEntries方法推送数据到kafka

注意java客户端如批量写入Hbase,对应WAL日志是一条记录

注意REPLICATION_SCOPE属于设置为1,表示开启复制

注意ENDPOINT_CLASSNAME属性,修改成自定义的ReplicationEndpoint,CONFIG 属性可配置自定义的参数,可在自定义的ReplicationEndpoint类init方法中通过以下方式获取

串行复制和费串行复制有啥区别,可自行查找资料

1、发后即忘(fire-and-forget)

只管往kafka发送消息而并不关心消息是否正确到达。正常情况没什么问题,不过有些时候(比如不可重试异常)会造成消息的丢失。这种发送方式性能最高,可靠性最差。

2、同步发送(sync)

其实kafkaTemplate.send方法并不是返回void,而是ListenableFuture<SendResult<K, V>>,该类继承了jdk concurrent包的Future。

3、异步发送(async)

在send()方法里指定一个Callback的回调函数,Kafka在返回响应时调用该函数来实现异步的发送确认。有读者或许会有疑问,send()方法的返回值类型就是Future,而Future本身就可以用作异步的逻辑处理。这样做不是不行,只不过Future里的 get()方法在何时调用,以及怎么调用都是需要面对的问题,消息不停地发送,那么诸多消息对应的Future对象的处理难免会引起代码处理逻辑的混乱。使用Callback的方式非常简洁明了,Kafka有响应时就会回调,要么发送成功,要么抛出异常。


欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/sjk/9960532.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-03
下一篇 2023-05-03

发表评论

登录后才能评论

评论列表(0条)

保存