深入浅出理解kafka原理系列之:自动提交和手动提交

深入浅出理解kafka原理系列之:自动提交和手动提交,第1张

深入浅出理解kafka原理系列之:自动提交和手动提交

深入浅出理解kafka原理系列之:自动提交和手动提交
  • 一、kafka消费者提交的内容
  • 二、自动提交
  • 三、自动提交代码示例
  • 四、手动提交
  • 五、手动同步提交代码示例
  • 六、手动异步提交代码示例

一、kafka消费者提交的内容
  • 消费组+消费的某个主题+消费的某个分区及消费的偏移量

消费者无论是自动提交还是手动提交,都需要把所属的消费组+消费的某个主题+消费的某个分区及消费的偏移量,这样的信息提交到集群_consumer_offsets主题里面。

二、自动提交
//指定了消费者是否自动提交消费位移,默认为true。
// 如果需要减少重复消费或者数据丢失,你可以设置为false。
// 如果为true,需要关注自动提交的时间间隔,该间隔由auto.commit.interval.ms设置。
props.put("enable.auto.commit",true);
props.put("auto.commit.interval.ms","1000");

自动提交:消息poll下来以后,直接提交offset:

  • enable.auto.commit (bool) :如果为True,将自动定时提交消费者offset。默认为True。
  • auto.commit.interval.ms(int) :自动提交offset之间的间隔毫秒数。如果enable_auto_commit 为true,默认值为: 5000。
  • 当设置 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况
