【Kafka】知识点(二)Kafka API的使用

【Kafka】知识点(二)Kafka API的使用,第1张

【Kafka】知识点(二)Kafka API的使用 一、生产者API:生产数据到Kafka
  • 路径

    • step1:构建ProducerRecord对象

    • step2:调用KafkaProducer的send方法将数据写入Kafka

  • 实施

    package bigdata.itcast.cn.kafka.producer;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    
    
    public class KafkaProducerTestClient {
        public static void main(String[] args) {
            // todo:1-构建连接
            // 构建一个配置对象
            Properties props = new Properties();
            //指定Kafka服务端地址
            props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
            
            props.put("acks", "all");
            //定义写入Kafka的KV序列化的类型
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // 构建一个连接对象,指定KV的类型,加载配置
            KafkaProducer producer = new KafkaProducer<>(props);
    
            // todo:2-实现 *** 作
            for (int i = 0; i < 10; i++){
                //调用连接对象方法将数据写入Kafka
                //ProducerRecord:每一条写入的数据必须封装为ProducerRecord对象
                //方式一:指定Topic、Key、Value
    //            producer.send(new ProducerRecord("bigdata01", i+"", "itcast"+i));
                //方式二:指定Topic、Value
    //            producer.send(new ProducerRecord("bigdata01",  "itcast"+i));
                //方式三:指定Topic、Partition、Key、Value
                producer.send(new ProducerRecord("bigdata01",0,i+"","itcast"+i));
            }
    
            // todo:3-释放连接
            producer.close();
        }
    }

二、消费者API:消费Topic数据
  • 路径

    • step1:消费者订阅Topic

    • step2:调用poll方法从Kafka中拉取数据,获取返回值

    • step3:从返回值中输出:Topic、Partition、Offset、Key、Value

  • 实施

  • package bigdata.itcast.cn.kafka.consumer;
    ​
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    ​
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    ​
    
    public class KafkaConsumerTestClient {
        public static void main(String[] args) {
            // todo:1-构建消费者连接
            Properties props = new Properties();
            // 指定服务端地址
            props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
            // 指定这个消费者属于哪个消费者组,每个消费者组都会有1个id
            props.setProperty("group.id", "test01");
            // 开启自动提交
            props.setProperty("enable.auto.commit", "true");
            // 自动提交的时间间隔
            props.setProperty("auto.commit.interval.ms", "1000");
            // 消费者读取数据KV的反序列化类型
            props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // 构建一个消费者连接对象,指定KV类型以及加载配置
            KafkaConsumer consumer = new KafkaConsumer<>(props);
    ​
            // todo:2-订阅、消费、处理
            //订阅Topic
            consumer.subscribe(Arrays.asList("bigdata01"));
            //消费&处理
            while (true) {
                // 消费:从Kafka拉取数据,拉取到的所有数据放在ConsumerRecords集合对象中
                ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
                // 处理:取出每一条数据
                for (ConsumerRecord record : records){
                    String topic = record.topic();//这条数据属于哪个topic
                    int part = record.partition();//这条数据属于这个Topic的哪个分区
                    long offset = record.offset();//这条数据在这个分区中的offset是什么
                    String key = record.key();//获取数据中的Key
                    String value = record.value();//获取数据中的Value
                    //输出
                    System.out.println(topic+"t"+part+"t"+offset+"t"+key+"t"+value);
                }
    ​
            }
    ​
            // todo:3-实时计算中:消费者是不停的
        }
    }

三、消费者消费过程及问题
  • 路径

    • step1:消费者是如何消费Topic中的数据的?

    • step2:如果消费者故障重启,消费者怎么知道自己上次消费的位置的?

  • 实施

    • Kafka中消费者消费数据的规则

      • 消费者消费Kafka中的Topic根据每个分区的Offset进行消费,每次从上一次的位置继续消费

      • 第一次消费规则:由属性决定

        auto.offset.reset = latest | earliest | none
        latest:默认的值,从Topic每个分区的最新的位置开始消费
        earliest:从最早的位置开始消费,每个分区的offset为0开始消费
        none:如果是第一次消费,这个属性为none,Kafka会抛出异常
        如果不是第一次消费:上面这个属性不起作用
      • 第二次消费开始:根据上一次消费的Offset位置+1继续进行消费

        • 假如上一次消费到的offset=3,那么第二次就该消费offset+1的数据了。

      • 问题1:消费者如何知道上一次消费的位置是什么?

        • 消费者每次成功消费会在自己的内存中记录offset的值,下次直接请求上一次消费的位置

        • Consumer Offset:表示当前消费者组已经消费到的位置

        • Commit Offset:表示下一次要消费的位置,等于Consumer Offset+1

      • 问题2

        • 只有1个消费:如果因为一些原因,消费者故障了,重启消费者,相当于一个新的消费者,原来内存中offset就没有了,消费者怎么知道上一次消费的位置?

        • 如果多个消费者:因为一些原因,某个消费者故障,这个消费者本来负责的分区,Kafka会交给别的消费者来继续消费,这个接盘侠它怎么知道之前的那个消费者消费到的位置?

        • 如果不知道上一次的位置,就无法接着上次的位置继续消费

          • 要么重头消费:数据重复

          • 要么最新消费:数据丢失

        • 原因:offset只放在内存中,进程故障,内存的数据丢失,Offset也就丢失了

        • 解决:将offset持久化存储并且共享

    • Kafka Offset偏移量管理

      • Kafka将每个消费者消费的Commit Offset主动记录在一个Topic中:__consumer_offsets

      • 如果下次消费者没有给定请求offset,kafka就根据自己记录的offset来提供消费的位置

    • 提交的规则:根据时间自动提交

      props.setProperty("enable.auto.commit", "true");//是否自动提交offset
      props.setProperty("auto.commit.interval.ms", "1000");//提交的间隔时间

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5653226.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)