Kafka Streams-根据Streams数据发送不同的主题

Kafka Streams-根据Streams数据发送不同的主题,第1张

Kafka Streams-根据Streams数据发送不同的主题

您可以使用

branch
方法来拆分流。此方法使用谓词源流分成几个流。

以下代码取自kafka-streams-examples:

KStream<String, OrderValue>[] forks = ordersWithTotals.branch(    (id, orderValue) -> orderValue.getValue() >= FRAUD_LIMIT,    (id, orderValue) -> orderValue.getValue() < FRAUD_LIMIT);forks[0].mapValues(    orderValue -> new OrderValidation(orderValue.getOrder().getId(), FRAUD_CHECK, FAIL))    .to(ORDER_VALIDATIONS.name(), Produced        .with(ORDER_VALIDATIONS.keySerde(), ORDER_VALIDATIONS.valueSerde()));forks[1].mapValues(    orderValue -> new OrderValidation(orderValue.getOrder().getId(), FRAUD_CHECK, PASS))    .to(ORDER_VALIDATIONS.name(), Produced  .with(ORDER_VALIDATIONS.keySerde(), ORDER_VALIDATIONS.valueSerde()));


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5461949.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-12
下一篇 2022-12-12

发表评论

登录后才能评论

评论列表(0条)

保存