本文主要用于描述 kafka 消费者如何从头开始消费;
【1】从头开始消费
1)从头开始消费,需要满足两个条件, 如下:
- 条件1, 使用一个全新的消费者组id;
- 条件2,指定 auto.offset.reset 为 earliest ;
2)代码如下:
public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan2"); // group.id props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastest KafkaConsumerconsumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("third", "second")); int i =0; while(true) { if (i++ > 10) break; // 只消费10条数据 ConsumerRecords consumerRds = consumer.poll(100); for(ConsumerRecord rd : consumerRds) { System.out.println("[消费者] " + rd.key() + "--" + rd.value()); } } consumer.close(); }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)