kafka消费者开发方式小结

kafka消费者开发方式小结,第1张

kafka消费者开发方式小结 【README】
  • 1, 本文总结了 kafka消费者开发方式;
  • 2, 本文使用的是最新的kafka版本 3.0.0;

【1】 kafka消费则 【1.1】消费者与消费者组

1)消费者: 应用程序需要创建消费者对象,订阅主题并开始接收消息;

2)消费者群组:

  • kafka消费者从属于消费者群组;
【1.2】消费者接收分区消息模型

1)1个消费者接收4个分区 (模型1)

(注:kafka分区编号从0开始,所以本文也从0开始)

2)2个消费者接收4个分区(模型2)

3)4个消费者接收4个分区(模型3)

4)5个消费者接收4个分区(模型4)

5)2个消费者组接收4个分区(模型5)

【小结】消费者与消费者组接收分区模型:

  • 一个群组里的消费者订阅的是同一个主题,每个消费者接收一部分分区消息;如模型1,2,3;
  • 不同群组各自接收同一个主题的全部消息,相互不影响,不竞争;如模型5;

往群组里添加消费者是横向伸缩消费能力的主要方式;

  • 不要让 消费者数量超过分区数量,多余消费者会被重置(如模型4);

只要保证每个应用程序的消费者组id 是唯一的,就可以让它们各自都获取全部数据(注意是全部而不是部分); 因为消费组间不竞争消息,但组内竞争(如模型5);


【1.2】消费者群组和分区再均衡

1)分区再均衡: 分区所有权从一个消费者转到另一个消费者 ;

  • 分区再均衡期间, 消费者无法读取消息,整个群组一小段时间不可用; 

2)消费者维持从属关系:消费者向被指派为 群组协调器的broker (不同群组有不同的协调器)发送心跳来维持 群组从属关系,及它们对分区所有权;

  • 消费者会在轮询消息或提交偏移量时发送心跳;如果停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡, 就会触发一次再均衡;
  • 补充: kafka 0.10.1 版本中,引入了 独立的心跳线程,即可以在轮询消息的空档发送心跳;
  • 属性配置:
    •  max.poll.interval.ms 设置轮询间隔时长;
    • session.timeout.ms 会话超时时间;

3)分区分配过程

  • 当消费者要加入群组时,向群组协调器发送一个 JoinGroup请求;
    • step1)第一个加入群组的消费者称为群主。群主从协调器哪里获取群组成员列表(列表包含所有最近发送心跳的消费者,是活跃的),并负责给每一个消费者分配分区。 群主使用了一个实现  PartitionAssignor 接口(默认为Range,但可以通过partition.assignment.strategy 来配置)来决定分区所有权应该分给哪一个消费者; 
    • step2)分配完成后,群主把分配情况发给协调器;协调器再把这些信息发给所有消费者。每个消费者只能看到自己的分区,而群主可以看到所有; 这个过程在分区再均衡时重复发生;


 

【2】创建kafka消费者

1)创建kafka消费者必须配置的3个属性

  • bootstrap.server :kafka集群broker 列表;
  • key.deserializer:键的反序列化器的全限定类名;
  • value.deserializer:值的反序列化器的全限定类名;
  • 若仅设置3个属性,kafka-client3.0会报错,说是必须配置 group.id

2)创建消费者代码示例 

// 创建消费者配置信息
Properties props = new Properties();
// 属性配置
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "G1");

 3)订阅主题 

consumer.subscribe(Arrays.asList("hello10"));

【3】轮询

1)一旦消费者订阅了主题,轮询会处理所有细节, 包括群组协调,分区再均衡, 发送心跳和获取数据

2)轮询代码(死循环)

