权威来源: https://cwiki.apache.org/confluence/display/KAFKA/KIP-98±+Exactly+Once+Delivery+and+Transactional+Messaging
概述 生产者幂等配置:enable.idempotence。
幂等机制:每个producer每次启动时,都会获得一个ProducerID和序列号。broker要求新收到的消息
- 序列号小于等于已确认的消息,这些消息被忽略
- 序列号恰好等于已确认的消息+1,这个消息会被记录为最新消息
- 序列号大于已确认的消息(异常状态),则让Producer抛出IllegalStateException,从而关闭Producer
事务注意:即使同一个producer重启,也会生成新的ProducerID;不能跨Session幂等
如果使用了transactional.id, 则PID也会被保存并返回
配置: transactional.id
transactional.id是producer配置,用于coordinator(broker)识别生产者的标识。它并非随机生成,由用户指定,以便可以在实例崩溃重启后,对事务进行恢复或终止。
kafka可以保证同一个生产者写入不同Topic和不同分区的多个消息由一个事务完成,但是不能保证这些消息都被一个消费者同时消费或者不消费。原因是:
- log重写,丢失事务中的较老的消息
- 消费者可以定位到任何offset
- 消费者可能不会订阅所有相关的分区
- 引入Transaction Coordinator, 每个生产者都被唯一分配给一个Transaction Coordinator进行管理
- 一个新的内部topic:Transaction Log
- Control Message,用于broker向consumer告知消费的哪些消息被自动提交了offset
- TransactionalId, 分配给生产者,允许不同的实例以相同的TransactionalId恢复
- Epoch,用于识别相同的TransactionalId中,谁是最新的
对于生产者来说,启动时,需要分配一个Transaction Coordinator;在发送消息时,需要记录消息的PID和offset,并最终提交所有的<分区,offset>映射;
对于Transaction Coordinator,Abort和Commit都需要将对应的Control Message写入到各个分区中,以便消费者消费缓存的消息;如果是Commit,还需要将对应的Offset提交到Consumer Coordinator中;
对于消费者,当收到带有PID的消息时,如果isolation.level是read_commited,则缓存此消息,直到收到对应PID的commit或abort消息。
Transaction Coordinator的状态:
图中的PrepareCommit阶段,Coordinator会向所有的参与者发送COMMIT(或ABORT)消息,在收到ACK之后,直接在本地写入CompleteCommit消息。
注意:一旦进入PrepareCommit分支之后,无法再进行Abort。
整个大的流程可以理解类似于2PC的事务模型,第一阶段是Producer向各个分区Broker调用send发送消息,返回成功认为是ready,返回错误则是abort;第二阶段则是Producer向Coordinator发送COMMIT消息。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)