private Duration autoCommitInterval; private String autoOffsetReset; private ListoffsetbootstrapServers; private String clientId; private Boolean enableAutoCommit; private Duration fetchMaxWait; private DataSize fetchMinSize; private String groupId; private Duration heartbeatInterval; private KafkaProperties.IsolationLevel isolationLevel; private Class> keyDeserializer; private Class> valueDeserializer; private Integer maxPollRecords; private final Map properties;
offset是Kafka消费者在接收到消息后返回的偏移量,标识partition中已经消费的数据位置。
offset的提交方式有两种,自动提交和ack手工确认模式。
spring.kafka.consumer.enable-auto-commit=true
当生产者发送数据到达partition中无需消费者进行状态确认,即自动生成偏移量。
优点在于提升Kafka性能,加大吞吐能力,但是不能很好的对消费位置进行把握。
spring.kafka.consumer.enable-auto-commit=false spring.kafka.listener.ack-mode=manual
需要在消费端进行手动提交offset,有同步和异步两种方式
org.apache.kafka.clients.consumer.Consumer consumer.commitAsync(); consumer.commitSync();重置offset
在手工确认模式下进一步理解autoOffsetReset。
重置offset是指在partition中没有初始offset或offset超出范围时的 *** 作,有三种模式:
1 latest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从新产生的消息开始消费。
场景:当生产者生产到100个消息时,消费者开始工作,则此时前100个消息将无法收到,及从第101个开始接受消息。
2 earliest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费。
场景:当生产者生产到100个消息时,消费者开始工作,则此时前100个消息正常收到,及从第1个开始接受消息。
3 none 当该topic下所有分区都未提交的offset时,抛出异常
spring.kafka.listener.type=batch spring.kafka.consumer.max-poll-records=1000
默认为single,批量消费设置相对应的poll数量能提升消费能力
批量消费和手工确认代码示例
private void batchConsumer(List消费分组和Kafka数据存储> records, Consumer consumer) throws IOException { //手动提交 consumer.commitAsync(); try { for (ConsumerRecord record: records) { ………… } } finally { …… } }
rabbitmq常用于服务之间消息通知,Kafka则在于数据分流传输和存储。topic 是逻辑上的存在,而 partition 是实际物理概念,partition的数据存储在log结尾的文件中(误导和log日志文件混淆),produce不断生产的数据就存在于此文件中,而且不断地在结尾追加。同时可以设置kafka数据存储策略
log.retention.hours=168 log.retention.bytes==1073741824 log.segment.bytes=1073741824
log.retention.hours为存储的过期时间,默认168小时,log.retention.bytes为达到多少数据量则过期,默认1G。可以根据自己的业务和硬盘大小来调试两种参数选择合适策略。
物理上的partition则是分成多个segment,在此可设置每个segment大小,默认1G
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
rabbitmq当消费完则从队列中删除数据.,而kafka中的分区可以被不同的组进行消费,即数据持久后的多次消费。且组与组之间的消费情况互不影响,例如前面的重置offset策略。
Kafka除正常消费外还可以指定时间段进行二次消费,还可以通过不同的groupID。
在消息中间件中传输数据和对象都要有序列化 *** 作
# 序列化 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer # 反序列化 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializerProducer配置 kafkaproperties
private String acks; private DataSize batchSize; private List消息可靠投递ack机制bootstrapServers; private DataSize bufferMemory; private String clientId; private String compressionType; private Class> keySerializer = StringSerializer.class; private Class> valueSerializer = StringSerializer.class; private Integer retries; private String transactionIdPrefix;
为了保证Kafka的消息发送可靠,引入ack机制,即每次producer发送到的消息都应收到partition的ack回复。
spring.kafka.producer.acks=0/1/all
设置为0则自动ack,保证性能到不安全。
设置为1时只需收到分区的leader回复即可。
设置为all时要等到集群副本全部同步完成才返回ack进行下一轮生产。
Kafka在生产时并不是每条数据发送请求,而是积攒到一定数量统一发送,在此可以设置大小,并且在指定之间还没有达到数据量时也会发送。此设置也可以对吞吐量进行控制,
# 批量大小 spring.kafka.producer.batch-size=16384 # 提交延时 spring.kafka.producer.properties.linger.ms=0
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)