try {
	int i =0;
	while(!Thread.interrupted()) {
		// 模拟延时
		try {
			System.out.println(DateUtils.getNowTimestamp() + " 等待消费消息");
			TimeUnit.MILLISECONDS.sleep(100);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		// 消费消息
		ConsumerRecords consumerRds  = consumer.poll(100);
		for(ConsumerRecord rd : consumerRds) {
			System.out.println("消费者Simple-分区【" + rd.partition() + "】offset【" + rd.offset() + "】键=" +  rd.key() + "值=" + rd.value());
		}
		// 手动同步提交
		consumer.commitSync();
	}
} finally {
	// 记得关闭消费者
	consumer.close();
}

整体代码如下:

public class MyConsumerSimple {
	public static void main(String[] args) {
		// 创建消费者配置信息
		Properties props = new Properties();
		// 属性配置
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "G1");
		//   关闭自动提交
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

		// 创建消费者
		KafkaConsumer consumer = new KafkaConsumer<>(props);

		
		consumer.subscribe(Arrays.asList("hello10"));

		
		try {
			int i =0;
			while(!Thread.interrupted()) {
				// 模拟延时
				try {
					System.out.println(DateUtils.getNowTimestamp() + " 等待消费消息");
					TimeUnit.MILLISECONDS.sleep(100);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				// 消费消息
				ConsumerRecords consumerRds  = consumer.poll(100);
				for(ConsumerRecord rd : consumerRds) {
					System.out.println("消费者Simple-分区【" + rd.partition() + "】offset【" + rd.offset() + "】键=" +  rd.key() + "值=" + rd.value());
				}
				// 手动同步提交
				consumer.commitSync();
			}
		} finally {
			// 记得关闭消费者
			consumer.close();
		}
	}
}

【4】消费者配置

1)fetch.min.bytes : 消费者从服务器获取记录的最小字节数;

2)fetch.max.wait.ms:设置当没有足够数据(fetch.min.bytes设置)时,需要等待的最大时间 ;

  • 默认500ms, 属性1(fetch.min.bytes)和属性2 任意一个属性达到,kafka返回消息,

3)max.partition.fetch.bytes:指定服务器从每个分区返回给消费者的最大字节数; 默认值1M;

即 kafkaConsumer.poll() 从每个分区返回的消息大小不超过该设定值;

  • 注意1,该值必须要比broker能够接收的最大消息的字节数(max.message.size 配置)要大,否则消费者无法读取这些消息,一直挂起;
  • 注意2,该值的设置并不是越大越好,因为需要考虑消费者处理数据的时间;因为消费者需要频繁调用poll方法来避免会话过期和分区再均衡问题;如果单次返回数据太多,可能无法及时轮询发送心跳;

4)session.timeous.ms :指定了消费者被认为死亡之前与服务器断开连接的时间;

  • 1,默认 3s; 消费者被认定为死亡,协调器触发分区再均衡,把分区分配给其他消费者;
  • 2,该属性与  heartbeat.interval.ms 紧密相关; heartbeat.interval.ms 指定poll()方法向协调器发送心跳频率,session.timeout.ms 指定了消费者可以多久不发送心跳
  • 3,一般需要同时修改这两个属性,  显然 heartbeat.interval.ms 必须比  session.timeout.ms 小,一般 前者是后者的 三分之一 ;

5)auto.offset.reset :指定消费者读取一个没有偏移量的分区或偏移量无效的情况下,该如何读取;

  1. 消费者第一次读取分区,分区没有保留它的偏移量信息,这叫没有偏移量;
  2. 消费者断开,然后上线,分区删除了它的偏移量信息,所以无效;
  3. 取值:默认值,latest, 读取最新消息;earliest, 从起始位置读取分区;

6)enable.auto.commit : 是否自动提交偏移量,默认值true;

  • 6.1)自动提交:配置 auto.commit.interval.ms 控制自动提交频率;
  • 6.2)手动提交:
    • 同步提交:consumer.commitSync()  ;
    • 异步提交:comsumer.commitAsync() ;

7)partition.assignment.strategy : 分区分配策略;分区应该分配给同一个消费者组的哪些消费者;PartitionAssignor 根据消费者和主题,决定哪些分区应该分配给消费者;有2个策略:

  • 7.1)Range(默认值): 该策略把主题的若干个连续分区分配给消费者;
    • 只要使用了range,且分区数量无法被消费者数量整除,就会出现消费者分配的分区数不均等的情况(不整除,无法均分);
  • 7.2)RoundRobin: 该策略把主题所有分区逐个分配给消费者;注意是主题所有分区;
    • 一般来说, RoundRobin 会给消费者分配相同数量的分区(均分)
// 消费者设置分区策略 (默认值-RangeAssignor)
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());

补充:消费者接收分区的分配策略;

Range分配,在分区数不整除消费者数时,是不会均分的;

RoundRobin分配,会尽量均分,即使不整除;

 

 8)client.id : 可以是任意字符串,用来标识消息来源;

  • 通常用在日志,度量指标和配额里;