三、自动提交代码示例
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class MyConsumer {
    private final static String TOPIC_NAME = "optics-topic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        //设置kafka集群的地址
        props.put("bootstrap.servers", "10.129.88.26:9092,10.129.88.32:9092,10.129.88.39:9092");
        props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule " +
                "required username="debezium" password="NGFlM2I1NTJlNmFk";");
        props.put("security.protocol","SASL_PLAINTEXT");
        props.put("sasl.mechanism","PLAIN");
        //消费者组
        props.put("group.id", "opticsgroup1");
        //反序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //指定了消费者是否自动提交消费位移,默认为true。
        // 如果需要减少重复消费或者数据丢失,你可以设置为false。
        // 如果为true,需要关注自动提交的时间间隔,该间隔由auto.commit.interval.ms设置。
        props.put("enable.auto.commit",true);
        props.put("auto.commit.interval.ms","1000");

        props.put("max.poll.records",500);
        //可以根据消费速度的快慢来设置,因为如果两次poll的时间超出了30s的时间间隔,kafka会
        //认为消费能力过弱,将其踢出消费组,将分区分配给其他消费者
        props.put("max.poll.interval.ms",30*1000);

        //consumer给broker发送心跳的间隔时刻
        props.put("heartbeat.interval.ms",1000);
        //kafka如果超过10秒没有收到消费者宕心跳,则会把消费者提出消费者组,进行rebalance,把分区分配给其他消费者
        props.put("session.timeout.ms",10*1000);

        //新消费组从头消费
        props.put("auto.offset.reset","earliest");

        //创建消费者
        KafkaConsumer consumer = new KafkaConsumer(props);
        //订阅主题
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {  
            ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
            for(ConsumerRecord record : records){
                System.out.printf("topic = %s, partition = %s, offset = %d, key = %s, value =%sn",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}
四、手动提交
props.put("enable.auto.commit",false);

手动提交:在消费消息后再提交offset。

  • 手动同步提交:阻塞到集群返回ack
  • 手动异步提交:在消息消费完后提交,不需要等到集群ack,直接执行之后的逻辑,可以设置一个回调方法,供集群调用
五、手动同步提交代码示例
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class MyConsumer {
    private final static String TOPIC_NAME = "optics-topic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        //设置kafka集群的地址
        props.put("bootstrap.servers", "10.129.88.26:9092,10.129.88.32:9092,10.129.88.39:9092");
        props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule " +
                "required username="debezium" password="NGFlM2I1NTJlNmFk";");
        props.put("security.protocol","SASL_PLAINTEXT");
        props.put("sasl.mechanism","PLAIN");
        //消费者组
        props.put("group.id", "opticsgroup1");
        //反序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //指定了消费者是否自动提交消费位移,默认为true。
        // 如果需要减少重复消费或者数据丢失,你可以设置为false。
        // 如果为true,需要关注自动提交的时间间隔,该间隔由auto.commit.interval.ms设置。
        props.put("enable.auto.commit",true);
        props.put("auto.commit.interval.ms","1000");

        props.put("max.poll.records",500);
        //可以根据消费速度的快慢来设置,因为如果两次poll的时间超出了30s的时间间隔,kafka会
        //认为消费能力过弱,将其踢出消费组,将分区分配给其他消费者
        props.put("max.poll.interval.ms",30*1000);

        //consumer给broker发送心跳的间隔时刻
        props.put("heartbeat.interval.ms",1000);
        //kafka如果超过10秒没有收到消费者宕心跳,则会把消费者提出消费者组,进行rebalance,把分区分配给其他消费者
        props.put("session.timeout.ms",10*1000);

        //新消费组从头消费
        props.put("auto.offset.reset","earliest");

        //创建消费者
        KafkaConsumer consumer = new KafkaConsumer(props);
        //订阅主题
        consumer.subscribe(Arrays.asList(TOPIC_NAME));


        while (true) {
            ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
            for(ConsumerRecord record : records){
                System.out.printf("topic = %s, partition = %s, offset = %d, key = %s, value =%sn",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
            //所有消息已经消费完
            if(records.count() > 0){  //有消息
                consumer.commitSync();   //阻塞提交成功
            }
        }
    }
}
六、手动异步提交代码示例
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class MyConsumer {
    private final static String TOPIC_NAME = "optics-topic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        //设置kafka集群的地址
        props.put("bootstrap.servers", "10.129.88.26:9092,10.129.88.32:9092,10.129.88.39:9092");
        props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule " +
                "required username="debezium" password="NGFlM2I1NTJlNmFk";");
        props.put("security.protocol","SASL_PLAINTEXT");
        props.put("sasl.mechanism","PLAIN");
        //消费者组
        props.put("group.id", "opticsgroup1");
        //反序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //指定了消费者是否自动提交消费位移,默认为true。
        // 如果需要减少重复消费或者数据丢失,你可以设置为false。
        // 如果为true,需要关注自动提交的时间间隔,该间隔由auto.commit.interval.ms设置。
        props.put("enable.auto.commit",true);
        props.put("auto.commit.interval.ms","1000");

        props.put("max.poll.records",500);
        //可以根据消费速度的快慢来设置,因为如果两次poll的时间超出了30s的时间间隔,kafka会
        //认为消费能力过弱,将其踢出消费组,将分区分配给其他消费者
        props.put("max.poll.interval.ms",30*1000);

        //consumer给broker发送心跳的间隔时刻
        props.put("heartbeat.interval.ms",1000);
        //kafka如果超过10秒没有收到消费者宕心跳,则会把消费者提出消费者组,进行rebalance,把分区分配给其他消费者
        props.put("session.timeout.ms",10*1000);

        //新消费组从头消费
        props.put("auto.offset.reset","earliest");

        //创建消费者
        KafkaConsumer consumer = new KafkaConsumer(props);
        //订阅主题
        consumer.subscribe(Arrays.asList(TOPIC_NAME));


        while (true) {
            ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
            for(ConsumerRecord record : records){
                System.out.printf("topic = %s, partition = %s, offset = %d, key = %s, value =%sn",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
            //所有消息已经消费完
            if(records.count() > 0){  //有消息
                //手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑
                consumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map map, Exception e) {
                        if (e != null){
                            System.err.println("Commit failed for " + map);
                            System.err.println("Commit failed exception: " + e.getStackTrace());
                        }
                    }
                });
            }
        }
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5676149.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存