package net.testclass.testclasskafka; import org.apache.kafka.clients.consumer.*; import java.time.Duration; import java.util.Arrays; import java.util.Collection; import java.util.Properties; public class MyConsumer { public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.6.102"); //消费者组名 props.put(ConsumerConfig.GROUP_ID_CONFIG,"testGroup"); //是否自动提交offset,默认是true props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"ture"); //自动提交offset间隔时间 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000"); //一次poll拉取消息的最大条数 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"500"); //consumer给broker发送消息时间 props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,"1000"); //kafka如果超过10s没有收到消息的心跳,则会把消费者提出消费者组,进行rebalance,将分区分配给其他消费者 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"10 * 1000"); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,"30 * 1000"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); Consumer2、消费者拉取broker消息流程consu = new KafkaConsumer (props); Collection topics = Arrays.asList("mytest"); //消费者订阅topic consu.subscribe(topics); ConsumerRecords consumerRecords = null; while(true){ //接下来就要从topic中拉去数据 consumerRecords = consu.poll(Duration.ofMillis(1000)); //遍历每一条记录 for(ConsumerRecord consumerRecord : consumerRecords){ long offset = consumerRecord.offset(); int partition = consumerRecord.partition(); Object key = consumerRecord.key(); Object value = consumerRecord.value(); System.out.println(offset+" "+partition+" "+key+" "+value); } } } }
(1)消费者消费机制
- 消费者从broker获取数据机制
- 采用pull(拉取)方式,由消费者主动从broker的partition获取数据
- 消费者从哪个分区获取数据
- 同一个消费者组内,一个partition只能由组内的一个消费者进行消费(消费者从该partition的leader副本拉取数据);不同消费者组之间互不影响
(2)消费者消费分区的分配策略
①轮询分配(RoundRobin):按照消费者组进行轮询分配,将同一个消费者组订阅的所有主题的所有partition和组内所有的consumer都列出来并排序,然后依次轮询分配。
- 弊端:如果同一个消费者组内各消费者订阅的主题不相同,分配时会跳过没有订阅该主题的消费者,不再是严格的轮询分配,可能会导致分区分配不均匀
#实际案例
消费者组:有三个消费者C1,C2,C3,涉及3个主题t0,t1,t2
topic分区情况:t0一个分区(p0),t1两个分区(p0,p1),t2三个分区(p0,p1,p2)
#共同订阅的轮询分配
消费者具体订阅主题情况:3个消费者都订阅t0,t1,t2
轮询分配:C1拿到t0(p0)、t2(p0)/C2拿到t1(p0)、t2(p1)/C3拿到t1(p1)、t2(p2)
#不同订阅的轮询分配
消费者具体订阅主题情况:C1订阅t0,C2订阅了t0,t1,C3订阅了t0,t1,t2
轮询分配方案:C1拿到t0(p0)/C2拿到t1(p0)/C3拿到t1(p1)、t2(p0)、t2(p1)、t2(p2)
②范围分配(Range):按照主题进行分配,如果某个Topic的分区数不能被消费者数整除,则对该Topic而言,排在前面的(分区数 % 消费者数)个消费者会各多分配一个分区。
(3)消费速度相关参数
MAX_POLL_RECORDS_CONFIG,"500" consu.poll(Duration.ofMillis(1000));
- 如果一次poll消息到500条,就直接执行业务逻辑;
- 如果一次poll拉到的消息不足500条:只要已经拉取到消息,poll就会立即返回这些消息,然后执行业务逻辑;
- 如果暂时没有任何可拉取的消息,poll会阻塞等待(长轮询),直到有消息到达或1s超时后返回(超时则返回空结果),再执行业务逻辑。
(4)心跳设置
HEARTBEAT_INTERVAL_MS_CONFIG,"1000" SESSION_TIMEOUT_MS_CONFIG,"10000"
- 消费者发送心跳时间间隔为1s;
- 如果集群超过10s没有收到消费者的心跳,则会把该消费者踢出消费者组,进行rebalance,把分区分配给其他消费者。
(5)代码实战
- yml配置日志修改
server:
  port: 8080
logging:
  config: classpath:logback.xml