9)max.poll.records : 指定单次调用 poll 能够返回的记录数量;

10)receive.buffer.bytes 和 send.buffer.bytes :

  • 设置socket 在读写数据时用到的tcp 缓冲区大小;若设置为-1,使用 *** 作系统默认值;

【5】提交和偏移量

1)提交: 更新分区当前位置的 *** 作;

2)消费者如何提交偏移量 ?

消费者往 _consumer_offset 这个特殊主题发送消息, 消息里面包含每个分区的偏移量;

  • 如果消费者一致处于运行状态,那偏移量就没有作用;
  • 但若有消费者上线和下线,就会触发再均衡;完成再均衡后,消费者为了继续之前的工作,需要读取每个分区最后一次提交的偏移量,然后从偏移量指定位置继续处理;

3)3种提交偏移量方式

  1. 自动提交;
  2. 手动同步提交;
  3. 手动异步提交; 
【5.1】自动提交

1)最简单的提交方式是自动提交;

  • enable.auto.commit 设置为true ;则每过5s,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去;提交间隔 通过 auto.commit.interval.ms 设置,默认5s;
  • 自动提交在轮询里进行; 在每次轮询时,会检查是否需要提交偏移量,若是,则提交从上一次轮询返回的偏移量; 

2)自动提交的问题: 可能造成数据重复消费(这需要消费者支持幂等性来解决);

若提交间隔时间为5s, 在最近一次提交后的3s发生了分区再均衡;如分区1的消费者从A换成了B;B 从最后一次A提交的偏移量开始读取消息。所以 消费者A和B会重复处理相同数据;因为消费者A处理了消息但没有提交偏移量,这就会造成A在最后3s处理的消息会被B重复处理;


【5.2】提交当前偏移量 (手动同步提交)

提交处理的这批消息的最后一个偏移量;

1)如何实现手动同步提交

  • 把 enable.auto.commit  设置为false,让消费者调用  commitSync() 手动提交;这个方法会提交由 poll() 方法返回的最新偏移量;提交成功马上返回,提交失败抛出异常;

2)如果发生分区再均衡,从最近一次提交到发生再均衡之间的所有消息都将被重复处理; 

3)代码例子

public class MyConsumerSyncCommit {
	public static void main(String[] args) {
		// 创建消费者配置信息
		Properties props = new Properties();
		// 属性配置
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "G1");
		//   关闭自动提交
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
		// 设置消费消息的位置,消费最新消息
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
		// 设置分区策略 (默认值-RangeAssignor)
		props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
		props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());

		// 创建消费者
		KafkaConsumer consumer = new KafkaConsumer<>(props);
		// 订阅主题
		consumer.subscribe(Arrays.asList("hello11"));

		// 循环拉取
		try {
			while(!Thread.interrupted()) {
				// 模拟延时
				try {
					System.out.println(DateUtils.getNowTimestamp() + " 消费者-SyncCommit-等待消费消息");
					TimeUnit.MILLISECONDS.sleep(100);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				// 消费消息
				ConsumerRecords consumerRds  = consumer.poll(100);
				for(ConsumerRecord rd : consumerRds) {
					System.out.println("消费者-SyncCommit-分区【" + rd.partition() + "】offset【" + rd.offset() + "】" + "值=" + rd.value());
				}
				// 手动同步提交
				consumer.commitSync();
			}
		} finally {
			// 记得关闭消费者
			consumer.close();
		}
	}
}

补充: 手动同步提交的问题, 在broker对提交请求做出回应之前, 应用程序会一直阻塞;这样会限制应用程序的吞吐量;  


【5.3】手动异步提交

提交处理的这批消息的最后一个偏移量;

1)为解决同步提交问题,引入了异步提交;

// 手动异步提交
consumer.commitAsync();

异步提交的问题: 在成功提交或碰到无法恢复的错误前,commitSync() 会一直重试,但 commitAsync() 不会,这就是问题所在;

2)异步提交的回调(若有异常,可以处理 )

  • 一般被用于记录提交错误或生成度量指标;
  • 不过如果要重试提交,一定要注意顺序;
