SpringBoot使用Kafka
引入kafka依赖:
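<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.6.0</version>
</dependency>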
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; public class SendMessage{ public static void sendKafkaMessage(String msg) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.kafka_url);//kafka地址,多个地址用逗号分割 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); KafkaProducerkafkaProducer = new KafkaProducer<>(properties); try { ProducerRecord record = new ProducerRecord (Constants.topic, msg); //topic kafkaProducer.send(record); log.info("消息发送成功: {}", msg); } finally { kafkaProducer.close(); } } }
消息消费工具类:
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class HandlerMessage{ public static void main(String [] args){ KafkaConsumerkafkaConsumer = new KafkaConfigUtil().initKafkaConfig(); while(true){ ConsumerRecords records = kafkaConsumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord record : records) { log.info("Received Message: {}", record.value()); kafkaConsumer.commitAsync(); JSonObject kafkaMsg = JSONObject.parseObject(JSONObject.parse(record.value()).toString()); log.info("jsonObject: {}", kafkaMsg.toString()); } } } }
kafka创建topic:
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)