pulsar 的批量发送与rocketmq不同,本质是client 进行缓存,根据producer创建参数自行进行控制,没有显示batch发送 *** 作,示意代码如下:
ProducerBatchProducer = client.newProducer().topic("mybatch-topic").batchingMaxMessages(10) .batchingMaxPublishDelay(10, TimeUnit.MINUTES).sendTimeout(10, TimeUnit.SECONDS).blockIfQueueFull(true) .enableBatching(true).create(); // BatchProducer. for (int i = 0; i < 25; i++) { // BatchProducer.newMessage().value(); TypedMessageBuilder w = BatchProducer.newMessage().key("key" + i).value(("value" + i).getBytes()) .property("user-defined-property", "value"); w.sendAsync().thenAccept(messageId -> { System.out.println("Published batch message: " + messageId); }).exceptionally(ex -> { System.err.println("Failed to publish: " + ex); return null; }); ; } //FLUSH will send all remained message to broker BatchProducer.flush();
要注意几点
1、设置合理批量发送数量及最大间隔发送时间(batchingMaxPublishDelay)
2、适当考虑调用flush,以保障未达到batchingMaxMessages还在队列中的消息都发送给broker
具体批量发送的逻辑可以看ProducerImpl类,多个函数都有批量考虑,如private void serializeAndSendMessage。
实际批量发送函数为 doBatchSendAndAdd(msg, callback, payload)
事务消息不能与批量同时使用,事务有超时控制机制,创建代码如下:
Transaction txn = client.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
如果使用了batch模式并设置了debug,可能有如下信息不断输出
2021-11-23 16:08:01.791 INFO 1040 --- [r-client-io-1-1] o.a.pulsar.client.impl.ProducerImpl : [mybatch-topic] [standalone-0-6] Batching the messages from the batch container with 0 messages
同时消息不能设置超时时间,如果设置了可能有如下错误:
only producers disabled sendTimeout are allowed to produce transactional
事务消息可以混杂立即发送和延迟发送消息,完整的发送事务代码如下:
// BatchProducer. Transaction txn = client.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get(); // only producers disabled sendTimeout are allowed to produce transactional // messages for (int i = 0; i < 25; i++) { // BatchProducer.newMessage().value(); TypedMessageBuilder消费侧事务w ; if (i %2 ==0) { w=BatchProducer.newMessage(txn).key("key" + i).value(("value" + i).getBytes()) .property("user-defined-property", "value"); }else { w=BatchProducer.newMessage(txn).deliverAfter(100, TimeUnit.SECONDS).key("key" + i).value(("value" + i).getBytes()) .property("user-defined-property", "value"); } w.sendAsync().thenAccept(messageId -> { System.out.println("Published batch message: " + messageId); }).exceptionally(ex -> { System.err.println("Failed to publish: " + ex); return null; }); } } // BatchProducer.flush(); txn.commit();
pulsar消费也支持事务模式,官方文档给的示意是可以完成多个topic的消费后提交,以下是一个简单批量消费后一起提交的示意代码
Consumer consumer = client.newConsumer().topic("mybatch-topic").subscriptionName("my-subscription") .subscriptionType(SubscriptionType.Key_Shared).batchReceivePolicy(BatchReceivePolicy.builder() .maxNumMessages(5).maxNumBytes(1024 * 1024).timeout(200, TimeUnit.MILLISECONDS).build()) .subscribe(); //Messages m = consumer.batchReceive(); int i=0; Transaction txn = client.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get(); while (true) { // Wait for a message Message msg = consumer.receive(); try { // Do something with the message System.out.println("Message received: " + new String(msg.getData())); // Acknowledge the message so that it can be deleted by the message broker consumer.acknowledgeCumulativeAsync(msg.getMessageId(),txn); } catch (Exception e) { // Message failed to process, redeliver later consumer.negativeAcknowledge(msg); } i++; if (i>=5) { txn.commit(); txn = client.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get(); i=0; } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)