// 循环拉取
try {
	while(!Thread.interrupted()) {
		// 模拟延时
		try {
			System.out.println(DateUtils.getNowTimestamp() + " 消费者-SyncCommit-等待消费消息");
			TimeUnit.MILLISECONDS.sleep(100);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		// 消费消息
		ConsumerRecords consumerRds  = consumer.poll(100);
		for(ConsumerRecord rd : consumerRds) {
			System.out.println("消费者-SyncCommit-分区【" + rd.partition() + "】offset【" + rd.offset() + "】" + "值=" + rd.value());
		}
		// 手动异步提交(带异步提交完成后的回调)
		consumer.commitAsync(new OffsetCommitCallback() {
			@Override
			public void onComplete(Map offsets, Exception exception) {
				if (exception !=null) {
					System.out.println("异步提交异常");
					// 把错误信息写入db 或日志
				}
			}
		});
	}
} finally {
	// 记得关闭消费者
	consumer.close();
}

但是异步回调也是有问题的

如消费者1发送一个请求1,提交偏移量2000,但因为网络问题,kafka服务器没有收到或超时;如消费者1发送一个请求2,提交偏移量3000,且成功写入kafka;

这时,如果请求1,再重试,则kafka偏移量由 3000 设置为2000,则 偏移量2000~3000的消息会被重复消费,这就是问题所在了;

而且如果 把2000,3000 替换为 1 或 100w的偏移量,那会影响整个系统性能,即便支持幂等;因为消费重复消息需要白费系统资源;

解决方法: 把异常信息,提交失败的偏移量存入数据库,然后重试;重试时一定要注意不要让偏移量后退;


【5.4】同步异步组合提交

1)轮询中使用异步提交,而轮询外,在关闭数据库连接前使用同步提交;

2)代码示例

// 循环拉取
try {
	while(!Thread.interrupted()) {
		// 模拟延时
		try {
			System.out.println(DateUtils.getNowTimestamp() + " 消费者-SyncCommit-等待消费消息");
			TimeUnit.MILLISECONDS.sleep(100);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		// 消费消息
		ConsumerRecords consumerRds  = consumer.poll(100);
		for(ConsumerRecord rd : consumerRds) {
			System.out.println("消费者-SyncCommit-分区【" + rd.partition() + "】offset【" + rd.offset() + "】" + "值=" + rd.value());
		}
		// 【异步提交】
		consumer.commitAsync(new OffsetCommitCallback() {
			@Override
			public void onComplete(Map offsets, Exception exception) {
				if (exception !=null) {
					System.out.println("异步提交异常");
					// 把错误信息写入db 或日志
				}
			}
		});
	}
} finally {
	try {
		// 【同步提交】 因为错误时,同步提交会一直重试,直到提交成功或发生无法恢复的错误 
		consumer.commitSync();
	} finally {
		// 记得关闭消费者
		consumer.close();
	}
}

2)小结:

  • 异步提交:commitAsync() 提交,速度更快,即使这次提交失败,下一次提交很可能会成功;
  • 同步提交:commitSync() 提交;会因为错误时,同步提交会一直重试,直到提交成功或发生无法恢复的错误;

【5.5】提交特定偏移量

1)场景

轮询获得了一批数据,偏移量从1k ~ 10w;我想每处理1k条消息,我就提交一次偏移量;

这是 commitSync 和 commitAsync() 办不到的;因为它们提交的最后一个偏移量,而不是中间某个消息的偏移量;

2)代码示例

// 消费消息【提交特定偏移量】
ConsumerRecords consumerRds  = consumer.poll(100);
int counter = 0;
for(ConsumerRecord rd : consumerRds) {
	System.out.println("消费者-SyncCommit-分区【" + rd.partition() + "】offset【" + rd.offset() + "】" + "值=" + rd.value());
	if (++counter % 100 == 0) {  // 每消费100条消息,就提交一次偏移量
		// 注意,这里提交的偏移量要加1
		curOffsets.put(new TopicPartition(rd.topic(), rd.partition()), new OffsetAndmetadata(rd.offset()+1, "no metadata"));
		// 异步或同步提交特定偏移量
		consumer.commitAsync(curOffsets, new OffsetCommitCallbackImpl());
	}
}
// 轮序完之后,若无法整除,还要提交剩余消息的偏移量
if (counter % 100 != 0) {
	consumer.commitAsync(new OffsetCommitCallbackImpl());
}

注意: 提交特定偏移量时,需要把最后一个消息的偏移量加1; 


【5.6】再均衡监听器

1)应用场景

