- 一、kafka消费者提交的内容
- 二、自动提交
- 三、自动提交代码示例
- 四、手动提交
- 五、手动同步提交代码示例
- 六、手动异步提交代码示例
- 消费组+消费的某个主题+消费的某个分区及消费的偏移量
消费者无论是自动提交还是手动提交,都需要把所属的消费组+消费的某个主题+消费的某个分区及消费的偏移量,这样的信息提交到集群_consumer_offsets主题里面。
二、自动提交//指定了消费者是否自动提交消费位移,默认为true。 // 如果需要减少重复消费或者数据丢失,你可以设置为false。 // 如果为true,需要关注自动提交的时间间隔,该间隔由auto.commit.interval.ms设置。 props.put("enable.auto.commit",true); props.put("auto.commit.interval.ms","1000");
自动提交:消息poll下来以后,直接提交offset:
- enable.auto.commit (bool) :如果为True,将自动定时提交消费者offset。默认为True。
- auto.commit.interval.ms(int) :自动提交offset之间的间隔毫秒数。如果enable_auto_commit 为true,默认值为: 5000。
- 当设置 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; import java.util.concurrent.ExecutionException; public class MyConsumer { private final static String TOPIC_NAME = "optics-topic"; public static void main(String[] args) throws ExecutionException, InterruptedException { Properties props = new Properties(); //设置kafka集群的地址 props.put("bootstrap.servers", "10.129.88.26:9092,10.129.88.32:9092,10.129.88.39:9092"); props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule " + "required username="debezium" password="NGFlM2I1NTJlNmFk";"); props.put("security.protocol","SASL_PLAINTEXT"); props.put("sasl.mechanism","PLAIN"); //消费者组 props.put("group.id", "opticsgroup1"); //反序列化 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //指定了消费者是否自动提交消费位移,默认为true。 // 如果需要减少重复消费或者数据丢失,你可以设置为false。 // 如果为true,需要关注自动提交的时间间隔,该间隔由auto.commit.interval.ms设置。 props.put("enable.auto.commit",true); props.put("auto.commit.interval.ms","1000"); props.put("max.poll.records",500); //可以根据消费速度的快慢来设置,因为如果两次poll的时间超出了30s的时间间隔,kafka会 //认为消费能力过弱,将其踢出消费组,将分区分配给其他消费者 props.put("max.poll.interval.ms",30*1000); //consumer给broker发送心跳的间隔时刻 props.put("heartbeat.interval.ms",1000); //kafka如果超过10秒没有收到消费者宕心跳,则会把消费者提出消费者组,进行rebalance,把分区分配给其他消费者 props.put("session.timeout.ms",10*1000); //新消费组从头消费 props.put("auto.offset.reset","earliest"); //创建消费者 KafkaConsumer四、手动提交consumer = new KafkaConsumer (props); //订阅主题 consumer.subscribe(Arrays.asList(TOPIC_NAME)); while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); for(ConsumerRecord record : records){ System.out.printf("topic = %s, partition = %s, offset = %d, key = %s, value =%sn", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } } } }
props.put("enable.auto.commit",false);
手动提交:在消费消息后再提交offset。
- 手动同步提交:阻塞到集群返回ack
- 手动异步提交:在消息消费完后提交,不需要等到集群ack,直接执行之后的逻辑,可以设置一个回调方法,供集群调用
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; import java.util.concurrent.ExecutionException; public class MyConsumer { private final static String TOPIC_NAME = "optics-topic"; public static void main(String[] args) throws ExecutionException, InterruptedException { Properties props = new Properties(); //设置kafka集群的地址 props.put("bootstrap.servers", "10.129.88.26:9092,10.129.88.32:9092,10.129.88.39:9092"); props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule " + "required username="debezium" password="NGFlM2I1NTJlNmFk";"); props.put("security.protocol","SASL_PLAINTEXT"); props.put("sasl.mechanism","PLAIN"); //消费者组 props.put("group.id", "opticsgroup1"); //反序列化 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //指定了消费者是否自动提交消费位移,默认为true。 // 如果需要减少重复消费或者数据丢失,你可以设置为false。 // 如果为true,需要关注自动提交的时间间隔,该间隔由auto.commit.interval.ms设置。 props.put("enable.auto.commit",true); props.put("auto.commit.interval.ms","1000"); props.put("max.poll.records",500); //可以根据消费速度的快慢来设置,因为如果两次poll的时间超出了30s的时间间隔,kafka会 //认为消费能力过弱,将其踢出消费组,将分区分配给其他消费者 props.put("max.poll.interval.ms",30*1000); //consumer给broker发送心跳的间隔时刻 props.put("heartbeat.interval.ms",1000); //kafka如果超过10秒没有收到消费者宕心跳,则会把消费者提出消费者组,进行rebalance,把分区分配给其他消费者 props.put("session.timeout.ms",10*1000); //新消费组从头消费 props.put("auto.offset.reset","earliest"); //创建消费者 KafkaConsumer六、手动异步提交代码示例consumer = new KafkaConsumer (props); //订阅主题 consumer.subscribe(Arrays.asList(TOPIC_NAME)); while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); for(ConsumerRecord record : records){ System.out.printf("topic = %s, partition = %s, offset = %d, key = %s, value =%sn", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } //所有消息已经消费完 if(records.count() > 0){ //有消息 consumer.commitSync(); //阻塞提交成功 } } } }
import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import java.time.Duration; import java.util.Arrays; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutionException; public class MyConsumer { private final static String TOPIC_NAME = "optics-topic"; public static void main(String[] args) throws ExecutionException, InterruptedException { Properties props = new Properties(); //设置kafka集群的地址 props.put("bootstrap.servers", "10.129.88.26:9092,10.129.88.32:9092,10.129.88.39:9092"); props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule " + "required username="debezium" password="NGFlM2I1NTJlNmFk";"); props.put("security.protocol","SASL_PLAINTEXT"); props.put("sasl.mechanism","PLAIN"); //消费者组 props.put("group.id", "opticsgroup1"); //反序列化 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //指定了消费者是否自动提交消费位移,默认为true。 // 如果需要减少重复消费或者数据丢失,你可以设置为false。 // 如果为true,需要关注自动提交的时间间隔,该间隔由auto.commit.interval.ms设置。 props.put("enable.auto.commit",true); props.put("auto.commit.interval.ms","1000"); props.put("max.poll.records",500); //可以根据消费速度的快慢来设置,因为如果两次poll的时间超出了30s的时间间隔,kafka会 //认为消费能力过弱,将其踢出消费组,将分区分配给其他消费者 props.put("max.poll.interval.ms",30*1000); //consumer给broker发送心跳的间隔时刻 props.put("heartbeat.interval.ms",1000); //kafka如果超过10秒没有收到消费者宕心跳,则会把消费者提出消费者组,进行rebalance,把分区分配给其他消费者 props.put("session.timeout.ms",10*1000); //新消费组从头消费 props.put("auto.offset.reset","earliest"); //创建消费者 KafkaConsumerconsumer = new KafkaConsumer (props); //订阅主题 consumer.subscribe(Arrays.asList(TOPIC_NAME)); while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); for(ConsumerRecord record : records){ System.out.printf("topic = %s, partition = %s, offset = %d, key = %s, value =%sn", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } //所有消息已经消费完 if(records.count() > 0){ //有消息 //手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑 consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map map, Exception e) { if (e != null){ System.err.println("Commit failed for " + map); System.err.println("Commit failed exception: " + e.getStackTrace()); } } }); } } } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)