- 1, 本文总结了 kafka消费者开发方式;
- 2, 本文使用的是最新的kafka版本 3.0.0;
【1】 kafka消费则 【1.1】消费者与消费者组
1)消费者: 应用程序需要创建消费者对象,订阅主题并开始接收消息;
2)消费者群组:
- kafka消费者从属于消费者群组;
1)1个消费者接收4个分区 (模型1)
(注:kafka分区编号从0开始,所以本文也从0开始)
2)2个消费者接收4个分区(模型2)
3)4个消费者接收4个分区(模型3)
4)5个消费者接收4个分区(模型4)
5)2个消费者组接收4个分区(模型5)
【小结】消费者与消费者组接收分区模型:
- 一个群组里的消费者订阅的是同一个主题,每个消费者接收一部分分区消息;如模型1,2,3;
- 不同群组各自接收同一个主题的全部消息,相互不影响,不竞争;如模型5;
往群组里添加消费者是横向伸缩消费能力的主要方式;
- 不要让 消费者数量超过分区数量,多余消费者会被重置(如模型4);
只要保证每个应用程序的消费者组id 是唯一的,就可以让它们各自都获取全部数据(注意是全部而不是部分); 因为消费组间不竞争消息,但组内竞争(如模型5);
【1.2】消费者群组和分区再均衡
1)分区再均衡: 分区所有权从一个消费者转到另一个消费者 ;
- 分区再均衡期间, 消费者无法读取消息,整个群组一小段时间不可用;
2)消费者维持从属关系:消费者向被指派为 群组协调器的broker (不同群组有不同的协调器)发送心跳来维持 群组从属关系,及它们对分区所有权;
- 消费者会在轮询消息或提交偏移量时发送心跳;如果停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡, 就会触发一次再均衡;
- 补充: kafka 0.10.1 版本中,引入了 独立的心跳线程,即可以在轮询消息的空档发送心跳;
- 属性配置:
- max.poll.interval.ms 设置轮询间隔时长;
- session.timeout.ms 会话超时时间;
3)分区分配过程
- 当消费者要加入群组时,向群组协调器发送一个 JoinGroup请求;
- step1)第一个加入群组的消费者称为群主。群主从协调器哪里获取群组成员列表(列表包含所有最近发送心跳的消费者,是活跃的),并负责给每一个消费者分配分区。 群主使用了一个实现 PartitionAssignor 接口(默认为Range,但可以通过partition.assignment.strategy 来配置)来决定分区所有权应该分给哪一个消费者;
- step2)分配完成后,群主把分配情况发给协调器;协调器再把这些信息发给所有消费者。每个消费者只能看到自己的分区,而群主可以看到所有; 这个过程在分区再均衡时重复发生;
【2】创建kafka消费者
1)创建kafka消费者必须配置的3个属性
- bootstrap.server :kafka集群broker 列表;
- key.deserializer:键的反序列化器的全限定类名;
- value.deserializer:值的反序列化器的全限定类名;
- 若仅设置3个属性,kafka-client3.0会报错,说是必须配置 group.id
2)创建消费者代码示例
// 创建消费者配置信息 Properties props = new Properties(); // 属性配置 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.GROUP_ID_CONFIG, "G1");
3)订阅主题
consumer.subscribe(Arrays.asList("hello10"));
【3】轮询
1)一旦消费者订阅了主题,轮询会处理所有细节, 包括群组协调,分区再均衡, 发送心跳和获取数据;
2)轮询代码(死循环)
try { int i =0; while(!Thread.interrupted()) { // 模拟延时 try { System.out.println(DateUtils.getNowTimestamp() + " 等待消费消息"); TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } // 消费消息 ConsumerRecordsconsumerRds = consumer.poll(100); for(ConsumerRecord rd : consumerRds) { System.out.println("消费者Simple-分区【" + rd.partition() + "】offset【" + rd.offset() + "】键=" + rd.key() + "值=" + rd.value()); } // 手动同步提交 consumer.commitSync(); } } finally { // 记得关闭消费者 consumer.close(); }
整体代码如下:
public class MyConsumerSimple { public static void main(String[] args) { // 创建消费者配置信息 Properties props = new Properties(); // 属性配置 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.GROUP_ID_CONFIG, "G1"); // 关闭自动提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 创建消费者 KafkaConsumerconsumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("hello10")); try { int i =0; while(!Thread.interrupted()) { // 模拟延时 try { System.out.println(DateUtils.getNowTimestamp() + " 等待消费消息"); TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } // 消费消息 ConsumerRecords consumerRds = consumer.poll(100); for(ConsumerRecord rd : consumerRds) { System.out.println("消费者Simple-分区【" + rd.partition() + "】offset【" + rd.offset() + "】键=" + rd.key() + "值=" + rd.value()); } // 手动同步提交 consumer.commitSync(); } } finally { // 记得关闭消费者 consumer.close(); } } }
【4】消费者配置
1)fetch.min.bytes : 消费者从服务器获取记录的最小字节数;
2)fetch.max.wait.ms:设置当没有足够数据(fetch.min.bytes设置)时,需要等待的最大时间 ;
- 默认500ms, 属性1(fetch.min.bytes)和属性2 任意一个属性达到,kafka返回消息,
3)max.partition.fetch.bytes:指定服务器从每个分区返回给消费者的最大字节数; 默认值1M;
即 kafkaConsumer.poll() 从每个分区返回的消息大小不超过该设定值;
- 注意1,该值必须要比broker能够接收的最大消息的字节数(max.message.size 配置)要大,否则消费者无法读取这些消息,一直挂起;
- 注意2,该值的设置并不是越大越好,因为需要考虑消费者处理数据的时间;因为消费者需要频繁调用poll方法来避免会话过期和分区再均衡问题;如果单次返回数据太多,可能无法及时轮询发送心跳;
4)session.timeous.ms :指定了消费者被认为死亡之前与服务器断开连接的时间;
- 1,默认 3s; 消费者被认定为死亡,协调器触发分区再均衡,把分区分配给其他消费者;
- 2,该属性与 heartbeat.interval.ms 紧密相关; heartbeat.interval.ms 指定poll()方法向协调器发送心跳频率,session.timeout.ms 指定了消费者可以多久不发送心跳;
- 3,一般需要同时修改这两个属性, 显然 heartbeat.interval.ms 必须比 session.timeout.ms 小,一般 前者是后者的 三分之一 ;
5)auto.offset.reset :指定消费者读取一个没有偏移量的分区或偏移量无效的情况下,该如何读取;
- 消费者第一次读取分区,分区没有保留它的偏移量信息,这叫没有偏移量;
- 消费者断开,然后上线,分区删除了它的偏移量信息,所以无效;
- 取值:默认值,latest, 读取最新消息;earliest, 从起始位置读取分区;
6)enable.auto.commit : 是否自动提交偏移量,默认值true;
- 6.1)自动提交:配置 auto.commit.interval.ms 控制自动提交频率;
- 6.2)手动提交:
- 同步提交:consumer.commitSync() ;
- 异步提交:comsumer.commitAsync() ;
7)partition.assignment.strategy : 分区分配策略;分区应该分配给同一个消费者组的哪些消费者;PartitionAssignor 根据消费者和主题,决定哪些分区应该分配给消费者;有2个策略:
- 7.1)Range(默认值): 该策略把主题的若干个连续分区分配给消费者;
- 只要使用了range,且分区数量无法被消费者数量整除,就会出现消费者分配的分区数不均等的情况(不整除,无法均分);
- 7.2)RoundRobin: 该策略把主题所有分区逐个分配给消费者;注意是主题所有分区;
- 一般来说, RoundRobin 会给消费者分配相同数量的分区(均分);
// 消费者设置分区策略 (默认值-RangeAssignor) props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName()); props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
补充:消费者接收分区的分配策略;
Range分配,在分区数不整除消费者数时,是不会均分的;
RoundRobin分配,会尽量均分,即使不整除;
8)client.id : 可以是任意字符串,用来标识消息来源;
- 通常用在日志,度量指标和配额里;
9)max.poll.records : 指定单次调用 poll 能够返回的记录数量;
10)receive.buffer.bytes 和 send.buffer.bytes :
- 设置socket 在读写数据时用到的tcp 缓冲区大小;若设置为-1,使用 *** 作系统默认值;
【5】提交和偏移量
1)提交: 更新分区当前位置的 *** 作;
2)消费者如何提交偏移量 ?
消费者往 _consumer_offset 这个特殊主题发送消息, 消息里面包含每个分区的偏移量;
- 如果消费者一致处于运行状态,那偏移量就没有作用;
- 但若有消费者上线和下线,就会触发再均衡;完成再均衡后,消费者为了继续之前的工作,需要读取每个分区最后一次提交的偏移量,然后从偏移量指定位置继续处理;
3)3种提交偏移量方式
- 自动提交;
- 手动同步提交;
- 手动异步提交;
1)最简单的提交方式是自动提交;
- enable.auto.commit 设置为true ;则每过5s,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去;提交间隔 通过 auto.commit.interval.ms 设置,默认5s;
- 自动提交在轮询里进行; 在每次轮询时,会检查是否需要提交偏移量,若是,则提交从上一次轮询返回的偏移量;
2)自动提交的问题: 可能造成数据重复消费(这需要消费者支持幂等性来解决);
若提交间隔时间为5s, 在最近一次提交后的3s发生了分区再均衡;如分区1的消费者从A换成了B;B 从最后一次A提交的偏移量开始读取消息。所以 消费者A和B会重复处理相同数据;因为消费者A处理了消息但没有提交偏移量,这就会造成A在最后3s处理的消息会被B重复处理;
【5.2】提交当前偏移量 (手动同步提交)
提交处理的这批消息的最后一个偏移量;
1)如何实现手动同步提交
- 把 enable.auto.commit 设置为false,让消费者调用 commitSync() 手动提交;这个方法会提交由 poll() 方法返回的最新偏移量;提交成功马上返回,提交失败抛出异常;
2)如果发生分区再均衡,从最近一次提交到发生再均衡之间的所有消息都将被重复处理;
3)代码例子
public class MyConsumerSyncCommit { public static void main(String[] args) { // 创建消费者配置信息 Properties props = new Properties(); // 属性配置 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.GROUP_ID_CONFIG, "G1"); // 关闭自动提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 设置消费消息的位置,消费最新消息 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // 设置分区策略 (默认值-RangeAssignor) props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName()); props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName()); // 创建消费者 KafkaConsumerconsumer = new KafkaConsumer<>(props); // 订阅主题 consumer.subscribe(Arrays.asList("hello11")); // 循环拉取 try { while(!Thread.interrupted()) { // 模拟延时 try { System.out.println(DateUtils.getNowTimestamp() + " 消费者-SyncCommit-等待消费消息"); TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } // 消费消息 ConsumerRecords consumerRds = consumer.poll(100); for(ConsumerRecord rd : consumerRds) { System.out.println("消费者-SyncCommit-分区【" + rd.partition() + "】offset【" + rd.offset() + "】" + "值=" + rd.value()); } // 手动同步提交 consumer.commitSync(); } } finally { // 记得关闭消费者 consumer.close(); } } }
补充: 手动同步提交的问题, 在broker对提交请求做出回应之前, 应用程序会一直阻塞;这样会限制应用程序的吞吐量;
【5.3】手动异步提交
提交处理的这批消息的最后一个偏移量;
1)为解决同步提交问题,引入了异步提交;
// 手动异步提交 consumer.commitAsync();
异步提交的问题: 在成功提交或碰到无法恢复的错误前,commitSync() 会一直重试,但 commitAsync() 不会,这就是问题所在;
2)异步提交的回调(若有异常,可以处理 )
- 一般被用于记录提交错误或生成度量指标;
- 不过如果要重试提交,一定要注意顺序;
// 循环拉取 try { while(!Thread.interrupted()) { // 模拟延时 try { System.out.println(DateUtils.getNowTimestamp() + " 消费者-SyncCommit-等待消费消息"); TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } // 消费消息 ConsumerRecordsconsumerRds = consumer.poll(100); for(ConsumerRecord rd : consumerRds) { System.out.println("消费者-SyncCommit-分区【" + rd.partition() + "】offset【" + rd.offset() + "】" + "值=" + rd.value()); } // 手动异步提交(带异步提交完成后的回调) consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map offsets, Exception exception) { if (exception !=null) { System.out.println("异步提交异常"); // 把错误信息写入db 或日志 } } }); } } finally { // 记得关闭消费者 consumer.close(); }
但是异步回调也是有问题的;
如消费者1发送一个请求1,提交偏移量2000,但因为网络问题,kafka服务器没有收到或超时;如消费者1发送一个请求2,提交偏移量3000,且成功写入kafka;
这时,如果请求1,再重试,则kafka偏移量由 3000 设置为2000,则 偏移量2000~3000的消息会被重复消费,这就是问题所在了;
而且如果 把2000,3000 替换为 1 或 100w的偏移量,那会影响整个系统性能,即便支持幂等;因为消费重复消息需要白费系统资源;
解决方法: 把异常信息,提交失败的偏移量存入数据库,然后重试;重试时一定要注意不要让偏移量后退;
【5.4】同步异步组合提交
1)轮询中使用异步提交,而轮询外,在关闭数据库连接前使用同步提交;
2)代码示例
// 循环拉取 try { while(!Thread.interrupted()) { // 模拟延时 try { System.out.println(DateUtils.getNowTimestamp() + " 消费者-SyncCommit-等待消费消息"); TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } // 消费消息 ConsumerRecordsconsumerRds = consumer.poll(100); for(ConsumerRecord rd : consumerRds) { System.out.println("消费者-SyncCommit-分区【" + rd.partition() + "】offset【" + rd.offset() + "】" + "值=" + rd.value()); } // 【异步提交】 consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map offsets, Exception exception) { if (exception !=null) { System.out.println("异步提交异常"); // 把错误信息写入db 或日志 } } }); } } finally { try { // 【同步提交】 因为错误时,同步提交会一直重试,直到提交成功或发生无法恢复的错误 consumer.commitSync(); } finally { // 记得关闭消费者 consumer.close(); } }
2)小结:
- 异步提交:commitAsync() 提交,速度更快,即使这次提交失败,下一次提交很可能会成功;
- 同步提交:commitSync() 提交;会因为错误时,同步提交会一直重试,直到提交成功或发生无法恢复的错误;
【5.5】提交特定偏移量
1)场景
轮询获得了一批数据,偏移量从1k ~ 10w;我想每处理1k条消息,我就提交一次偏移量;
这是 commitSync 和 commitAsync() 办不到的;因为它们提交的最后一个偏移量,而不是中间某个消息的偏移量;
2)代码示例
// 消费消息【提交特定偏移量】 ConsumerRecordsconsumerRds = consumer.poll(100); int counter = 0; for(ConsumerRecord rd : consumerRds) { System.out.println("消费者-SyncCommit-分区【" + rd.partition() + "】offset【" + rd.offset() + "】" + "值=" + rd.value()); if (++counter % 100 == 0) { // 每消费100条消息,就提交一次偏移量 // 注意,这里提交的偏移量要加1 curOffsets.put(new TopicPartition(rd.topic(), rd.partition()), new OffsetAndmetadata(rd.offset()+1, "no metadata")); // 异步或同步提交特定偏移量 consumer.commitAsync(curOffsets, new OffsetCommitCallbackImpl()); } } // 轮序完之后,若无法整除,还要提交剩余消息的偏移量 if (counter % 100 != 0) { consumer.commitAsync(new OffsetCommitCallbackImpl()); }
注意: 提交特定偏移量时,需要把最后一个消息的偏移量加1;
【5.6】再均衡监听器
1)应用场景
在消费者退出或进行分区再均衡前,会做一些清理工作,如提交偏移量,或关闭数据库连接;这些工作可以通过监听器来实现;
2)具体实现和测试,参见 kafka再均衡监听器测试_PacosonSWJTU的博客-CSDN博客
【5.7】 从特定偏移量处开始处理记录
1)一些场景,希望从指定偏移量接收消息;
2)从分区起始位置,末尾位置读取消息的方法分别为:
- seekToBeginning(Collection
tp); - seekToEnd(Collection
tp);
3)为了避免处理重复消息,有一种解决方案,采用原子 *** 作;
把消息内容和偏移量放在同一个原子 *** 作完成;消息和偏移量要么都提交成功,要么都失败;
如果消息存数据库,偏移量提交到kafka,就无法实现原子;
但把消息和偏移量都存入数据库,就可以实现原子 *** 作;
然后当消费者下一次启动时,就从数据库读取上一次保存偏移量;并从该偏移量表示的位置接收消息;
3.1)以上方案,需要依赖 ConsumerRebalanceListener(分区再均衡监听器) 和 seek(offset) 来实现;
- 向seek() 方法传入偏移量, 下文的poll轮询 *** 作就可以从该偏移量接收消息;
- 在 监听器的 onPartitionsRevoked() 方法中 把消息和偏移量保存到数据库,并提交事务;以便下一次消费前从数据库读取偏移量,传入seek方法;
补充, 分区再均衡监听器,参见
kafka再均衡监听器测试_PacosonSWJTU的博客-CSDN博客
【5.8】消费者如何退出(关闭)
1)告诉消费者如何优雅退出 ?
另一个线程调用 consumer.wakeup() 方法;就可以让消费者退出poll方法,并抛出 WakeupException 异常;该方法是唯一一个可以从其他线程安全调用的方法;
- 并不需要处理 WakeupException 异常,因为它只是用于跳出循环的一种方式;
- 记得在退出线程前需要调用 consumer.close() 方法,关闭消费者,会提交任何还没有提交的东西,并向群组协调器发送消息告知自己要离开群组;
- 接下来会触发再均衡,而不需要等待会话超时;
2)在消费者主线程添加 jvm关闭时的钩子
【5.8.1】带有jvm关闭钩子的消费者jvm中增加一个关闭的钩子,当jvm关闭的时候,会执行系统中已经设置的所有 通过方法addShutdownHook添加的钩子,当系统执行完这些钩子后,jvm才会关闭。所以这些钩子可以在jvm关闭的时候进行内存清理、对象销毁等 *** 作。
public class MyConsumerShutdownHook { public static void main(String[] args) { // 创建消费者配置信息 Properties props = new Properties(); // 属性配置 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.GROUP_ID_CONFIG, MyProducer.TOPIC_NAME + "G1"); // 关闭自动提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 设置消费消息的位置,消费最新消息 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 设置分区策略 (默认值-RangeAssignor) props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName()); props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName()); // 创建消费者 KafkaConsumerconsumer = new KafkaConsumer<>(props); // 订阅主题, 【没有分区再均衡监听器】 consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME)); // 添加消费者jvm关闭钩子,在消费者主程序退出前执行 addShutdownHook(consumer); // 循环拉取 try { while(!Thread.interrupted()) { System.out.println(DateUtils.getNowTimestamp() + " > 带有jvm关闭钩子的消费者等待消费消息"); TimeUtils.sleep(1000); // 消费消息 ConsumerRecords consumerRds = consumer.poll(100); for(ConsumerRecord rd : consumerRds) { System.out.println("消费者-ShutdownHook-分区【" + rd.partition() + "】offset【" + rd.offset() + "】" + "值=" + rd.value()); } // 【异步提交】 consumer.commitAsync(new OffsetCommitCallbackImpl()); if (!consumerRds.isEmpty()) throw new RuntimeException("测试-抛出运行时异常"); } } finally { try { // 【同步提交】 因为错误时,同步提交会一直重试,直到提交成功或发生无法恢复的错误 consumer.commitSync(); } finally { // 记得关闭消费者 consumer.close(); System.out.println("消费者关闭"); } } } private static void addShutdownHook(Consumer consumer) { Thread mainThread = Thread.currentThread(); Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { // 关闭钩子运行在独立线程,以便我们可以安全退出poll() 抛出WakeupException() 异常 System.out.println("进入消费者jvm关闭钩子"); consumer.wakeup(); try { // 当前线程一直挂起,直到主线程运行完成 mainThread.join(); System.out.println("消费者主程序执行完成,关闭钩子done"); } catch (InterruptedException e) { e.printStackTrace(); System.out.println("消费者jvm关闭钩子执行异常"); } } }); } }
运行日志 :
........
消费者-ShutdownHook-分区【2】offset【1294】值=[585] > ABCDE
消费者-ShutdownHook-分区【2】offset【1295】值=[588] > ABCDE
消费者关闭
进入消费者jvm关闭钩子
消费者主程序执行完成,关闭钩子done
Exception in thread "main" java.lang.RuntimeException: 测试-抛出运行时异常
at kafka.consumer.shutdownhook.MyConsumerShutdownHook.main(MyConsumerShutdownHook.java:57)
注意: 在消费者主线程退出之前,确保彻底关闭了消费者;
【5.9】独立消费者如何接收消息
1)独立消费者: 即一个群组只有一个消费者(接收所有分区消息); 这样就不存在分区再均衡问题;
2)对于独立消费者,不需要订阅主题,而是为自己分配分区;
- 一个消费者可以订阅主题(并加入群组),或者为自己分配分区,但不能同时做这两件事情;
3)独立消费者为自己分配分区并接收消息;
public class MyIndependentConsumer { public static void main(String[] args) { // 创建消费者配置信息 Properties props = new Properties(); // 属性配置 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.GROUP_ID_CONFIG, MyProducer.TOPIC_NAME + "G1"); // 关闭自动提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 设置消费消息的位置,消费最新消息 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 设置分区策略 (默认值-RangeAssignor) props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName()); props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName()); // 创建消费者 KafkaConsumerconsumer = new KafkaConsumer<>(props); // 为自己分配分区,这里我们并没有订阅主题* assignPartition(consumer); // 循环拉取 try { while(!Thread.interrupted()) { System.out.println(DateUtils.getNowTimestamp() + " > 独立消费者等待消费消息"); TimeUtils.sleep(1000); // 消费消息 ConsumerRecords consumerRds = consumer.poll(100); for(ConsumerRecord rd : consumerRds) { System.out.println("消费者-独立-分区【" + rd.partition() + "】offset【" + rd.offset() + "】" + "值=" + rd.value()); } // 【异步提交】 consumer.commitAsync(new OffsetCommitCallbackImpl()); } } finally { try { // 【同步提交】 因为错误时,同步提交会一直重试,直到提交成功或发生无法恢复的错误 consumer.commitSync(); } finally { // 记得关闭消费者 consumer.close(); System.out.println("消费者关闭"); } } } private static void assignPartition(Consumer consumer) { // 获取topic的分区信息列表 List partitionInfoList = consumer.partitionsFor(MyProducer.TOPIC_NAME); if (partitionInfoList == null || partitionInfoList.isEmpty()) { throw new RuntimeException("topic 分区查无记录"); } // 创建主题分区并添加到列表(所有分区都添加到列表) List partitions = new ArrayList<>(partitionInfoList.size()); partitionInfoList.forEach(x->{ partitions.add(new TopicPartition(x.topic(), x.partition())); }); // 给自己分配分区 consumer.assign(partitions); } }
为自己分配分区,而不是订阅主题核心代码;
private static void assignPartition(Consumer consumer) { // 获取topic的分区信息列表 ListpartitionInfoList = consumer.partitionsFor(MyProducer.TOPIC_NAME); if (partitionInfoList == null || partitionInfoList.isEmpty()) { throw new RuntimeException("topic 分区查无记录"); } // 创建主题分区并添加到列表(所有分区都添加到列表) List partitions = new ArrayList<>(partitionInfoList.size()); partitionInfoList.forEach(x->{ partitions.add(new TopicPartition(x.topic(), x.partition())); }); // 给自己分配分区 consumer.assign(partitions); }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)