Kafka 是一个高度可扩展的分布式消息系统,在海量数据处理生态中占据着重要的地位。
数据处理的一个关键特性是数据的一致性。具体到 Kafka 的领域中,也就是生产者生产的数据和消费者消费的数据之间一对一的一致性。在各种类型的失败普遍存在的分布式系统环境下,保证业务层面一个整体的消息集合被原子的发布和恰好一次处理,是数据一致性在 Kafka 生态系统的实际要求。
本文介绍了 Kafka 生态中的事务机制的概念和流程。
Kafka 事务机制的概念
Kafka 从 011 版本开始支持了事务机制。Kafka 事务机制支持了跨分区的消息原子写功能。具体来说,Kafka 生产者在同一个事务内提交到多个分区的消息,要么同时成功,要么同时失败。这一保证在生产者运行时出现异常甚至宕机重启之后仍然成立。
此外,同一个事务内的消息将以生产者发送的顺序,唯一地提交到 Kafka 集群上。也就是说,事务机制从某种层面上保证了消息被恰好一次地提交到 Kafka 集群。众所周知,恰好一次送达在分布式系统中是不可能实现的。这个论断有一些微妙的名词重载问题,但大抵没错,所有声称能够做到恰好一次处理的系统都在某个地方依赖了幂等性。
Kafka 的事务机制被广泛用于现实世界中复杂业务需要保证一个业务领域中原子的概念被原子地提交的场景。
例如,一次下单流水包括订单生成消息和库存扣减消息,如果这两个消息在历史上由两个主题分管,那么它们在业务上的原子性就要求 Kafka 要利用事务机制原子地提交到 Kafka 集群上。
还有,对于复杂的流式处理系统,Kafka 生产者的上游可能是另一个流式处理系统,这个系统可能有着自己的一致性方案。为了跟上游系统的一致性方案协调,Kafka 就需要提供一个尽可能通用且易于组合的一致性机制,即灵活的事务机制,来帮助实现端到端的一致性。
Kafka 事务机制的流程
分布式系统的数据一致性是难的。要想理解一个系统提供何种程度的数据一致性保证,以及这样的保证对应用程序提出了什么样的要求,再及在哪些情况下一致性保证会出现什么方面的回退,细究其一致性机制的实现是必须的。
上面我们提到,事务机制的核心特征是能跨越多个分区原子地提交消息集合,甚至这些分区从属于不同的主题。同时,被提交的消息集合中的消息每条仅被提交一次,并保持它们在生产者应用中被生产的顺序写入到 Kafka 集群的消息日志中。此外,事务能够容忍生产者运行时出现异常甚至宕机重启。
实现事务机制最关键的概念就是事务的唯一标识符( TransactionalID ),Kafka 使用 TransactionalID 来关联进行中的事务。TransactionalID 由用户提供,这是因为 Kafka 作为系统本身无法独立的识别出宕机前后的两个不同的进程其实是要同一个逻辑上的事务。
对于同一个生产者应用前后进行的多个事务,TransactionalID 并不需要每次都生成一个新的。这是因为 Kafka 还实现了 ProducerID 以及 epoch 机制。这个机制在事务机制中的用途主要是用于标识不同的会话,同一个会话 ProducerID 的值相同,但有可能有多个任期。ProducerID 仅在会话切换时改变,而任期会在每次新的事物初始化时被更新。这样,同一个 TransactionalID 就能作为跨会话的多个独立事务的标识。
接下来,我们从一个事务的完整流程出发讨论客户端也就是生产者和消费者,以及服务端也就是 Kafka 集群在这个流程中扮演了什么角色,执行了什么动作。
初始化事务上下文
逻辑上说,事务总是从生产者提起的。生产者通过调用 initTransactions 方法初始化事务上下文。首要做的事情就是找到 Kafka 集群负责管理当前事务的事务协调者( TransactionCoordinator ),向其申请 ProducerID 资源。初始的 ProducerID 及 epoch 都是未初始化的状态。
生产者一侧的事务管理者( TransactionManager )收到相应的方法调用之后先后发送查找事务协调者的信息和初始化 ProducerID 的信息。事务相关的所有元数据信息都会由客户端即生产者一侧的事务管理者和服务端即 Kafka 集群的一个 Broker 上的事务协调者交互完成。
一开始,生产者并不知道哪个 Broker 上有自己 TransactionalID 关联的事务协调者。逻辑上,所有事务相关的需要持久化的数据最终都会写到一个特殊的主题 __transaction_state 上。这跟前面回答消费位点管理文章中的管理消费者消费位点的特殊主题 __consumer_offsets 构成了目前 Kafka 系统里唯二的特殊主题。
对于一个生产者或者说被 TransactionalID 唯一标识的事务来说,它的事务协调者就是该事务的元数据最终存储在 __transaction_state 主题上对应分区的分区首领。对于一个具体的事务来说,它的元数据将被其 TransactionalID 的哈希值的绝对值模分区数的分区所记录,这也是常见的确定分区的方案。
生产者将查找事务协调者的信息发送到集群的任意一个 Broker 上,由它计算出实际的事务协调者,获取对应的节点信息后返回给生产者。这样,生产者就找到了事务协调者。
随后,生产者会向事务协调者申请一个 ProducerID 资源,这个资源包括 ProducerID 和对应的 epoch 信息。事务协调者收到对应请求后,将会首先判断同一个 TransactionalID 下的事务的状态,以应对好跨会话的事务的管理。
第一步,事务协调者会获取 TransactionalID 对应的事务元数据信息。前面提到,这些元数据信息将被写在特殊主题 __transaction_state 上,这也是事务元数据信息对生产者和 Kafka 集群都容错的需要。
如果获取不到元数据信息,那么就初始化事务元数据信息,包括从获取一个新的 ProducerID 资源,并将它和 TransactionalID 以及分区编号和其他一些配置信息一起打包持久化。
其中,获取一个新的 ProducerID 资源需要 ProducerID 管理器从 ZooKeeper 上申请一个 ProducerID 的号段,在逐一的分配出去。申请号段的手段是修改 ZooKeeper 上 /latest_producer_id_block 节点的信息,流程是读节点上最后一个被申请的 ProducerID 的信息,加上要申请的号段的长度,再更新节点上最后一个被申请的 ProducerID 的信息。由于 ZooKeeper 对节点的更新有版本控制,因此并发的请求将会导致其中若干个请求目标版本失配,并提起重试。ProducerID 的长度是 Long 类型的长度,因此在实际使用过程中几乎不可能用完,Kafka 对号段资源耗尽的情况抛出致命错误并不尝试恢复。
如果获取到了相同 TransactionalID 先前的元数据信息,那么根据事务协调器事务先前的状态采取不同的行为。
如果此时状态转移正在进行,直接返回 CONCURRENT_TRANSACTIONS 异常。注意这里是事务协调器上正在发生并发的状态转移。通常来说,并发的状态转移应该依次执行,直接返回此异常可避免客户端即生产者请求超时,而是让生产者稍后自行重试。这也是一种乐观的加锁策略。
如果此时状态为 PrepareAbort 或 PrepareCommit 则返回 CONCURRENT_TRANSACTIONS 异常。同样的,此时状态即将转换为终结状态,无需强行终止先前的事务,否则将会产生无谓的浪费。
如果此时状态为 Dead 或 PrepareEpochFence 或当前 ProducerID 和 epoch 对不上,直接抛出不可重试的异常。这是由于要么是先前的 Producer 且已经被新的 Producer 替代,要么事务已经超时,无需再次尝试。
如果此时状态为 Ongoing 则事务协调者会将事务转移到 PrepareEpochFence 状态,然后再丢弃当前的事务,并返回 CONCURRENT_TRANSACTIONS 异常。
如果此时状态为 CompleteAbort 或 CompleteCommit 或 Empty 之一那么先将状态转移为 Empty 然后更新 epoch 值。
经过这么一连环的 *** 作,Kafka 就将事务执行的上下文初始化好了。
开始一个事务
初始化事务的流程实际上是生产者和对应的事务协调者就事务状态达成一致,进入到一个可以提起新的事务的状态。此时,生产者可以通过 beginTransaction 方法开始一个事务 *** 作。这个方法只会将本地事务状态转移到 IN_TRANSACTION 状态,在真正的提交事务中的消息之前,不会有跟 Kafka 集群的交互。
生产者将自己标记为开始事务之后,也就是本地事务状态转移到事务进行中的状态之后,就可以开始发送事务中的消息了。
发送事务中的消息
生产者在发送事务中的消息的时候,会将消息对应的分区添加到事务管理器中去,如果这个分区此前没被添加过,那么事务管理器会在下一次发送消息之前插入一条 AddPartitionsToTxnRequest 请求来告诉 Kafka 集群的事务协调者参与事务的分区的信息。事务协调者收到这条信息之后,将会更新事务的元数据,并将元数据持久化到 __transaction_state 中。
对于生产者发送的消息,仍然和一般的消息生产一样采用 ProduceRequest 请求。除了会在请求中带上相应的 TransactionalID 信息和属于事务中的消息的标识符,它跟生产者生产的普通信息别无二致。如果消费者没有配置读已提交的隔离级别,那么这些消息在被 Kafka 集群接受并持久化到主题分区中时,就已经对消费者可见而且可以被消费了。
事务中的消息的顺序性保证也是在发送事务的时候检查的。
生产者此时已经申请到了一个 ProducerID 资源,当它向一个分区发送消息时,内部会有一个消息管理器为每个不同的分区维护一个顺序编号( SequenceNumber )。相应地,Kafka 集群也会为每个 ProducerID 到每个分区的消息生产维护一个顺序编号。
ProducerRequest 请求中包含了顺序编号信息。如果 Kafka 集群看到请求的顺序编号跟自己的顺序编号是连续的,即比自己的顺序编号恰好大一,那么接受这条消息。否则,如果请求的顺序编号大一以上,则说明是一个乱序的消息,直接拒绝并抛出异常。如果请求的顺序编号相同或更小,则说明是一个重复发送的消息,直接忽略并告诉客户端是一个重复消息。
提交事务
在一个事务相关的所有消息都发送完毕之后,生产者就可以调用 commitTransaction 方法来提交整个事务了。对于事务中途发生异常的情形,也可以通过调用 abortTransaction 来丢弃整个事务。这两个 *** 作都是将事务状态转移到终结状态,彼此之间有许多相似点。
无论是提交还是丢弃,生产者都是给事务协调者发送 EndTxnRequest 请求,请求中包含一个字段来判断是提交还是丢弃。事务协调者在收到这个请求后,首先更新事务状态到 PrepareAbort 或 PrepareCommit 并更新状态到 __transaction_state 中。
如果在状态更新成功前事务协调者宕机,那么恢复过来的事务协调者将认为事务在 Ongoing 状态中,此时生产者由于收不到确认回复,会重试 EndTxnRequest 请求,并最终更新事务到 PrepareAbort 或 PrepareCommit 状态。
随后,根据是提交还是丢弃,分别向事务涉及到的所有分区的分区首领发送事务标志( TransactionMarker )。
事务标志是 Kafka 事务机制引入的不同于业务消息的事务控制消息。它的作用主要是标识事务已经完成,这个消息同业务消息一样能够被消费者所消费,并且它和事务中的业务消息能够通过 TransactionalID 关联起来,从而支持配置了读已提交特性的消费者忽略尚未提交的事务消息或被丢弃的事务消息。
如果在事务标志写到涉及到的所有分区的分区首领之前,事务协调者宕机或者分区首领宕机或网络分区,新起来的事务协调者或超时后重试的事务协调者会重新向分区首领写入事务标志。事务标志是幂等的,因此不会影响事务提交的结果。这里我们印证了之前所说的所有声称能够做到恰好一次处理的系统都在某个地方依赖了幂等性。
在当前事务涉及到的所有分区都已经把事务标志信息持久化到主题分区之后,事务协调者才会将这个事务的状态置为提交或丢弃,并持久化到事务日志文件中。在这之后,一个 Kafka 事务才算真正的完成了。事务协调者中缓存的关于当前事务的元数据就可以清理了。
如果在事务协调者回复生产者提交成功之前宕机,在恢复之后生产者再次提交事务时会直接返回事务提交成功。
总的来说,事务的状态以 __transaction_state 主题上持久化的元数据信息为准。
超时过期事务
分布式系统由于天然的网络阻塞或分区等失败原因, *** 作在成功和失败之外还有超时这第三种状态。现实中的分布式系统必须合理地处理超时的状态,否则永久阻塞或等待在任何实际的业务领域中都是不可接受的。
Kafka 事务机制本身可以配置事务超时,在事务管理者和事务协调者交互的各个过程中都会检验事务超时的配置,如果事务已经超时则抛出异常。
但是,在网络分区的情况下,可能 Kafka 集群根本就等不到生产者发送的消息。这个时候,Kafka 集群就需要相应的机制来主动过期。否则永不过期的中间状态事务在生产者宕机且不可恢复或不再恢复的情况下将逐步积累成存储垃圾。
Kafka 集群会周期性的轮询内存中的事务信息。如果发现进行中的事务最后的状态更新时间距今已经超过了配置的集群事务清理时间阈值,则采取丢弃该事务的 *** 作。同时,为了避免 *** 作过程中并发地收到原 Producer 发来事务更新请求,首先更新事务关联的 ProducerID 的 epoch 以将原 Producer 的 epoch 隔离掉。换个角度说,也就是以一个新的有效的身份执行丢弃事务 *** 作,以免分不清到底是谁在丢弃事务。
此外,轮询中还会检查 TransactionalID 最新的事务信息,如果一个 TransactionalID 最后一个事务距今已经已经超过了配置的集群 TransactionalID 清理时间阈值,则将该 TransactionalID 对应的元数据信息都进行清理。
上面的讨论中还有两个重要的主题被忽略了。一个是 Kafka 事务机制支持在同一个事务里进行消息生产和消息消费位点提交,另一个是配置了读已提交的消费者如何在事务未提交以及丢弃事务时正确的读取事务中消息。
前者不是特别复杂,只需要将消费位点提交视作一条事务中的消息,和消息生产以及控制消息同等待遇,在提交的时候也被事务标志所界定即可。
不展开聊是因为这个特性通常只在仅适用 Kafka 搭建流式处理流水线的场景下有用,尤其是 Kafka Streams 解决方案。
对于组合多个系统的流式处理流水线来说,消息从 Kafka 中消费得到是上游,生产到 Kafka 上是下游,中间是另一个例如 Flink 的流式计算系统。在这种场景下,消费位点的管理和事务地生产消息是两个可以分开考虑的事情,可以跟其他系统的一致性方案例如 Flink 的 Checkpoint 机制相结合,而不需要非得在同一个事务里既提交消费位点,又提交新的消息。
后者主要靠 Kafka 集群在管理消费位点拉取请求的时候,通过随事务机制的引入新添加的 LastStableOffset 概念来响应配置为读已提交的消费者的请求。在事务完成之前不会允许读已提交的消费者拉取事务中的消息。显然,这有可能导致消费者拉取新消息时长时间的阻塞。因此在实践中应当尽量避免长时间的事务。
对于丢弃事务的消息,Kafka 集群会维护一个丢弃事务的消息的元数据,从而支持消费者同时拉取消息和丢弃事务的消息的元数据,自行比对筛掉丢弃事务的消息。在正常的业务场景里,丢弃的事务不会太多,从而维护这样的一份元数据以及让消费者自行筛选会是一个能够接受的选择。
一个事务里面有没有当前读,快照读与当前读的区别:快照读:读取的是记录数据的可见版本(可能是过期的数据),不用加锁;当前读:读取的是记录数据的最新版本,并且当前读返回的记录都会加上锁,保证其他事务不会再并发的修改这条记录。
1,定位REPLICAT进程当前事务所在列队文件及RBA:
GGSCI (sv890n01) 46> info rads_1
REPLICAT RADS_1 Last Started 2012-12-08 16:29 Status ABENDED
Checkpoint Lag 02:23:36 (updated 00:01:38 ago)
Log Read Checkpoint File /dirdat/pa000002
2012-12-08 14:05:51970013 RBA 77698825
2,由于在列队中相邻两个事务的 RBA 并不是简单的 +1 关系,所以下一步需要定位到下个事务的RBA,在这里需要借助 logdump 工具。
打开列队文件:
Logdump 10 >open /dirdat/pa000002
Current LogTrail is /u01/app/oracle/oradata/ogg/dirdat/pa000002
定位到指定的RBA位置:
Logdump 11 >pos 77698825
Reading forward from RBA 77698825
查看当前事务详细信息,在这里可以看到DDL/DML *** 作类型,RBA,对象名等一些相关信息:
Logdump 12 >n
2012/12/08 14:05:52970013 DDLOP Len 1221 RBA 77698825
Name:
After Image: Partition 0 G s
2c43 353d 2735 3730 3727 2c2c 4237 3d27 3537 3037 | ,C5='5707',,B7='5707
272c 2c42 323d 2735 3631 3330 3027 2c2c 4233 3d27 | ',,B2='561300',,B3='
4f47 4727 2c2c 4234 3d27 4442 4d53 5f54 4142 434f | OGG',,B4='DBMS_TABCO
4d50 5f54 454d 505f 554e 434d 5027 2c2c 4331 323d | MP_TEMP_UNCMP',,C12=
2727 2c2c 4331 333d 2727 2c2c 4235 3d27 5441 424c | '',,C13='',,B5='TABL
4527 2c2c 4236 3d27 4452 4f50 272c 2c42 383d 274f | E',,B6='DROP',,B8='O
4747 2e47 4753 5f44 444c 5f48 4953 5427 2c2c 4239 | GGGGS_DDL_HIST',,B9
查看下一个事务信息:
Logdump 13 >n
2012/12/08 14:05:52970013 Insert Len 370 RBA 77700210
Name: DATAGATETB_DOWNLOAD_LOG
After Image: Partition 4 G s
0000 0009 0000 0005 7274 7231 3100 0100 0900 0000 | rtr11
0531 3036 3034 0002 001d 0000 0019 5943 3159 3037 | 10604YC1Y07
315f 3132 3939 3537 3833 3534 3932 342e 786d 6c00 | 1_1299578354924xml
0300 0300 0030 0004 00bf 0000 00bb 6874 7470 3a2f | 0>
1,分布式事务产生的背景;
分情况而定。
1,在单体项目中,多个不同的业务逻辑都是在同一个数据源中心实现事务管理,是不存在分布式事务的问题。因为在同一个数据源的情况下都是采用事务管理器,相当于每个事务管理器对应一个数据源。
2,在单体项目中,有多个不同的数据源,每个数据源中都有自己独立的事务管理器,互不影响,那么这时候也会存在多数据源事务管理:解决方案jta+Atomikos
3,在分布式/微服务架构中。每个服务都有自己的本地的事务。每个服务本地事务互不影响,那么这时候也会存在分布式事务的问题。
分布式事务产生的背景:订单服务调用派单服务接口成功之后,可能会引发错误。
2pc3pc思想,实际上都是解决我们在分布式系统中,每个节点保证数据一致性问题。
事务的定义。
对我们业务逻辑可以实现提交或者是回滚,保证数据一致性的情况。所以,要么提交,要么回滚。
原子性a 要么提交 要么回滚。
一致性 c
持久性d 事务一旦提交或者回滚后,不会再对该结果有任何的影响。
Base 与 cap理论。
1,cap定律
这个定理的内容是指的是在一个分布式系统中,Consistency(一致性),Availability(可用性),Partition tolerance(分区容错性),二者不可兼容。
1,一致性(C)
在分布式系统中的所有数据备份,是在同一时刻是否同样的值,(等同于所有节点访问同一份最新的数据副本)
2,可用性 A
在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求(对数据更新具有高可用性)
3,分区容错性(p) 形成脑裂问题:
以实际效果而言,分区相当于对通信的时限要求,系统如果不能再时限内达成数据一致性,就意味着发生了分区的情况,必须就当前 *** 作在C和A之间做出选择。
4,总结下
以上可以知道分区容错性(P)主要代表网络波动产生的错误,这是不可以避免的,且这三个模式不可以兼得,所以目前就2种模式: cp和Ap模式。
其中cp表示遵循一致性的原则,但不能保证高可用性,其中zookeeper作为注册中心就是采用cp模式,因为zookeeper有过半节点不可以的话整个zookeeper将不可用。
AP表示遵循于可用性原则,例如Eureka作为注册中心用的是AP模式,因为其为去中心化,采用你中有我我中有你的相互注册方式,只要集群中有一个节点可以使用,整个eureka服务就是可用的,但可能会出现短暂的数据不一致问题。
Ap保证可用性:但是不能保证每个副本数据数据一致性,
cp保证数据一致性;如果有过半的zk节点宕机的情况下,不能保证可用性,但是必须保证每个副本节点之间数据一致性,比如zk。
Base理论:
Base是 Basically Available(基本可用),Softstate(软状态)和Eventually consistent(最终一致性)三个短语的缩写。Base理论是对CAP定理逐步演化而来的,base理论核心思想是:即使无法做到强一致性,但每个应用都可以根据自身业务特点,采用适当的方式达到最终一致性。
1基本可用性;
基本可用是指分布式系统在出现不可预知故障的时候,允许损失部分可用性,注意:这绝不等于系统不可用。
比如: 响应时间的损失,正常情况下,在一个电子商务网站上进行购物的时候,消费者几乎能顺利完成每一笔订单,但是在一些介入大促销购物高峰的时候,由于消费者的购物行为激增,为了保护购物系统的稳定性,部分消费者可能被引导在一个降级页面。
2,软状态。
软状态指允许系统中的数据存在中间状态,并认为该中间状态的存在不会影响系统的整体可用性,既允许系统在不同节点的数据副本之间进行数据同步的过程存在延时。
3,最终一致性
最终一致性强调的是所有的数据副本,在经过一段时间的同步之后,最终都能够达到一个一致的状态,因此,最终一致性的本质需要系统保证数据能够达成一致,而不需要时时保证系统数据的强一致性。
2pc 与3pC
通过2pc和3pc 思想可以实现保证每个节点的数据一致性问题。
目前主流分布式解决框架
1,单体项目多数据源,可以jta+Atomilos;
2,基于Rabbitmq的形式解决 最终一致性思想;
3,基于Rocketmq解决分布式事务 ,采用事务消息。
4,lcn采用lcn模式,假关闭连接
5,Alibaba的seata 背景强大,已经成为了主流。
以上适合于微服务架构中,不适合于和外部接口保证分布式事务问题。
6,跨语言的方式实现解决分布式事务问题。类似于支付宝回调方式。
2阶段提交协议基本概念。
2阶段提交协议基本概念:
俩阶段提交协议可以理解为2pc,也就是分为参与者和协调者,协调者会通过2次阶段实现数据最终一致性的
2pc和3pc 的区别就是解决参与者超时问题和多加了一层询问。保证了数据传输的可靠性。
简单的回顾下lcn解决分布式事务。
>
以上就是关于Kafka 是如何实现事务的全部的内容,包括:Kafka 是如何实现事务的、一个事务里面有没有当前读、GoldenGate进程失败后怎么跳过当前事务等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)