kafka终极版

kafka终极版,第1张

kafka终极版

kafkatool和启动都可以看生产和消费者的配置信息

同一个partition内的消息只能被同一个组中的一个consumer消费!!!! 当消费者数量多于partition的数量时,多余的消费者空闲 !!例如partition=4,则可在同一组中被最多4个consumer均衡消费。
一种是 RangeAssignor 分配策略(范围分区),另一种是 RoundRobinAssignor分配策略(轮询分区)。默认采用 Range 范围分区。 自定义分区策略
不同的消费组之间的消费者互不影响,但是同一个消费组消息只会被一个人消费!! 就算服务关闭,zookeeper也会记录这个消费组的offset
auto-offset-reset: earliest 没有偏移量才会有作用!!!
./zkCli.sh zookeeper客户端 ls get可以看数据 2.0的kafka offset维持在本身!!!

多个topics @KafkaListener(topics = {“test”,“test1”}, groupId = “test”)
concurrency的值为自己想要的消费者个数即可(注意,消费者数要小于等于你开的所有topic的分区数总和!!) 多条消费者线程去消费这些topic!!

没有topic时 发送数据kafka自动创建 但是分区只为1 replication-factor副本少于broker
kafka-topics --zookeeper slave1:2181 --create --topic test2 --partitions 2 --replication-factor 1

消费指定topic的指定partitions
@KafkaListener(topicPartitions = {@TopicPartition(topic = “test2”, partitions = {“1”})}, groupId = “test2”)

发送指定topic的指定partitions
kafkaTemplate.send(topic,1,“test”, JSONObject.toJSonString(obj));
在自定义分区策略前,我们还应该知道消息的路由机制:
若发送消息时指定了分区(即自定义分区策略),则直接将消息append到指定分区;
若发送消息时未指定patition,但指定了key(kafka允许为每条消息设置一个key),则对key值进行hash计算,根据计算结果路由到指定分区,这种情况下可以保证同一个Key的所有消息都进入到相同的分区;
patition 和 key 都未指定,则使用kafka默认的分区策略,轮询选出一个 patition;

listener:type : batch 这个会导致所有都是批量!!!!!
从版本1.1开始,@KafkaListener可以被配置为批量接收从Kafka话题队列中的Message。要配置监听器容器工厂以创建批处理侦听器,需要设置batchListener属性为true
批量消费
(List records, Acknowledgment ack) 会映射个List
(String records, Acknowledgment ack) 多条会组装成一个String,用,隔开!!!‘

我想又有单条又有批量 这样配置和spring里面的不冲突
@Bean
KafkaListenerContainerFactory batchFactory() {
ConcurrentKafkaListenerContainerFactory factory = new
ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
factory.setBatchListener(true); // 开启批量监听
return factory;
}
@Bean
public Map consumerConfigs() {
Map props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, “http://192.168.31.31:9092”);
return props;
}
@KafkaListener(topics = {“test2”}, groupId = “test2” ,containerFactory = “batchFactory”)

批量发送 本身没有,但在内存中已经做了批量化处理 buffer-memory
batch.size 通过这个参数来设置批量提交的数据大小,默认是16k,当积压的消息达到这个值的时候就会统一发送(发往同一分区的消息)
linger.ms 这个设置是为发送设置一定是延迟来收集更多的消息,默认大小是0ms!!(就是有消息就立即发送) buffer没满指定时间也会发送

max.poll.records = 500
max.poll.interval.ms = 300000 指定consumer两次poll的最大时间间隔(默认5分钟)
实际应用中,消费到的数据处理时长不宜超过max.poll.interval.ms,否则会触发rebalance!!!
#max-poll-interval-ms: 1000 不生效
properties:
max:
poll:
interval:
ms: 1000

Rebalance 的触发条件有3个。
组成员个数发生变化。例如有新的 consumer 实例加入该消费组或者离开组。
订阅的 Topic 个数发生变化。
订阅 Topic 的分区数发生变化。

enable-auto-commit: true 等业务处理完了或者发生异常都会提交!!!!
auto-commit-interval: 默认值是 5000,单位是毫秒。

@KafkaListener(topicPartitions = {@TopicPartition(topic = “test2”, partitions = {“1”})}, groupId = “test3”)
@KafkaListener(topicPartitions = {@TopicPartition(topic = “test2”, partitions = {“1”})}, groupId = “test3”)
两个消费者这样写 写法就有问题!!! 因为一个partitions不会分两个消费者

enable-auto-commit: false listener: ack-mode: manual_immediate #listner负责ack,每调用一次,就立即commit

org.springframework.kafka
spring-kafka

spring:
kafka:
bootstrap-servers: http://192.168.31.31:9092
producer:
# 发生错误后,消息重发的次数。
retries: 1
#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
batch-size: 16384
# 设置生产者内存缓冲区的大小。
buffer-memory: 33554432
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
acks: 1
consumer:
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
auto-commit-interval: 1S
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
auto-offset-reset: earliest
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
enable-auto-commit: false
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 在侦听器容器中运行的线程数。
concurrency: 5
#listner负责ack,每调用一次,就立即commit
ack-mode: manual_immediate
missing-topics-fatal: false

@Autowired
private KafkaTemplate kafkaTemplate;
public void send(String topic, Object obj) {
//发送消息
ListenableFuture> future = kafkaTemplate.send(topic, JSONObject.toJSonString(obj));
future.addCallback(new ListenableFutureCallback>() {
@Override
public void onFailure(Throwable throwable) {
//发送失败的处理
System.out.println(topic + " - 生产者 发送消息失败:" + throwable.getMessage());
}

	@Override
	public void onSuccess(SendResult stringObjectSendResult) {
		//成功的处理
		System.out.println(topic + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
	}
});

}

@KafkaListener(topics = “OM_logs”, groupId = “test”)
public void topic_test(ConsumerRecord record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

Optional message = Optional.ofNullable(record.value());
if (message.isPresent()) {
	Object msg = message.get();
	System.out.println("test topic:"+topic + " msg:" +msg);
	ack.acknowledge();
}

}

kafka 数据存放路径 /home/var/local/kafka/data

–查询kafka所有topic命令
kafka-topics --list --zookeeper nssa.node1:2181

–创建topic
kafka-topics --zookeeper slave1:2181 --create --topic test --partitions 1 --replication-factor 1

–查看topic消息
kafka-console-consumer --bootstrap-server slave1:9092 --topic test --from-beginning

–查看kafka所有消费者组信息
kafka-consumer-groups --bootstrap-server slave1:9092 --list

–kafka消费者组信息 可以看到消费了那血topics offset
kafka-consumer-groups --bootstrap-server slave1:9092 --group test --describe

–查看某个topic的数据量每个分区
kafka-run-class kafka.tools.GetOffsetShell --broker-list slave1:9092 --topic test

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5656510.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存