Kafka消费者发送消息

Kafka消费者发送消息,第1张

异步发送
Properties properties = new Properties();

// 连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

// 手动指定序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        
properties.put("client.username","jdq账号");
properties.put("client.password","jdq密码");

KafkaProducer kafkaProducer = new KafkaProducer(properties);
kafkaProducer.send(new ProducerRecord<>("String","String"));
kafkaProducer.close();

注意,jdq需要指定username和password。

回调异步发送

返回值实际上是等待队列发送的消息,且发送失败后会自动重试,不需要我们手动进行重试

@Slf4j
public class CustomProduct {

    public static void main(String[] args) {

        Properties properties = new Properties();

        // 连接集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");

        // 手动指定序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        properties.put("client.username", "jdq账号");
        properties.put("client.password", "jdq密码");


        KafkaProducer kafkaProducer = new KafkaProducer<>(properties);
        Future send = kafkaProducer.send(new ProducerRecord<>("String", "String"));
        kafkaProducer.send(new ProducerRecord<>("String", "String"), (recordMetadata, e) -> {
            if (e == null) {
                log.info("主题:{},分区:{}",recordMetadata.topic(),recordMetadata.partition());
            }
        });
        kafkaProducer.close();


    }

}
同步发送 

同步发送是指,发送到等待队列中的数据,必须等待消息已经发送到kafka集群中,生产者才会发送下一条消息。即等待队列等待数据累计到batch.size或者时间到达linger.ms之后,通过sender发送到kafka集群中,再根据ack应答的登记,最后由selector决定删除信息还是重试,最后直到清理队列中的消息,消费者再发送下一条数据。

同步发送的标志是让send方法有返回值,即

Future send = kafkaProducer.send(new ProducerRecord<>("String", "String"));

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/921538.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-16
下一篇 2022-05-16

发表评论

登录后才能评论

评论列表(0条)

保存