在消费者退出或进行分区再均衡前,会做一些清理工作,如提交偏移量,或关闭数据库连接;这些工作可以通过监听器来实现;

2)具体实现和测试,参见   kafka再均衡监听器测试_PacosonSWJTU的博客-CSDN博客


【5.7】 从特定偏移量处开始处理记录

1)一些场景,希望从指定偏移量接收消息;

2)从分区起始位置,末尾位置读取消息的方法分别为:

  • seekToBeginning(Collection tp);
  • seekToEnd(Collection tp);

3)为了避免处理重复消息,有一种解决方案,采用原子 *** 作;

把消息内容和偏移量放在同一个原子 *** 作完成;消息和偏移量要么都提交成功,要么都失败;
如果消息存数据库,偏移量提交到kafka,就无法实现原子;
但把消息和偏移量都存入数据库,就可以实现原子 *** 作;
然后当消费者下一次启动时,就从数据库读取上一次保存偏移量;并从该偏移量表示的位置接收消息;

3.1)以上方案,需要依赖 ConsumerRebalanceListener(分区再均衡监听器) 和 seek(offset) 来实现;

  • 向seek() 方法传入偏移量, 下文的poll轮询 *** 作就可以从该偏移量接收消息;
  • 在 监听器的 onPartitionsRevoked() 方法中 把消息和偏移量保存到数据库,并提交事务;以便下一次消费前从数据库读取偏移量,传入seek方法;

补充, 分区再均衡监听器,参见 

kafka再均衡监听器测试_PacosonSWJTU的博客-CSDN博客


【5.8】消费者如何退出(关闭)

1)告诉消费者如何优雅退出 ?

另一个线程调用 consumer.wakeup() 方法;就可以让消费者退出poll方法,并抛出 WakeupException 异常;该方法是唯一一个可以从其他线程安全调用的方法;

  1. 并不需要处理 WakeupException 异常,因为它只是用于跳出循环的一种方式;
  2. 记得在退出线程前需要调用 consumer.close() 方法,关闭消费者,会提交任何还没有提交的东西,并向群组协调器发送消息告知自己要离开群组;
  3. 接下来会触发再均衡,而不需要等待会话超时; 

2)在消费者主线程添加 jvm关闭时的钩子

jvm中增加一个关闭的钩子,当jvm关闭的时候,会执行系统中已经设置的所有 通过方法addShutdownHook添加的钩子,当系统执行完这些钩子后,jvm才会关闭。所以这些钩子可以在jvm关闭的时候进行内存清理、对象销毁等 *** 作。

【5.8.1】带有jvm关闭钩子的消费者
public class MyConsumerShutdownHook {
    public static void main(String[] args) {
        // 创建消费者配置信息
        Properties props = new Properties();
        // 属性配置
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, MyProducer.TOPIC_NAME + "G1");
        //   关闭自动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // 设置消费消息的位置,消费最新消息
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // 设置分区策略 (默认值-RangeAssignor)
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());

        // 创建消费者
        KafkaConsumer consumer = new KafkaConsumer<>(props);
        // 订阅主题, 【没有分区再均衡监听器】
        consumer.subscribe(Arrays.asList(MyProducer.TOPIC_NAME));

        // 添加消费者jvm关闭钩子,在消费者主程序退出前执行
        addShutdownHook(consumer);

        // 循环拉取
        try {
            while(!Thread.interrupted()) {
                System.out.println(DateUtils.getNowTimestamp() + " > 带有jvm关闭钩子的消费者等待消费消息");
                TimeUtils.sleep(1000);
                // 消费消息
                ConsumerRecords consumerRds  = consumer.poll(100);
                for(ConsumerRecord rd : consumerRds) {
                    System.out.println("消费者-ShutdownHook-分区【" + rd.partition() + "】offset【" + rd.offset() + "】" + "值=" + rd.value());
                }
                // 【异步提交】
                consumer.commitAsync(new OffsetCommitCallbackImpl());
                if (!consumerRds.isEmpty()) throw new RuntimeException("测试-抛出运行时异常");
            }
        } finally {
            try {
                // 【同步提交】 因为错误时,同步提交会一直重试,直到提交成功或发生无法恢复的错误
                consumer.commitSync();
            } finally {
                // 记得关闭消费者
                consumer.close();
                System.out.println("消费者关闭");
            }
        }
    }

    
    private static void addShutdownHook(Consumer consumer) {
        Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                // 关闭钩子运行在独立线程,以便我们可以安全退出poll() 抛出WakeupException() 异常
                System.out.println("进入消费者jvm关闭钩子");
                consumer.wakeup();
                try {
                    // 当前线程一直挂起,直到主线程运行完成
                    mainThread.join();
                    System.out.println("消费者主程序执行完成,关闭钩子done");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    System.out.println("消费者jvm关闭钩子执行异常");
                }
            }
        });
    }
}

