kafka学习笔记

kafka学习笔记,第1张

kafka学习笔记

1.kafka基础:
1.消息:字节数据
2.键(key):可选项
3.批次:提供效率,权衡(时间延迟与吞吐量)
4.主题:(数据库中的表),分表-分区
5.分区:(分区器判断去哪个分区)相当于分类,为何分区:提高性能
6.生产者:发消息,消费者:取消息
7.偏移量:消费者取消息到哪个id
8.消费群组:一个分区被一个消费者消费,有某个消费者挂掉,则该分区由其他一个消费者消费
9.broker:独立kafka主机,一个服务器叫broker,集群:用来复制
10.优点:多生产者多消费者,基于磁盘的数据存储,高伸缩性,高性能
11.常见场景:活动跟踪,传递消息,收集指标和日志,提交日志,流处理。


 





2.创建kafka消费者

2.1.初始化

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");//指定服务
props.put("group.id", "CountryCounter");//消费者的消费组
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//反序列化,将字节数组转换成对象
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer = new KafkaConsumer(props);//实例化消费者对象

2.2.订阅主题

//创建完消费者后订阅主题,只需要通过调用subscribe()方法即可,该方法接收一个主题列表
consumer.subscribe(Collections.singletonList("customerCountries"));//订阅主题

//订阅多个主题,通过正则表达式来匹配多个主题,且订阅后如果又有匹配的新主题,那么这个消费组会立即对其进行消费
consumer.subscribe("test.*");

 关于kafka的详细配置参考官方文档:Apache Kafka

 2.3.拉取循环

(25条消息) 深入解析Kafka消费者——提交和偏移量_bobozai86的博客-CSDN博客

//只需要循环不断的拉取消息即可,kafka对外poll方法,(内部实现了协作,分区重平衡、心跳、数据拉取等功能)


try {
   while (true) {  //步骤1,不断拉取
       ConsumerRecords records = consumer.poll(100);  //步骤2在100ms内拉取的数据返回消费者端
       for (ConsumerRecord record : records)  //步骤3遍历拉取的数据记录进行业务处理
       {
           log.debug("topic = %s, partition = %s, offset = %d,
              customer = %s, country = %sn",
              record.topic(), record.partition(), record.offset(),
              record.key(), record.value());
           int updatedCount = 1;
           if (custCountryMap.countainsValue(record.value())) {
               updatedCount = custCountryMap.get(record.value()) + 1;
           }
           custCountryMap.put(record.value(), updatedCount)
           JSonObject json = new JSonObject(custCountryMap);
           System.out.println(json.toString(4))
       }
   }
} finally {
      consumer.close(); //步骤4停止提交offset//原因是卡在了consumer.close()方法里面,它会提交offset信息,如果网络中断或者kafka服务器有问题导致提交不了offset,则consumer.close方法会一直卡住(不停的循环尝试提交offset,永不中断)
}




//其中,代码中标注了几点,说明如下:

//1)这个例子使用无限循环消费并处理数据,这也是使用Kafka最多的一个场景,后面我们会讨论如何更好的退出循环并关闭。
//2)这是上面代码中最核心的一行代码。我们不断调用poll拉取数据,如果停止拉取,那么Kafka会认为此消费者已经死亡并进行重平衡。参数值是一个超时时间,指明线程如果没有数据时等待多长时间,0表示不等待立即返回。
//3)poll()方法返回记录的列表,每条记录包含key/value以及主题、分区、位移信息。
//4)主动关闭可以使得Kafka立即进行重平衡而不需要等待会话过期。

2.4 提交(commit)与位移(offset)

当我们调用poll()时,该方法会返回我们没有消费的消息。当消息从broker返回消费者时,broker并不跟踪这些消息是否被消费者接收到,kafka让消费者自身来管理消费位移,并向消费者提供更新位移的接口,这种更新位移方式称为提交(commit).

提交当前位移

为了减少消息重复消费活着避免消息丢失,选择主动提交位移,设置auto.commit.offset为false,那么需要通过调用commitSync()来主动提交位移,会提交poll返回的最后位移。

为了避免消息丢失,在完成业务逻辑后才提交位移。

//自动提交
while (true) {
    ConsumerRecords records = consumer.poll(100);
    for (ConsumerRecord record : records)
    {
        System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %sn", record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
    
    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        log.error("commit failed", e)
    }
}

手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。

异步提交

//异步提交

while (true) {
    ConsumerRecords records = consumer.poll(100);
    for (ConsumerRecord record : records)
    {
        System.out.printf("topic = %s, partition = %s,
        offset = %d, customer = %s, country = %sn",
        record.topic(), record.partition(), record.offset(),
        record.key(), record.value());
    }
    
    consumer.commitAsync();
}

 异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。

//基于这种性质,一般情况下对于异步提交,我们可能会通过回调的方式记录提交结果:
while (true) {
    ConsumerRecords records = consumer.poll(100);
    for (ConsumerRecord record : records) {
        System.out.printf("topic = %s, partition = %s,
        offset = %d, customer = %s, country = %sn",
        record.topic(), record.partition(), record.offset(),
        record.key(), record.value());
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        public void onComplete(Map offsets, Exception exception) {
            if (e != null)
                log.error("Commit failed for offsets {}", offsets, e);
        } 
    });
}


//而如果想进行重试同时又保证提交顺序的话,一种简单的办法是使用单调递增的序号。每次发起异步提交时增加此序号,并且将此时的序号作为参数传给回调方法;当消息提交失败回调时,检查参数中的序号值与全局的序号值,如果相等那么可以进行重试提交,否则放弃(因为已经有更新的位移提交了)。
混合同步提交与异步提交
//正常情况下,偶然的提交失败并不是什么大问题,因为后续的提交成功就可以了。但是在某些情况下(例如程序退出、重平衡),我们希望最后的提交成功,因此一种非常普遍的方式是混合异步提交和同步提交,如下所示:


