Properties properties = new Properties();
// 连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
// 手动指定序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put("client.username","jdq账号");
properties.put("client.password","jdq密码");
KafkaProducer kafkaProducer = new KafkaProducer(properties);
kafkaProducer.send(new ProducerRecord<>("String","String"));
kafkaProducer.close();
回调异步发送注意,jdq需要指定username和password。
返回值实际上是等待队列发送的消息,且发送失败后会自动重试,不需要我们手动进行重试
@Slf4j
public class CustomProduct {
public static void main(String[] args) {
Properties properties = new Properties();
// 连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
// 手动指定序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put("client.username", "jdq账号");
properties.put("client.password", "jdq密码");
KafkaProducer kafkaProducer = new KafkaProducer<>(properties);
Future send = kafkaProducer.send(new ProducerRecord<>("String", "String"));
kafkaProducer.send(new ProducerRecord<>("String", "String"), (recordMetadata, e) -> {
if (e == null) {
log.info("主题:{},分区:{}",recordMetadata.topic(),recordMetadata.partition());
}
});
kafkaProducer.close();
}
}
同步发送
同步发送是指,发送到等待队列中的数据,必须等待消息已经发送到kafka集群中,生产者才会发送下一条消息。即等待队列等待数据累计到batch.size或者时间到达linger.ms之后,通过sender发送到kafka集群中,再根据ack应答的登记,最后由selector决定删除信息还是重试,最后直到清理队列中的消息,消费者再发送下一条数据。
同步发送的标志是让send方法有返回值,即
Future send = kafkaProducer.send(new ProducerRecord<>("String", "String"));
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)