运行日志 :

........

消费者-ShutdownHook-分区【2】offset【1294】值=[585]  > ABCDE
消费者-ShutdownHook-分区【2】offset【1295】值=[588]  > ABCDE
消费者关闭
进入消费者jvm关闭钩子
消费者主程序执行完成,关闭钩子done
Exception in thread "main" java.lang.RuntimeException: 测试-抛出运行时异常
    at kafka.consumer.shutdownhook.MyConsumerShutdownHook.main(MyConsumerShutdownHook.java:57)

注意: 在消费者主线程退出之前,确保彻底关闭了消费者;


 

【5.9】独立消费者如何接收消息

1)独立消费者: 即一个群组只有一个消费者(接收所有分区消息); 这样就不存在分区再均衡问题;

2)对于独立消费者,不需要订阅主题,而是为自己分配分区;

  • 一个消费者可以订阅主题(并加入群组),或者为自己分配分区,但不能同时做这两件事情;

3)独立消费者为自己分配分区并接收消息;

public class MyIndependentConsumer {

    public static void main(String[] args) {
        // 创建消费者配置信息
        Properties props = new Properties();
        // 属性配置
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, MyProducer.TOPIC_NAME + "G1");
        //   关闭自动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // 设置消费消息的位置,消费最新消息
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // 设置分区策略 (默认值-RangeAssignor)
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());

        // 创建消费者
        KafkaConsumer consumer = new KafkaConsumer<>(props);
        // 为自己分配分区,这里我们并没有订阅主题*
        assignPartition(consumer);

        // 循环拉取
        try {
            while(!Thread.interrupted()) {
                System.out.println(DateUtils.getNowTimestamp() + " > 独立消费者等待消费消息");
                TimeUtils.sleep(1000);
                // 消费消息
                ConsumerRecords consumerRds  = consumer.poll(100);
                for(ConsumerRecord rd : consumerRds) {
                    System.out.println("消费者-独立-分区【" + rd.partition() + "】offset【" + rd.offset() + "】" + "值=" + rd.value());
                }
                // 【异步提交】
                consumer.commitAsync(new OffsetCommitCallbackImpl());
            }
        } finally {
            try {
                // 【同步提交】 因为错误时,同步提交会一直重试,直到提交成功或发生无法恢复的错误
                consumer.commitSync();
            } finally {
                // 记得关闭消费者
                consumer.close();
                System.out.println("消费者关闭");
            }
        }
    }

     
    private static void assignPartition(Consumer consumer) {
        // 获取topic的分区信息列表
        List  partitionInfoList = consumer.partitionsFor(MyProducer.TOPIC_NAME);
        if (partitionInfoList == null || partitionInfoList.isEmpty()) {
            throw new RuntimeException("topic 分区查无记录");
        }
        // 创建主题分区并添加到列表(所有分区都添加到列表)
        List partitions = new ArrayList<>(partitionInfoList.size());
        partitionInfoList.forEach(x->{
            partitions.add(new TopicPartition(x.topic(), x.partition()));
        });
        // 给自己分配分区
        consumer.assign(partitions);
    }
}

为自己分配分区,而不是订阅主题核心代码;

    private static void assignPartition(Consumer consumer) {
        // 获取topic的分区信息列表
        List  partitionInfoList = consumer.partitionsFor(MyProducer.TOPIC_NAME);
        if (partitionInfoList == null || partitionInfoList.isEmpty()) {
            throw new RuntimeException("topic 分区查无记录");
        }
        // 创建主题分区并添加到列表(所有分区都添加到列表)
        List partitions = new ArrayList<>(partitionInfoList.size());
        partitionInfoList.forEach(x->{
            partitions.add(new TopicPartition(x.topic(), x.partition()));
        });
        // 给自己分配分区
        consumer.assign(partitions);
    }

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5678103.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存