try {
    while (true) {
       ConsumerRecords records = consumer.poll(100);
       for (ConsumerRecord record : records) {
           System.out.printf("topic = %s, partition = %s, offset = %d,
           customer = %s, country = %sn",
           record.topic(), record.partition(),
           record.offset(), record.key(), record.value());
       }
       
       consumer.commitAsync();
    }
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}



//在正常处理流程中,我们使用异步提交来提高性能,但最后使用同步提交来保证位移提交成功。
提交特定位移
//commitSync()和commitAsync()会提交上一次poll()的最大位移,但如果poll()返回了批量消息,而且消息数量非常多,我们可能会希望在处理这些批量消息过程中提交位移,以免重平衡导致从头开始消费和处理。幸运的是,commitSync()和commitAsync()允许我们指定特定的位移参数,参数为一个分区与位移的map。由于一个消费者可能会消费多个分区,所以这种方式会增加一定的代码复杂度,如下所示:

private Map currentOffsets = new HashMap<>();
int count = 0;
 
....
 
while (true) {
    ConsumerRecords records = consumer.poll(100);
    for (ConsumerRecord record : records)
    {
        System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %sn", record.topic(), record.partition(), record.offset(), record.key(), record.value());
 
        currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndmetadata(record.offset()+1, "no metadata"));
        if (count % 1000 == 0)
            consumer.commitAsync(currentOffsets, null);
        count++;
} }


//代码中在处理poll()消息的过程中,不断保存分区与位移的关系,每处理1000条消息就会异步提交(也可以使用同步提交)。

优雅退出

//在一般情况下,我们会在一个主线程中循环poll消息并进行处理。当需要退出poll循环时,我们可以使用另一个线程调用consumer.wakeup(),调用此方法会使得poll()抛出WakeupException。如果调用wakup时,主线程正在处理消息,那么在下一次主线程调用poll时会抛出异常。主线程在抛出WakeUpException后,需要调用consumer.close(),此方法会提交位移,同时发送一个退出消费组的消息到Kafka的组协调者。组协调者收到消息后会立即进行重平衡(而无需等待此消费者会话过期)。


//注册JVM关闭时的回调钩子,当JVM关闭时调用此钩子。
Runtime.getRuntime().addShutdownHook(new Thread() {
          public void run() {
              System.out.println("Starting exit...");
              //调用消费者的wakeup方法通知主线程退出
              consumer.wakeup();
              try {
                  //等待主线程退出
                  mainThread.join();
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          } 
});
 
...
 
try {
    // looping until ctrl-c, the shutdown hook will cleanup on exit
    while (true) {
        ConsumerRecords records = consumer.poll(1000);
        System.out.println(System.currentTimeMillis() + "--  waiting for data...");
        for (ConsumerRecord record : records) {
            System.out.printf("offset = %d, key = %s, value = %sn",record.offset(), record.key(), record.value());
        }
        for (TopicPartition tp: consumer.assignment())
            System.out.println("Committing offset at position:" + consumer.position(tp));
        consumer.commitSync();
    }
} catch (WakeupException e) {
    // ignore for shutdown
} finally {
    consumer.close();
    System.out.println("Closed consumer and we are done");
}




3.消费者创建实例:

1.一个消费组G1里只有一个消费者

public class MyConsumer {
    public static void main(String[] args) {
        Properties prop=new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.195.20:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000");
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
        //latest收最新的数据 none会报错 earliest最早的数据
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG,"G1");
       //创建消费者
        KafkaConsumer consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("kb09two"));//订阅
       
        while (true){
            ConsumerRecords poll = consumer.poll(100);
           for (ConsumerRecord record : poll) {
                System.out.println(record.offset()+"t"+record.key()+"t"+record.value());
                System.out.println(" ");
            }
        }
}}

2.一个消费组G2里有多个消费者

public class MyConsumer {
    public static void main(String[] args) {
        Properties prop=new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.195.20:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000");
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
        //latest收最新的数据 none会报错 earliest最早的数据
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG,"G2");//这里改为G2
        for (int i = 0; i <3 ; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    KafkaConsumer consumer = new KafkaConsumer<>(prop);
                    consumer.subscribe(Collections.singleton("kb09two"));//订阅
                    while (true){
            ConsumerRecords poll = consumer.poll(100);
            for (ConsumerRecord record : poll) {
                System.out.println(Thread.currentThread().getName()+"t"
                        +record.offset()+"t"+record.key()+"t"+record.value());
            }
        }
                }
            }).start();
        }

    }
}





4.生产者实例:

调用kafka生产者Api,需要导入kafka-clients依赖

  
      org.apache.kafka
      kafka-clients
      2.0.0
    

创建生产者

public class MyProducer {
    public static void main(String[] args) {
        Properties prop=new Properties();
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.195.20:9092");
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        prop.put(ProducerConfig.ACKS_CONFIG,"-1");
        //创建生产者
        KafkaProducer producer=new KafkaProducer(prop);

        for (int i = 0; i <200 ; i++) {
            ProducerRecord producerRecord =
                    new ProducerRecord<>("kb09two", "hello world" + i);
        producer.send(producerRecord);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("gg");
        }
    }

测试:

在moba上启动zookeeper,Kafka;

1)先创建一个topic

kafka-topics.sh --create --zookeeper 192.168.195.20:2181 --topic kb09two 
--partitions 3 --replication-factor 1


(2)查看topic

kafka-topics.sh --zookeeper 192.168.195.20:2181 --topic kb09two --describe


(3)启动消费者

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic kb09two --from-beginning

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5610902.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存