- xml文件配置
%d{yyyy-MM-dd HH:mm:ss.SSS}[%thread] %-5level %logger{50} - %msg%n
- 编写消费者脚本
package net.testclass.testclasskafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.junit.jupiter.api.Test; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import java.time.Duration; import java.util.Arrays; import java.util.Properties; public class MyConsumer { public static final String TOPIC_NAME = ""; public static Properties getProperties() { Properties props = new Properties(); // brokerID props.put("bootstrap.servers","192.168.6.102:9092,"); //groupID分组内的消费者只能消费该消息一次 props.put("group.id", "testgroup1"); //自动提交offset props.put("enable.auto.commit", "true"); //自动提交时间间隔 props.put("auto.commit.interval.ms","1000"); //反序列化 props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); return props; } @Test public void simpleConsumerTest(){ Properties props = getProperties(); KafkaConsumerconsumer = new KafkaConsumer (props); consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME)); while (true) { //拉取时间控制,阻塞超时时间 ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); for (ConsumerRecord record : records) { System.err.printf("topic = %s,offset = %d, key = %s, value = %s%n",record.topic(),record.offset(), record.key(), record.value()); } } } }
(6)脚本运行
先运行producer脚本,后运行consumer脚本
- 查看生产者运行结果
- 查看消费者运行结果
- 自动提交:消息poll下来之后,按设定的时间间隔自动提交offset,不关心消息是否已经处理完;
- 手动提交:在消息消费后再提交offset;
- 注意:实际业务场景都会手动提交offset
(1)参数说明
ENABLE_AUTO_COMMIT_CONFIG,"true" AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000"
- offset自动提交开关,true代表自动提交,false代表关闭自动提交(需要手动提交)
- offset自动提交的时间间隔:消费者poll消息后,会按该间隔自动向broker的__consumer_offsets主题提交当前主题-分区的消费偏移量;
- 注意事项:自动提交可能会丢消息。如果消费者还没处理完poll下来的消息就自动提交了偏移量,而此时消费者挂了,那么接管的消费者会从已提交的offset处继续消费,之前未处理完的消息就丢失了
(2)编写脚本
package net.testclass.testclasskafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.junit.jupiter.api.Test; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import java.time.Duration; import java.util.Arrays; import java.util.Properties; public class MyConsumer { public static final String TOPIC_NAME = ""; public static Properties getProperties() { Properties props = new Properties(); // brokerID props.put("bootstrap.servers","192.168.6.102:9092,"); //groupID分组内的消费者只能消费该消息一次 props.put("group.id", "testgroup1"); //自动提交offset props.put("enable.auto.commit", "true"); //自动提交时间间隔 props.put("auto.commit.interval.ms","1000"); //反序列化 props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); return props; } @Test public void simpleConsumerTest(){ Properties props = getProperties(); KafkaConsumerconsumer = new KafkaConsumer (props); consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME)); while (true) { //拉取时间控制,阻塞超时时间 ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); for (ConsumerRecord record : records) { System.err.printf("topic = %s,offset = %d, key = %s, value = %s%n",record.topic(),record.offset(), record.key(), record.value()); } } } }
(3)运行结果
先运行producer脚本,后运行consumer脚本
- 查看生产者运行结果
- 查看消费者运行结果
(1)参数说明
ENABLE_AUTO_COMMIT_CONFIG,"false"
- 手动同步提交:线程阻塞,直到offset提交成功(或遇到不可恢复的错误抛出异常)
- 手动异步提交:线程不会阻塞,通过回调函数记录失败信息,生产环境用的更多
(2)手动同步提交脚本
package net.testclass.testclasskafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.junit.jupiter.api.Test; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import java.time.Duration; import java.util.Arrays; import java.util.Properties; public class MyConsumer { public static final String TOPIC_NAME = ""; public static Properties getProperties() { Properties props = new Properties(); // brokerID props.put("bootstrap.servers","192.168.6.102:9092,192.168.6.103:9092,192.168.6.104:9092"); //groupID分组内的消费者只能消费该消息一次 props.put("group.id", "testgroup10"); //手动提交offset props.put("enable.auto.commit", "false"); //自动提交时间间隔 //props.put("auto.commit.interval.ms","1000"); //开启从头开始消费 props.put("auto.offset.reset","earliest"); //反序列化 props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); return props; } @Test public void simpleConsumerTest(){ Properties props = getProperties(); KafkaConsumerconsumer = new KafkaConsumer (props); consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME)); while (true) { //进行时间控制,阻塞超时时间 ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); for (ConsumerRecord record : records) { System.err.printf("topic = %s,offset = %d, key = %s, value = %s%n",record.topic(),record.offset(), record.key(), record.value()); } //手动同步提交 consumer.commitAsync(); } } }
(3)运行结果
先运行consumer,之后运行producer
- 生产者运行结果
- 消费者运行结果
(4)手动异步提交脚本
package net.testclass.testclasskafka; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.Test; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import java.time.Duration; import java.util.Arrays; import java.util.Map; import java.util.Properties; public class MyConsumer { public static final String TOPIC_NAME = ""; public static Properties getProperties() { Properties props = new Properties(); // brokerID props.put("bootstrap.servers","192.168.6.102:9092,192.168.6.103:9092,192.168.6.104:9092"); //groupID分组内的消费者只能消费该消息一次 props.put("group.id", "testgroup10"); //手动提交offset props.put("enable.auto.commit", "false"); //自动提交时间间隔 //props.put("auto.commit.interval.ms","1000"); //开启从头开始消费 props.put("auto.offset.reset","earliest"); //反序列化 props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); return props; } @Test public void simpleConsumerTest(){ Properties props = getProperties(); KafkaConsumerconsumer = new KafkaConsumer (props); consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME)); while (true) { //进行时间控制,阻塞超时时间 ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); for (ConsumerRecord record : records) { System.err.printf("topic = %s,offset = %d, key = %s, value = %s%n", record.topic(), record.offset(), record.key(), record.value()); } //手动同步提交 //consumer.commitAsync(); //手工异步提交 if (!records.isEmpty()) { consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map offsets, Exception exception) { if (exception == null) { System.err.println("手工提交offset成功:" + offsets.toString()); ; } else { System.err.println("手工提交offset失败:" + offsets.toString()); } } }); } } } }
(5)运行结果
4、从头开始消费(1)相关参数说明
MAX_POLL_INTERVAL_MS_CONFIG,"30000" AUTO_OFFSET_RESET_CONFIG,"earliest"
- max.poll.interval.ms用于约束消费者的消费速度:如果两次poll的时间间隔超过30s,kafka会认为其消费能力弱,将其踢出消费组,并将分区分配给其他消费者消费
- auto.offset.reset:消费者组没有已提交的offset时(例如新消费者组)的消费方式
- ①latest:只消费自己启动之后发送到主题的消息
- ②earliest:第一次从头开始消费,以后按照已提交的offset继续消费,这区别于consumer.seekToBeginning(每次都从头开始消费)
(2)编写脚本 – 组名&配置同时修改才能生效
package net.testclass.testclasskafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.junit.jupiter.api.Test; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import java.time.Duration; import java.util.Arrays; import java.util.Properties; public class MyConsumer { public static final String TOPIC_NAME = ""; public static Properties getProperties() { Properties props = new Properties(); // brokerID props.put("bootstrap.servers","192.168.6.102:9092,192.168.6.103:9092,192.168.6.104:9092"); //groupID分组内的消费者只能消费该消息一次 props.put("group.id", "testgroup10"); //自动提交offset props.put("enable.auto.commit", "true"); //自动提交时间间隔 props.put("auto.commit.interval.ms","1000"); //开启从头开始消费 props.put("auto.offset.reset","earliest"); //反序列化 props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); return props; } @Test public void simpleConsumerTest(){ Properties props = getProperties(); KafkaConsumerconsumer = new KafkaConsumer (props); consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME)); while (true) { //来时间控制,阻塞超时时间 ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); for (ConsumerRecord record : records) { System.err.printf("topic = %s,offset = %d, key = %s, value = %s%n",record.topic(),record.offset(), record.key(), record.value()); } } } }
(3)运行结果
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)