Chapter 7 Kafka Series: Integrating Kafka with Spring Boot, Consumer Hands-On Examples

1、Implementing a KafkaConsumer in Java
package net.testclass.testclasskafka;

import org.apache.kafka.clients.consumer.*;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

public class MyConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.6.102:9092");
        // Consumer group name
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");

        // Whether to auto-commit offsets; the default is true
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // Interval between automatic offset commits
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

        // Maximum number of records returned by a single poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");

        // Interval at which the consumer sends heartbeats to the broker
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000");
        // If Kafka receives no heartbeat for more than 10s, it kicks the consumer out of
        // the group and rebalances, assigning its partitions to other consumers
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, String.valueOf(10 * 1000));

        // If two polls are more than 30s apart, the consumer is considered too slow and removed
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, String.valueOf(30 * 1000));
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        Collection<String> topics = Arrays.asList("mytest");
        // Subscribe the consumer to the topic
        consumer.subscribe(topics);
        while (true) {
            // Pull records from the topic
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            // Iterate over every record
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                long offset = consumerRecord.offset();
                int partition = consumerRecord.partition();
                String key = consumerRecord.key();
                String value = consumerRecord.value();
                System.out.println(offset + " " + partition + " " + key + " " + value);
            }
        }
    }
}
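To exercise this consumer you need messages on the mytest topic; one quick option is the console producer that ships with Kafka, for example `bin/kafka-console-producer.sh --bootstrap-server 192.168.6.102:9092 --topic mytest` (the broker address is the one assumed throughout this chapter; older Kafka releases use --broker-list instead). Each line typed becomes one record, and the consumer prints its offset, partition, key, and value.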
2、How the consumer pulls messages from the broker

(1)Consumer consumption mechanism

  • How the consumer fetches data from the broker
    • It uses a pull model, fetching data from the broker's partitions
  • Which partition the consumer reads from
    • Each partition (its leader) is consumed by at most one consumer within a consumer group; the sketch after this list shows how to watch the assignment happen
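To see this assignment in action, a consumer can register a ConsumerRebalanceListener when subscribing. The following is a minimal sketch, assuming the broker at 192.168.6.102:9092 and the mytest topic from the code above (the class name AssignmentWatcher is hypothetical); it only prints which partitions the group coordinator hands this consumer. Starting a second instance with the same group.id will show partitions being revoked and reassigned.

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

public class AssignmentWatcher {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.6.102:9092"); // assumed broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // The listener fires on every rebalance, i.e. whenever consumers join or leave the group
        consumer.subscribe(Arrays.asList("mytest"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("Revoked: " + partitions);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("Assigned: " + partitions);
            }
        });
        while (true) {
            consumer.poll(Duration.ofMillis(1000)); // polling drives the rebalance protocol
        }
    }
}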

(2)Partition assignment strategies within a consumer group

①Round-robin assignment: performed per consumer group; all partitions of all topics the group subscribes to and all of its consumers are listed, and the partitions are dealt out in turn.

  • Drawback: if the consumers within one group subscribe to different topics, the assignment is no longer a pure round-robin and the partitions can end up distributed unevenly
#Worked example
Consumer group: three consumers C1, C2, C3, jointly subscribing to three topics t0, t1, t2
Topic partitions: t0 has one partition (p0), t1 has two (p0, p1), t2 has three (p0, p1, p2)

#Round-robin with identical subscriptions
Subscriptions: all three consumers subscribe to t0, t1, t2
Round-robin result: C1 gets t0(p0), t2(p0) / C2 gets t1(p0), t2(p1) / C3 gets t1(p1), t2(p2)

#Round-robin with different subscriptions
Subscriptions: C1 subscribes to t0; C2 subscribes to t0 and t1; C3 subscribes to t0, t1, and t2
Round-robin result: C1 gets t0(p0) / C2 gets t1(p0) / C3 gets t1(p1), t2(p0), t2(p1), t2(p2)

②Range assignment: performed per topic; for each topic the partitions are split into contiguous ranges, and if they do not divide evenly the first n consumers each get one extra partition. For example, with C1 and C2 both subscribed to t2 (p0, p1, p2): 3 partitions over 2 consumers leaves a remainder of 1, so C1 gets t2(p0) and t2(p1) while C2 gets t2(p2). The sketch below shows how a strategy is selected.
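The strategy is chosen on the consumer through the partition.assignment.strategy setting. A minimal sketch, assuming a Properties object like the ones used throughout this chapter; RangeAssignor is the classic default, and RoundRobinAssignor gives the round-robin behavior described in ①:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;

import java.util.Properties;

public class AssignorConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Round-robin: deal all subscribed partitions out across the group in turn
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                RoundRobinAssignor.class.getName());
        // Range (the classic default): split each topic's partitions into contiguous
        // ranges; the first consumers get the extras when the count is uneven
        // props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        //         RangeAssignor.class.getName());
        System.out.println(props);
    }
}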

(3)Parameters affecting consumption speed

MAX_POLL_RECORDS_CONFIG,"500"
consumer.poll(Duration.ofMillis(1000));
  • If a single poll reaches 500 records, poll returns immediately and the business logic runs;
  • If a poll has fetched fewer than 500 records and the 1 second has not yet elapsed, the long poll keeps fetching until it either reaches 500 records or hits the 1 s timeout;
  • If repeated fetching still has not reached 500 records when the 1 s is up, poll returns whatever it has and the business logic runs. A timing sketch follows this list.
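A quick way to observe this behavior is to time each poll and count what it returns. A minimal sketch under the chapter's assumed broker address and topic (the group id pollTimingGroup is hypothetical):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class PollTiming {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.6.102:9092"); // assumed address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "pollTimingGroup"); // hypothetical group id
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("mytest")); // assumed topic
            for (int i = 0; i < 10; i++) {
                long start = System.currentTimeMillis();
                // Returns early with up to 500 records, or after ~1s with whatever arrived
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                System.out.printf("poll #%d: %d records in %d ms%n",
                        i, records.count(), System.currentTimeMillis() - start);
            }
        }
    }
}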

(4)Heartbeat settings

HEARTBEAT_INTERVAL_MS_CONFIG,"1000"
SESSION_TIMEOUT_MS_CONFIG,"10000"
  • The consumer sends a heartbeat every 1 s;
  • If the cluster receives no heartbeat from a consumer for more than 10 s, it kicks that consumer out of the group and rebalances, assigning its partitions to other consumers. A sketch of the two settings together follows.
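Note that these values must be numeric: Kafka parses them as integers, so a literal string like "10 * 1000" would be rejected at startup. A minimal sketch of the two settings together (Kafka's documentation recommends keeping the heartbeat interval no higher than about one third of the session timeout):

import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class HeartbeatConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Heartbeat every second; plain int values are accepted as well as numeric strings
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
        // Kicked from the group after 10s without a heartbeat
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
        System.out.println(props);
    }
}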

(5)Hands-on code

  • Point the yml logging config at a logback file
server:
  port: 8080
logging:
  config: classpath:logback.xml
  • logback.xml configuration

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}[%thread] %-5level %logger{50} - %msg%n</pattern>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
    </root>
</configuration>
  • Write the consumer script
package net.testclass.testclasskafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class MyConsumer {
    public static final String TOPIC_NAME = "";

    public static Properties getProperties() {
        Properties props = new Properties();
        // Broker address
        props.put("bootstrap.servers", "192.168.6.102:9092");
        // Within a group, each message is consumed only once
        props.put("group.id", "testgroup1");
        // Auto-commit offsets
        props.put("enable.auto.commit", "true");
        // Auto-commit interval
        props.put("auto.commit.interval.ms", "1000");
        // Deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    @Test
    public void simpleConsumerTest() {
        Properties props = getProperties();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME));
        while (true) {
            // Poll with a blocking timeout
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                System.err.printf("topic = %s, offset = %d, key = %s, value = %s%n",
                        record.topic(), record.offset(), record.key(), record.value());
            }
        }
    }
}

(6)Running the scripts

Run the producer script first, then the consumer script
  • Producer output (screenshot omitted)
  • Consumer output (screenshot omitted)

3、Auto commit & manual commit
  • Auto commit: the offset is committed as soon as the messages are polled;
  • Manual commit: the offset is committed only after the messages have been processed;
  • Note: real business scenarios almost always commit offsets manually (a per-partition variant is sketched after this list)
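For finer control than the whole-batch commits shown in 3.2 below, offsets can also be committed per partition with an explicit offset map. The following is a minimal sketch for illustration (committing after every single record is slow in practice), assuming the chapter's broker address and topic and a hypothetical group id; note the +1, since the committed offset is the position of the next message to read:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

public class PerPartitionCommit {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.6.102:9092"); // assumed address
        props.put("group.id", "commitDemoGroup"); // hypothetical group id
        props.put("enable.auto.commit", "false"); // manual commit
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("mytest")); // assumed topic
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                // ... business logic for this record ...
                // Commit this record's offset + 1: the next position to read
                consumer.commitSync(Collections.singletonMap(
                        new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1)));
            }
        }
    }
}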
3.1、Auto-committing offsets

(1)Parameter description

ENABLE_AUTO_COMMIT_CONFIG,"true"
AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000"
  • Auto-commit switch: true turns auto commit on, false turns it off
  • Auto-commit interval: after polling, the consumer periodically commits the consumed offset of each topic-partition to the broker's __consumer_offsets topic;
  • Caveat: the default auto commit can lose messages. If the offset is auto-committed before the consumer finishes processing the polled messages and the consumer then crashes, the next consumer starts from the position after the committed offset, and the messages that were never processed are lost

(2)Write the script

package net.testclass.testclasskafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class MyConsumer {
    public static final String TOPIC_NAME = "";

    public static Properties getProperties() {
        Properties props = new Properties();
        // Broker address
        props.put("bootstrap.servers", "192.168.6.102:9092");
        // Within a group, each message is consumed only once
        props.put("group.id", "testgroup1");
        // Auto-commit offsets
        props.put("enable.auto.commit", "true");
        // Auto-commit interval
        props.put("auto.commit.interval.ms", "1000");
        // Deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    @Test
    public void simpleConsumerTest() {
        Properties props = getProperties();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME));
        while (true) {
            // Poll with a blocking timeout
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                System.err.printf("topic = %s, offset = %d, key = %s, value = %s%n",
                        record.topic(), record.offset(), record.key(), record.value());
            }
        }
    }
}

(3)Run results

Run the producer script first, then the consumer script
  • Producer output (screenshot omitted)
  • Consumer output (screenshot omitted)

3.2、Manually committing offsets

(1)Parameter description

ENABLE_AUTO_COMMIT_CONFIG,"false"
  • Manual synchronous commit: the thread blocks until the offset commit succeeds
  • Manual asynchronous commit: the thread does not block; failures are reported through a callback. This is the more common choice in production (a combined async-then-sync pattern is sketched below)
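A common production pattern combines the two: commit asynchronously after every batch for throughput, then commit synchronously once on shutdown so the final position is not lost. A minimal sketch under the chapter's assumed connection settings (the class name and group id are hypothetical):

import org.apache.kafka.clients.consumer.*;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class AsyncThenSyncCommit {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.6.102:9092"); // assumed address
        props.put("group.id", "asyncSyncGroup"); // hypothetical group id
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("mytest")); // assumed topic
        try {
            for (int i = 0; i < 100; i++) { // bounded loop so the example terminates
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // ... business logic ...
                }
                // Fast, non-blocking commit on the hot path
                consumer.commitAsync();
            }
        } finally {
            try {
                // One blocking commit on the way out, so the last processed position sticks
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }
}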

(2)Manual synchronous commit script

package net.testclass.testclasskafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class MyConsumer {
    public static final String TOPIC_NAME = "";

    public static Properties getProperties() {
        Properties props = new Properties();
        // Broker addresses
        props.put("bootstrap.servers", "192.168.6.102:9092,192.168.6.103:9092,192.168.6.104:9092");
        // Within a group, each message is consumed only once
        props.put("group.id", "testgroup10");
        // Manual offset commit
        props.put("enable.auto.commit", "false");
        // Auto-commit interval (unused with manual commits)
        //props.put("auto.commit.interval.ms", "1000");
        // Start from the beginning for a new group
        props.put("auto.offset.reset", "earliest");
        // Deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    @Test
    public void simpleConsumerTest() {
        Properties props = getProperties();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME));
        while (true) {
            // Poll with a blocking timeout
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                System.err.printf("topic = %s, offset = %d, key = %s, value = %s%n",
                        record.topic(), record.offset(), record.key(), record.value());
            }
            // Manual synchronous commit: blocks until the commit succeeds
            consumer.commitSync();
        }
    }
}

(3)Run results

Run the consumer first, then the producer
  • Producer output (screenshot omitted)
  • Consumer output (screenshot omitted)

(4)Manual asynchronous commit script

package net.testclass.testclasskafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

public class MyConsumer {
    public static final String TOPIC_NAME = "";

    public static Properties getProperties() {
        Properties props = new Properties();
        // Broker addresses
        props.put("bootstrap.servers", "192.168.6.102:9092,192.168.6.103:9092,192.168.6.104:9092");
        // Within a group, each message is consumed only once
        props.put("group.id", "testgroup10");
        // Manual offset commit
        props.put("enable.auto.commit", "false");
        // Auto-commit interval (unused with manual commits)
        //props.put("auto.commit.interval.ms", "1000");
        // Start from the beginning for a new group
        props.put("auto.offset.reset", "earliest");
        // Deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    @Test
    public void simpleConsumerTest() {
        Properties props = getProperties();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME));
        while (true) {
            // Poll with a blocking timeout
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                System.err.printf("topic = %s, offset = %d, key = %s, value = %s%n",
                        record.topic(), record.offset(), record.key(), record.value());
            }
            // Manual synchronous commit
            //consumer.commitSync();
            // Manual asynchronous commit
            if (!records.isEmpty()) {
                consumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        if (exception == null) {
                            System.err.println("Async offset commit succeeded: " + offsets);
                        } else {
                            System.err.println("Async offset commit failed: " + offsets);
                        }
                    }
                });
            }
        }
    }
}
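One design note on commitAsync: unlike commitSync, it does not retry a failed commit, because by the time a retry ran, a later commit might already have succeeded, and re-committing an older offset could rewind the group's position. That is why the callback above only logs the failure, and why the synchronous variant (or the async-then-sync shutdown pattern sketched earlier) is preferred wherever the final committed position matters.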

(5)Run results (screenshot omitted)

4、Consuming from the beginning

(1)Parameter description

MAX_POLL_INTERVAL_MS_CONFIG,"30000"
AUTO_OFFSET_RESET_CONFIG,"earliest"
  • This bounds how slowly a consumer may consume: if the gap between two polls exceeds 30 s, Kafka considers the consumer too weak, removes it from the group, and rebalances its partitions to other consumers
  • Where a new consumer group starts consuming
    • ①latest: consume only the messages sent to the topic after the consumer starts
    • ②earliest: consume from the beginning the first time, then continue from the recorded offset; this differs from consumer.seekToBeginning(...), which rewinds to the beginning on every run (see the sketch after this list)
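The difference is easy to demonstrate with seekToBeginning, which rewinds explicitly regardless of any committed offset. A minimal sketch, assuming the chapter's broker address and topic (the class name and group id are hypothetical); the initial poll is there so the group coordinator assigns partitions before seeking:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class SeekToBeginningDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.6.102:9092"); // assumed address
        props.put("group.id", "seekDemoGroup"); // hypothetical group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("mytest")); // assumed topic
        // The first poll joins the group and receives a partition assignment
        consumer.poll(Duration.ofMillis(1000));
        // Rewind every assigned partition to the start, ignoring committed offsets;
        // unlike auto.offset.reset=earliest, this happens on every run
        consumer.seekToBeginning(consumer.assignment());
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + " " + record.value());
            }
        }
    }
}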

(2)Write the script (note: auto.offset.reset=earliest only applies when the group has no committed offset, so the group name and the config must be changed together for it to take effect)

package net.testclass.testclasskafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class MyConsumer {
    public static final String TOPIC_NAME = "";

    public static Properties getProperties() {
        Properties props = new Properties();
        // Broker addresses
        props.put("bootstrap.servers", "192.168.6.102:9092,192.168.6.103:9092,192.168.6.104:9092");
        // Within a group, each message is consumed only once
        props.put("group.id", "testgroup10");
        // Auto-commit offsets
        props.put("enable.auto.commit", "true");
        // Auto-commit interval
        props.put("auto.commit.interval.ms", "1000");
        // Start from the beginning for a new group
        props.put("auto.offset.reset", "earliest");
        // Deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    @Test
    public void simpleConsumerTest() {
        Properties props = getProperties();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME));
        while (true) {
            // Poll with a blocking timeout
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                System.err.printf("topic = %s, offset = %d, key = %s, value = %s%n",
                        record.topic(), record.offset(), record.key(), record.value());
            }
        }
    }
}

(3)Run results (screenshot omitted)

Source: http://outofmemory.cn/zaji/5688837.html