kafka事务

kafka事务,第1张

kafka事务 文章

权威来源: https://cwiki.apache.org/confluence/display/KAFKA/KIP-98±+Exactly+Once+Delivery+and+Transactional+Messaging

概述 生产者幂等

配置:enable.idempotence。
幂等机制:每个producer每次启动时,都会获得一个ProducerID和序列号。broker要求新收到的消息

  1. 序列号小于等于已确认的消息,这些消息被忽略
  2. 序列号恰好等于已确认的消息+1,这个消息会被记录为最新消息
  3. 序列号大于已确认的消息(异常状态),则让Producer抛出IllegalStateException,从而关闭Producer

注意:即使同一个producer重启,也会生成新的ProducerID;不能跨Session幂等
如果使用了transactional.id, 则PID也会被保存并返回

事务

配置: transactional.id
transactional.id是producer配置,用于coordinator(broker)识别生产者的标识。它并非随机生成,由用户指定,以便可以在实例崩溃重启后,对事务进行恢复或终止。

kafka可以保证同一个生产者写入不同Topic和不同分区的多个消息由一个事务完成,但是不能保证这些消息都被一个消费者同时消费或者不消费。原因是:

  1. log重写,丢失事务中的较老的消息
  2. 消费者可以定位到任何offset
  3. 消费者可能不会订阅所有相关的分区
核心概念
  1. 引入Transaction Coordinator, 每个生产者都被唯一分配给一个Transaction Coordinator进行管理
  2. 一个新的内部topic:Transaction Log
  3. Control Message,用于broker向consumer告知消费的哪些消息被自动提交了offset
  4. TransactionalId, 分配给生产者,允许不同的实例以相同的TransactionalId恢复
  5. Epoch,用于识别相同的TransactionalId中,谁是最新的
流程

对于生产者来说,启动时,需要分配一个Transaction Coordinator;在发送消息时,需要记录消息的PID和offset,并最终提交所有的<分区,offset>映射;
对于Transaction Coordinator,Abort和Commit都需要将对应的Control Message写入到各个分区中,以便消费者消费缓存的消息;如果是Commit,还需要将对应的Offset提交到Consumer Coordinator中;
对于消费者,当收到带有PID的消息时,如果isolation.level是read_commited,则缓存此消息,直到收到对应PID的commit或abort消息。

Transaction Coordinator的状态:

图中的PrepareCommit阶段,Coordinator会向所有的参与者发送COMMIT(或ABORT)消息,在收到ACK之后,直接在本地写入CompleteCommit消息。
注意:一旦进入PrepareCommit分支之后,无法再进行Abort。

整个大的流程可以理解类似于2PC的事务模型,第一阶段是Producer向各个分区Broker调用send发送消息,返回成功认为是ready,返回错误则是abort;第二阶段则是Producer向Coordinator发送COMMIT消息。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/4698463.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-07
下一篇 2022-11-07

发表评论

登录后才能评论

评论列表(0条)

保存