kafak消费者从头开始消费(消费者组)

kafak消费者从头开始消费(消费者组),第1张

kafak消费从头开始消费(消费者组) 【README】

本文主要用于描述 kafka 消费者如何从头开始消费;


【1】从头开始消费

1)从头开始消费,需要满足两个条件, 如下:

  • 条件1, 使用一个全新的消费者组id;
  • 条件2,指定 auto.offset.reset 为 earliest ;

2)代码如下:

public static void main(String[] args) {
		
		Properties props = new Properties();
		
		
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");
		
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
		
		props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
		
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan2"); // group.id 
		 
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastest 
		
		
		KafkaConsumer consumer = new KafkaConsumer<>(props); 
		
		consumer.subscribe(Arrays.asList("third", "second"));
		 
		int i =0;
		while(true) {
			if (i++ > 10) break; // 只消费10条数据 
			
			ConsumerRecords consumerRds  = consumer.poll(100);
			
			
			for(ConsumerRecord rd : consumerRds) {
				System.out.println("[消费者] " + rd.key() + "--" + rd.value()); 
			}
		} 
		 
		consumer.close(); 
	}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5436131.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-11
下一篇 2022-12-11

发表评论

登录后才能评论

评论列表(0条)

保存