package Kafka_Demo;

import org.apache.kafka.clients.consumer.StickyAssignor;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Demo of asynchronous (non-blocking) message sending with a Kafka producer.
 *
 * <p>Each record is handed to {@code send} together with a {@link Callback}; the callback is
 * invoked once the send has either succeeded or failed, so the sending loop never waits for an
 * individual result.
 */
public class Producer {

    /** Topic every demo record is written to. */
    private static final String TOPIC = "first";

    /** Index of the last message; the loop is inclusive, so indices 0 through 10 are sent. */
    private static final int LAST_MESSAGE_INDEX = 10;

    /**
     * Builds a producer from the project's {@code producerconfig} settings and sends the demo
     * messages {@code test---0} .. {@code test---10}, printing the outcome of each send from
     * its completion callback.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        producerconfig config = new producerconfig();
        Properties pc = config.pc();

        // NOTE(review): the String/String type arguments assume that the properties returned by
        // producerconfig.pc() configure String key and value serializers - confirm.
        // try-with-resources guarantees the producer is closed even if a send throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(pc)) {
            for (int i = 0; i <= LAST_MESSAGE_INDEX; i++) {
                // The same text is used as both the record key and the record value.
                String s = "test---" + i;
                ProducerRecord<String, String> first = new ProducerRecord<>(TOPIC, s, s);

                // Asynchronous callback (non-blocking): send() returns immediately and the
                // callback reports the result later.
                producer.send(first, new Callback() {
                    /**
                     * Reports the result of one send. metadata and exception are not expected
                     * to both be absent: on success exception is null, on failure metadata
                     * carries no usable result.
                     */
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null) {
                            System.out.println("异步发送消息成功:"
                                    + " topic=" + metadata.topic() + "\t"
                                    + "partition=" + metadata.partition() + "\t"
                                    + "offset=" + metadata.offset() + "\t"
                                    + s);
                        } else {
                            exception.printStackTrace();
                        }
                    }
                });
            }
        }
    }
}
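// Web-page footer captured with the code (not part of the program):
//   "欢迎分享,转载请注明来源:内存溢出" - sharing is welcome; please credit the source (内存溢出) when reposting.
//   "评论列表(0条)" - comment list (0 entries).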