@SpringBootApplication public class CcApplication { public static void main(String[] args) { SpringApplication.run(CcApplication.class, args); String topic = "ty_analysis"; String groupId = "analysis"; Properties properties = new Properties(); properties.put("bootstrap.servers","172.16.9.10:9092"); //必须指定有业务意义的名字 properties.put("group.id",groupId); properties.put("enable.auto.commit","true"); properties.put("auto.commit.interval.ms","1000"); //从最早的消息开始读取 properties.put("auto.offset.reset","earliest"); properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumerconsumer = new KafkaConsumer (properties); //订阅主题,可以订阅多个主题,还可以使用正则表达式订阅主题 //注意:多次订阅,会覆盖前面的 consumer.subscribe(Arrays.asList(topic)); try{ while(true){ //1000是超时设定,如果有定时要求,可设置,否则建议设置个比较大的值 //通常consumer拿到足够多的数据,会立即返回,否则会阻塞 //poll返回则认为是成功消费了消息,如果发现消费慢需要分析是poll慢还是本身业务逻辑处理慢 ConsumerRecords records = consumer.poll(1000); for(ConsumerRecord record : records){ System.out.printf("offset=%d, key=%s,value= %s%n",record.offset(),record.key(),record.value()); } } }finally { consumer.close(); } } }
# 主要的pom依赖org.apache.kafka kafka-clients2.6.0 com.fasterxml.jackson.core jackson-databind2.9.5
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)