- 消费步骤
- 消息订阅
- 序列化方式
- 拉取消息
- offset提交
- 重复消费问题
- 控制和关闭消费者
- 指定位移消费
- 线程不安全
- 消费者拦截器
- 消费者多线程
- 订阅主题
- 拉取消息
- 提交offset
- 关闭消费者实列
- subscribe(Collection)集合订阅分区
- subscribe(Pattern)正则表达式订阅分区
- assign(Collection)指定分区订阅
通过subscribe订阅主题具有消费者自动再均衡功能,在多个消费者的情况下可以根据分区策略自动分配各个消费者和分区的关系。
Avro、JSON、Thrift、Protobuf、Protostuff序列化工具
拉取消息- poll(time),设置超时时间可以传long单位为毫秒数,或者使用Duration设置超时时间。返回类型为ConsumerRecords包含多条消息
- 获取指定分区中数据:ConsumerRecords中recored(TopicPartition)获取,通过ConsumerRecords中partitons获取所有分区。
每一个ConsumerRecord中包含消息当前位移,假设为x,提交offset的时候需要提交x+1的位置,代表下一次消费的位置。
消费者客户端参数enable.auto.commit,true为自动提交offset,默认为true,自动提交时为消费者定期提交offset,通过客户端auto.commit.interval.ms配置,默认5s
- 同步提交consumer.commitsync,consumer.commitsync阻塞消费者消费,直到提交完最新的位移,按分区粒度提交offset,OffsetAndmetadata。
- 异步提交consumer.commitasync。
- 重复消费发送的情况
- 集群环境下一个客户端处理了,当还没提交,另一个客户端消费
- 自动提交offset时,由于定期提交offset时间过长,消息消费完还没自动提交offset,进行了下一次消费。缩小提交间隔时间(没用),改成代码提交。
- 消费消息后,将消息放入缓存,缓存达到限定值时批量提交
- consumer.commitasync引入失败重试机制,当提交位移x时失败了,下一次提交位移x+y成功,重试将位移x提交成功,后续从x消费数据
- 重复消费问题解决
维护一个递增序号(标记)记录当前最大offset,当最大offset存在时,重试的小offset将不再提交。适用所有
暂停消费pause();继续消费resume()。
指定位移消费- auto.offset.reset有一个可配置的值none,当查不到位移消费时,即不从最新的消费位置出开始消费,也不从最早的消息位置处消费,而是直接抛异常NoOffsetForPartitionException
- lastest从最近的消费位移处消费
- earliest从最早的消费位置
- seek(TopicPartition,long)方法,seek设置分区offset需要先调用一次poll方法给消费者分配分区。通过assignment判断是否获取到了分区。
kafkaconsumner为线程不安全,多个线程共用一个consumer会报错:Kafka is not safe for muti-thread access.
Kafkaconsumer中定义了一个acquire方法,用来检测当前是否只有一个线程在 *** 作。如果有多个线程 *** 作时,将会抛异常Kafka is not safe for muti-thread access。
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD); //KafkaConsumer中的成员变量 private void acquire() { long threadId = Thread.currentThread().getId(); if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) throw new ConcurrentModificationException ("KafkaConsumer is not safe for multi-threaded access"); refcount.incrementAndGet(); }消费者拦截器
消费者拦截器主要在消费到消息或者提交消费者位移时进行一些定制化的 *** 作。
consumerInterceptor接口主要有三个方法
- onConsumer 在poll方法返回之前调用,可以对消息进行修改
- onCommit 在提交位置后进行 *** 作commitSync之后,记录位移提交
- close 回收资源
- 方式一:线程封闭,为每个线程实例化一个KafkaConsumer对象。一个线程对应一个KafkaConsumer示列,一个消费线程消费一个或者多个分区中的消息,所有消费线程都隶属一个消费组
- 方式二:由一个线程poll拉取消息,处理消息设置为多线程。设置一个全局offset用于提交位移,但是会出现消息丢失的情况,比如线程一消费1-100的消息,线程二消费完了100-200的消息并将offset设置为201,并提交了offset,线程一处理失败
解决方法:基于滑动窗口维护一个消息缓存队列,窗口大小为线程大小,处理消息的线程从窗口里取,窗口有与一个startoffset和endoffset,当startoffset消费完成,窗口向未消费的位置移动。该方法会出现重复消费问题,以及消息悬停。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)