/**
 * Auto-commit consumer. This usage does show up in real projects, but it is not recommended:
 * offsets are committed on a timer, so a record can be acknowledged before it has actually been processed.
 */
private static void helloworld() {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "test");
    props.setProperty("enable.auto.commit", "true");
    props.setProperty("auto.commit.interval.ms", "1000");
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    // Subscribe to one or more topics
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                    record.partition(), record.offset(), record.key(), record.value());
        }
    }
}
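As a side note, the same configuration can be written with the constants from org.apache.kafka.clients.consumer.ConsumerConfig instead of raw string keys, which avoids typos in property names. A minimal sketch, assuming the same localhost:9092 broker and "test" group as above (StringDeserializer comes from org.apache.kafka.common.serialization):

// Sketch: the same auto-commit configuration, written with ConsumerConfig constants.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);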
Manually committing offsets with the Kafka Consumer
/**
 * Manual offset commit
 */
private static void commitedOffset() {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "test");
    props.setProperty("enable.auto.commit", "false");
    props.setProperty("auto.commit.interval.ms", "1000");
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    // Subscribe to one or more topics
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
        for (ConsumerRecord<String, String> record : records) {
            // Imagine persisting each record to a database here; that write may succeed or fail
            // TODO record 2 db
            System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                    record.partition(), record.offset(), record.key(), record.value());
            // On failure, roll back the work and do NOT commit the offset
        }
        // On success, commit the offsets manually
        consumer.commitAsync();
    }
}
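One caveat: the bare commitAsync() call above does not report commit failures anywhere. The consumer also accepts an OffsetCommitCallback, which at least makes a failed commit visible. A minimal sketch of that variant; the logging is only illustrative, and a real application might retry or fall back to commitSync():

// Sketch: asynchronous commit with a callback so commit failures are not silently dropped.
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        // A failed async commit is not retried automatically; log it (or fall back to commitSync()).
        System.err.println("Offset commit failed for " + offsets + ": " + exception.getMessage());
    }
});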
Manually committing offsets per partition with the Kafka Consumer
/**
 * Manual offset commit, handled partition by partition
 */
private static void commitedOffsetWithPartition() {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "test");
    props.setProperty("enable.auto.commit", "false");
    props.setProperty("auto.commit.interval.ms", "1000");
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    // Subscribe to one or more topics
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
        // Handle each partition separately
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> pRecord = records.records(partition);
            for (ConsumerRecord<String, String> record : pRecord) {
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
            // Commit the offset for this single partition; the committed offset is the last processed offset + 1
            long lastOffset = pRecord.get(pRecord.size() - 1).offset();
            Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
            offset.put(partition, new OffsetAndMetadata(lastOffset + 1));
            consumer.commitSync(offset);
            System.out.println("=============partition - " + partition + " end================");
        }
    }
}
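The example above still relies on subscribe(), so the group coordinator decides which partitions this consumer gets. If you want to pick the partitions yourself, the consumer API also offers assign(), which bypasses group rebalancing entirely. A minimal sketch, assuming the consumer is configured as above and that TOPIC_NAME has a partition 0 (the partition number is only an assumption for illustration):

// Sketch: manually assigning one specific partition instead of subscribing to the topic.
TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
consumer.assign(Arrays.asList(p0));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
    for (ConsumerRecord<String, String> record : records.records(p0)) {
        System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                record.partition(), record.offset(), record.key(), record.value());
    }
    consumer.commitSync();
}

With assign(), the group.id is still used when committing offsets, but no rebalancing or automatic partition distribution takes place, so scaling out consumers becomes your own responsibility.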
Link: https://pan.baidu.com/s/1F-ySQ8chot6ro9sAn7Re0A  Extraction code: ib0i