- 消费者并发数量
- 消费者提交已消费消息offset
- 生产者批量发送消息
- ...
spring.kafka.listener.concurrency
@KafkaListener.concurrency
仅在多partition对应单个消费端时,用于多线程消费消息(concurrency <= partition数量),
当存在多个消费端时,优先考虑让新的消费端去消费(而不是多线程,即便设置concurrency > 1也仅有唯一消费线程生效),
所以如果消费端个数本身就大于等于分区数了,那么concurrency这个参数再设置大于1就是浪费(可保持默认1)
spring.kafka.consumer.enable-auto-commit
spring.kafka.conusmer.auto-commit-interval
spring.kafka.listener.ack-mode
spring.kafka.listener.ack-time
spring.kafka.listener.ack-count
- spring.kafka.consumer.enbable-auto-commit
- true 自动提交已消费消息offset
- auto-commit-interval 设置自动提交间隔
- fasle 由程序控制已消费消息offset提交
- spring.kafka.listener.ack-mode 已消费offset提交模式
- true 自动提交已消费消息offset
ack-mode列表(详细说明参见:SpringKafka - Committing Offsets)
单记录当每一条记录被消费者监听器(ListenerConsumer)处理之后提交✔️
批量当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交✔️
超时当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
(通过spring.kafka.listener.ack-time设置触发时间)✔️
超过消费数量当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
(通过spring.kafka.listener.ack-count设置触发数量)✔️
超时或超数量TIME或COUNT 有一个条件满足时提交✔️
手动提交(ack)后同BATCH当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交❌
需要手动使用
Acknowledgment参数提交
手动立即提交手动调用Acknowledgment.acknowledge()后立即提交❌
需要手动使用
Acknowledgment参数提交
spring.kafka.producer.batch-size
spring.kafka.producer.properties.linger.ms
spring.kafka.producer.buffer-memory
- spring.kafka.producer.batch-size
批量发送消息大小(被发送到同一partition),默认16KB - spring.kafka.producer.properties.linger.ms
延时发送毫秒数(可结合batch.size以保证即使未到达batchSize也会在指定时间内发送消息)
注: 同时设置batch.size和 linger.ms就是哪个条件先满足就都会将消息发送出去 (即高吞吐量与延时的平衡) - spring.kafka.producer.buffer-memory
发送消息内存缓存区大小(缓存未发送的消息)
KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里,
然后把很多消息收集成一个一个的Batch,再发送到Broker上去的,
这样性能才可能高。
注:
(1)如果buffer.memory设置的太小,而消息写入过快导致buffer被写满,则会造成发消息线程阻塞(直到max.block.ms超时抛出异常)。
(2)如果要发送大文件的话,要同时提高buffer.memory 和batch.size的大小。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)