一探究竟,详解Kafka生产者和消费者的工作原理!

一探究竟,详解Kafka生产者和消费者的工作原理!,第1张

对于每个主题,Kafka群集都会维护一个分区日志,如下所示:

每个分区(Partition)都是有序的(所以每一个Partition内部都是有序的),不变的记录序列,这些记录连续地附加到结构化的提交日志中。分区中的每个记录均分配有一个称为偏移的顺序ID号,该ID 唯一地标识分区中的每个记录。

每个消费者保留的唯一元数据是该消费者在日志中的偏移量或位置。此偏移量由使用者控制:通常,使用者在读取记录时会线性地推进其偏移量,但实际上,由于位置是由使用者控制的,因此它可以按喜欢的任何顺序使用记录。例如,使用者可以重置到较旧的偏移量以重新处理过去的数据,或者跳到最近的记录并从“现在”开始使用。(类似于游标指针的方式顺序处理数据,并且该指标可以任意移动)

分区的设计结构

生产者分区策略是 决定生产者将消息发送到哪个分区的算法, 主要有以下几种:

kafka消息的有序性,是采用消息键保序策略来实现的。 一个topic,一个partition(分割),一个consumer,内部单线程消费,写N个内存queue,然后N个线程分别消费一个内存queue。

kafka发送进行消息压缩有两个地方,分别是生产端压缩和Broker端压缩。

生产者端压缩 生产者压缩通常采用的GZIP算法这样 Producer 启动后生产的每个消息集合都是经 GZIP 压缩过的,故而能很好地节省网络传输带宽以及 Kafka Broker 端的磁盘占用。 配置参数:

Broker压缩 大部分情况下 Broker 从 Producer 端接收到消息后仅仅是原封不动地保存而不会对其进行任何修改,但以下情况会引发Broker压缩

消费者端解压 Kafka 会将启用了哪种压缩算法封装进消息集合中,在Consummer中进行解压 *** 作。

kafka提供以下特性来保证其消息的不丢失,从而保证消息的可靠性

生产者确认机制 当 Kafka 的若干个 Broker(根据配置策略,可以是一个,也可以是ALL) 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为“已提交”消息了。 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。

生产者失败回调机制 生产者不要使用 producersend(msg),而要使用 producersend(msg, callback)。记住,一定要使用带有回调通知的 send 方法。producersend(msg, callback) 采用异步的方式,当发生失败时会调用callback方法。

失败重试机制 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。

消费者确认机制 确保消息消费完成再提交。Consumer 端有个参数 enableautocommit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。

副本机制 设置 replicationfactor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。 设置 mininsyncreplicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。 确保 replicationfactor > mininsyncreplicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replicationfactor = mininsyncreplicas + 1。

限定Broker选取Leader机制 设置 uncleanleaderelectionenable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。

由于kafka生产者确认机制、失败重试机制的存在,kafka的消息不会丢失但是存在由于网络延迟等原因造成重复发送的可能性。 所以我们要考虑消息幂等性的设计。 kafka提供了幂等性Producer的方式来保证消息幂等性。使用 的方式开启幂等性。

幂等性 Producer 的作用范围:

Kafka事务 事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。 同样使用 的方式开启事务。

consumer group是kafka提供的可扩展且具有容错性的消费者机制。它是由一个或者多个消费者组成,它们共享同一个Group ID 组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费组内的一个consumer来消费。

consummer group有以下的特性:

消费者位置 消费者位置,即位移。 消费者在消费的过程中需要记录自己消费了多少数据。 位移提交有自动、手动两种方式进行位移提交。

Kafka通过一个内置Topic(__consumer_offsets)来管理消费者位移。

rebalance本质上是一种协议,规定了一个consumer group下的所有consumer如何达成一致来分配订阅topic的每个分区。

Kafka提供了一个角色:coordinator来执行对于consumer group的管理。 Group Coordinator是一个服务,每个Broker在启动的时候都会启动一个该服务。Group Coordinator的作用是用来存储Group的相关Meta信息,并将对应Partition的Offset信息记录到Kafka内置Topic(__consumer_offsets)中。

Rebalance 过程分为两步:Join 和 Sync。 Join 顾名思义就是加入组。这一步中,所有成员都向coordinator发送JoinGroup请求,请求加入消费组。一旦所有成员都发送了JoinGroup请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader——注意leader和coordinator不是一个概念。leader负责消费分配方案的制定。

Sync,这一步leader开始分配消费方案,即哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案封装进SyncGroup请求中发给coordinator,非leader也会发SyncGroup请求,只是内容为空。coordinator接收到分配方案之后会把方案塞进SyncGroup的response中发给各个consumer。这样组内的所有成员就都知道自己应该消费哪些分区了。

为了解决重试导致的消息重复、乱序问题,kafka引入了幂等消息。幂等消息保证producer在一次会话内写入一个partition内的消息具有幂等性,可以通过重试来确保消息发布的Exactly Once语义。

实现逻辑很简单:

producer每次启动后,首先向broker申请一个全局唯一的pid,用来标识本次会话。

message_v2 增加了sequence number字段,producer每发一批消息,seq就加1。

broker在内存维护(pid,seq)映射,收到消息后检查seq,如果,

producer在收到明确的的消息丢失ack,或者超时后未收到ack,要进行重试。

考虑在stream处理的场景中,需要多个消息的原子写入语义,要么全部写入成功,要么全部失败,这就是kafka事务消息要解决的问题。

事务消息是由producer、事务协调器、broker、组协调器、consumer共同参与实现的,

为producer指定固定的TransactionalId,可以穿越producer的多次会话(producer重启/断线重连)中,持续标识producer的身份。

使用epoch标识producer的每一次"重生",防止同一producer存在多个会话。

producer遵从幂等消息的行为,并在发送的recordbatch中增加事务id和epoch。

引入事务协调器,以两阶段提交的方式,实现消息的事务提交。

事务协调器使用一个特殊的topic:transaction,来做事务提交日志。

事务控制器通过RPC调研,协调 broker 和 consumer coordinator 实现事务的两阶段提交。

每一个broker都会启动一个事务协调器,使用hash(TransactionalId)确定producer对应的事务协调器,使得整个集群的负载均衡。

broker处理在事务协调器的commit/abort控制消息,把控制消息向正常消息一样写入topic(和正常消息交织在一起,用来确认事务提交的日志偏移),并向前推进消息提交偏移hw。

如果在事务过程中,提交了消费偏移,组协调器在offset log中写入事务消费偏移。当事务提交时,在offset log中写入事务offset确认消息。

consumer过滤未提交消息和事务控制消息,使这些消息对用户不可见。

有两种实现方式,

设置isolationlevel=read_uncommitted,此时topic的所有消息对consumer都可见。

consumer缓存这些消息,直到收到事务控制消息。若事务commit,则对外发布这些消息;若事务abort,则丢弃这些消息。

设置isolationlevel=read_committed,此时topic中未提交的消息对consumer不可见,只有在事务结束后,消息才对consumer可见。

broker给consumer的BatchRecord消息中,会包含以列表,指明哪些是"abort"事务,consumer丢弃abort事务的消息即可。

事务消息处理流程如图1所示,

图1 事务消息业务流程

流程说明:

事务协调器是分配pid和管理事务的核心,produer首先对任何一个broker发送FindCoordinatorRequest,发现自己的事务协调器。

紧接着,producer向事务协调器发送InitPidRequest,申请生成pid。

2a当指定了transactionalid时,事务协调器为producer分区pid,并更新epoch,把(tid,pid)的映射关系写入事务日志。 同时清理tid任何未完成的事务,丢弃未提交的消息。

启动事务是producer的本地 *** 作,促使producer更新内部状态,不会和事务协调器发生关系。

事务协调器自动启动事务,始终处在一个接一个的事务处理状态机中。

对于每一个要在事务中写消息的topic分区,producer应当在第一次发消息前,向事务处理器注册分区。

41a事务处理器把事务关联的分区写入事务日志。

在提交或终止事务时,事务协调器需要这些信息,控制事务涉及的所有分区leader完成事务提交或终止。

42a producer向分区leader写消息,消息中包含tid,pid,epoch和seq。

43 提交消费偏移 -- AddOffsetCommitsToTxnRequest

43a producer向事务协调器发送消费偏移,事务协调器在事务日志中记录偏移信息,并把组协调器返回给producer。

44a producer向组协调器发送TxnOffsetCommitRequest,组协调器把偏移信息写入偏移日志。但是,要一直等到事务提交后,这个偏移才生效,对外部可见。

收到提交或终止事务的请求时,事务处理器执行下面的 *** 作:

1 在事务日志中写入PREPARE_COMMIT或PREPARE_ABORT消息(51a)。

2 通过WriteTxnMarkerRequest向事务中的所有broker发事务控制消息(52)。

3 在事务之日中写入COMMITTED或ABORTED消息(53)。

这个消息由事务处理器发给事务中所涉及分区的leader。

当收到这个消息后,broker会在分区log中写入一个COMMIT或ABORT控制消息。同时,也会更新该分区的事务提交偏移hw。

如果事务中有提交消费偏移, broker也会把控制消息写入 __consumer-offsets log,并通知组协调器使事务中提交的消费偏移生效。

当所有的commit或abort消息写入数据日志,事务协调器在事务日志中写入事务日志,标志这事务结束。

至此,本事务的所有状态信息都可以被删除,可以开始一个新的事务。

在实现上,还有很多细节,比如,事务协调器会启动定时器,用来检测并终止开始后长时间不活动的事务,具体请参考下面列出的kafka社区技术文档。

总结:

我们要认识到,虽然kafka事务消息提供了多个消息原子写的保证,但它不保证原子读。

例如,

也就是说,虽然kafka log持久化了数据,也可以通过指定offset多次消费数据,但由于分区数据之间的无序性,导致每次处理输出的结果都是不同的。这使得kafka stream不能像hadoop批处理任务一样,可以随时重新执行,保证每次执行的结果相同。除非我们只从一个topic分区读数据。

[0] >

consumer 采用 pull(拉)模式从 broker 中读取数据。
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。
pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中,一直返回空数
据。针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有
数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为 timeout。

分区中的所有副本统称为 AR(Assigned Replicas)。所有与 leader 副本保持一定程度同步的副本(包括 leader 副本在内)组成ISR(In-Sync Replicas),ISR 集合是 AR 集合中的一个子集。

可以通过分区策略体现消息顺序性。分区策略有轮询策略、随机策略、按消息键保序策略。

处理顺序 :拦截器->序列化器->分区器
消息在通过 send() 方法发往 broker 的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往 broker。拦截器一般不是必需的,而序列化器是必需的。消息经过序列化之后就需要确定它发往的分区,如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分区器的作用,因为 partition 代表的就是所要发往的分区号。

整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程(发送线程)。

一般来说如果消费者过多,出现了消费者的个数大于分区个数的情况,就会有消费者分配不到任何分区。开发者可以继承AbstractPartitionAssignor实现自定义消费策略,从而实现同一消费组内的任意消费者都可以消费订阅主题的所有分区。

当前消费者需要提交的消费位移是offset+1
在旧消费者客户端中,消费位移是存储在 ZooKeeper 中的。而在新消费者客户端中,消费位移存储在 Kafka 内部的主题__consumer_offsets 中。

Kafka 中的消息是以主题为基本单位进行归类的,各个主题在逻辑上相互独立。每个主题又可以分为一个或多个分区。不考虑多副本的情况,一个分区对应一个日志(Log)。为了防止 Log 过大,Kafka 又引入了日志分段(LogSegment)的概念,将 Log 切分为多个 LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件。
Log 和 LogSegment 也不是纯粹物理意义上的概念,Log 在物理上只以文件夹的形式存储,而每个 LogSegment 对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件。

每个日志分段文件对应了两个索引文件,主要用来提高查找消息的效率。

日志删除(Log Retention):按照一定的保留策略直接删除不符合条件的日志分段。

日志压缩(Log Compaction):针对每个消息的 key 进行整合,对于有相同 key 的不同 value 值,只保留最后一个版本。

在 Kafka 集群中会有一个或多个 broker,其中有一个 broker 会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。当某个分区的 leader 副本出现故障时,由控制器负责为该分区选举新的 leader 副本。当检测到某个分区的 ISR 集合发生变化时,由控制器负责通知所有broker更新其元数据信息。当使用 kafka-topicssh 脚本为某个 topic 增加分区数量时,同样还是由控制器负责分区的重新分配。

Kafka 中有多种延时 *** 作,比如延时生产,还有延时拉取(DelayedFetch)、延时数据删除(DelayedDeleteRecords)等。
延时 *** 作创建之后会被加入延时 *** 作管理器(DelayedOperationPurgatory)来做专门的处理。延时 *** 作有可能会超时,每个延时 *** 作管理器都会配备一个定时器(SystemTimer)来做超时管理,定时器的底层就是采用时间轮(TimingWheel)实现的。

为了实现生产者的幂等性,Kafka 为此引入了 producer id(以下简称 PID)和序列号(sequence number)这两个概念。
Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的 Producer 在
初始化的时候会被分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number。而Broker 端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker 只会持久化一条。

Kafka中的事务可以使应用程序将消费消息、生产消息、提交消费位移当作原子 *** 作来处理,同时成功或失败,即使该生产或消费会跨多个分区。

生产者必须提供唯一的transactionalId,启动后请求事务协调器获取一个PID,transactionalId与PID一一对应。

每次发送数据给<Topic, Partition>前,需要先向事务协调器发送AddPartitionsToTxnRequest,事务协调器会将该<Transaction, Topic, Partition>存于__transaction_state内,并将其状态置为BEGIN。

在处理完 AddOffsetsToTxnRequest 之后,生产者还会发送 TxnOffsetCommitRequest 请求给 GroupCoordinator,从而将本次事务中包含的消费位移信息 offsets 存储到主题 __consumer_offsets 中

一旦上述数据写入 *** 作完成,应用程序必须调用KafkaProducer的commitTransaction方法或者abortTransaction方法以结束当前事务。

在发送延时消息的时候并不是先投递到要发送的真实主题(real_topic)中,而是先投递到一些 Kafka 内部的主题(delay_topic)中,这些内部主题对用户不可见,然后通过一个自定义的服务拉取这些内部主题中的消息,并将满足条件的消息再投递到要发送的真实的主题中,消费者所订阅的还是真实的主题。

Kafka 集群中有一个 broker 会被选举为 Controller,负责管理集群 broker 的上下线,所
有 topic 的分区副本分配和 leader 选举等工作。Controller 的管理工作都是依赖于 Zookeeper 的。

在kafka的 config 目录下找到 serverproperties 配置文件

把 listeners 和 advertisedlisteners 两处配置的注释去掉,可以根据需要配置连接的服务器 外网IP 和 端口号 ,我这里演示选择的是本地 localhost 和默认端口 9092

KafkaTemplate 这个类包装了个生产者 Producer ,来提供方便的发送数据到 kafka 的主题 topic 里面。
send() 方法的源码, KafkaTemplate 类中还重载了很多 send() 方法,有需要可以看看源码

通过 KafkaTemplate 模板类发送数据。
kafkaTemplatesend(String topic, K key, V data) ,第一个入参是主题,第二个入参是发送的对象,第三个入参是发送的数据。通过 @KafkaListener 注解配置用户监听 topics

bootstrap-servers :kafka服务器地址(可以多个)
consumergroup-id :指定一个默认的组名
不指定的话会报

1 earliest :当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,从头开始消费
2 latest :当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,消费新产生的该分区下的数据
3 none : topic 各分区都存在已提交的 offset 时,从 offset 后开始消费;只要有一个分区不存在已提交的 offset ,则抛出异常

这个属性也是必须配置的,不然也是会报错的

在使用Kafka发送接收消息时,生产者 producer 端需要序列化,消费者 consumer 端需要反序列化,由于网络传输过来的是 byte[] ,只有反序列化后才能得到生产者发送的真实的消息内容。这样消息才能进行网络传输
consumerkey-deserializer 和 consumervalue-deserializer 是消费者 key/value 反序列化
producerkey-deserializer 和 producervalue-deserializer 是生产者 key/value 序列化

StringDeserializer 是内置的字符串反序列化方式

StringSerializer 是内置的字符串序列化方式

在 orgapachekafkacommonserialization 源码包中还提供了多种类型的序列化和反序列化方式
要自定义序列化方式,需要实现接口 Serializer
要自定义反序列化方式,需要实现接口 Deserializer

详细可以参考
>Kafka 是一个高度可扩展的分布式消息系统,在海量数据处理生态中占据着重要的地位。

数据处理的一个关键特性是数据的一致性。具体到 Kafka 的领域中,也就是生产者生产的数据和消费者消费的数据之间一对一的一致性。在各种类型的失败普遍存在的分布式系统环境下,保证业务层面一个整体的消息集合被原子的发布和恰好一次处理,是数据一致性在 Kafka 生态系统的实际要求。

本文介绍了 Kafka 生态中的事务机制的概念和流程。

Kafka 事务机制的概念

Kafka 从 011 版本开始支持了事务机制。Kafka 事务机制支持了跨分区的消息原子写功能。具体来说,Kafka 生产者在同一个事务内提交到多个分区的消息,要么同时成功,要么同时失败。这一保证在生产者运行时出现异常甚至宕机重启之后仍然成立。

此外,同一个事务内的消息将以生产者发送的顺序,唯一地提交到 Kafka 集群上。也就是说,事务机制从某种层面上保证了消息被恰好一次地提交到 Kafka 集群。众所周知,恰好一次送达在分布式系统中是不可能实现的。这个论断有一些微妙的名词重载问题,但大抵没错,所有声称能够做到恰好一次处理的系统都在某个地方依赖了幂等性。

Kafka 的事务机制被广泛用于现实世界中复杂业务需要保证一个业务领域中原子的概念被原子地提交的场景。

例如,一次下单流水包括订单生成消息和库存扣减消息,如果这两个消息在历史上由两个主题分管,那么它们在业务上的原子性就要求 Kafka 要利用事务机制原子地提交到 Kafka 集群上。

还有,对于复杂的流式处理系统,Kafka 生产者的上游可能是另一个流式处理系统,这个系统可能有着自己的一致性方案。为了跟上游系统的一致性方案协调,Kafka 就需要提供一个尽可能通用且易于组合的一致性机制,即灵活的事务机制,来帮助实现端到端的一致性。

Kafka 事务机制的流程

分布式系统的数据一致性是难的。要想理解一个系统提供何种程度的数据一致性保证,以及这样的保证对应用程序提出了什么样的要求,再及在哪些情况下一致性保证会出现什么方面的回退,细究其一致性机制的实现是必须的。

上面我们提到,事务机制的核心特征是能跨越多个分区原子地提交消息集合,甚至这些分区从属于不同的主题。同时,被提交的消息集合中的消息每条仅被提交一次,并保持它们在生产者应用中被生产的顺序写入到 Kafka 集群的消息日志中。此外,事务能够容忍生产者运行时出现异常甚至宕机重启。

实现事务机制最关键的概念就是事务的唯一标识符( TransactionalID ),Kafka 使用 TransactionalID 来关联进行中的事务。TransactionalID 由用户提供,这是因为 Kafka 作为系统本身无法独立的识别出宕机前后的两个不同的进程其实是要同一个逻辑上的事务。

对于同一个生产者应用前后进行的多个事务,TransactionalID 并不需要每次都生成一个新的。这是因为 Kafka 还实现了 ProducerID 以及 epoch 机制。这个机制在事务机制中的用途主要是用于标识不同的会话,同一个会话 ProducerID 的值相同,但有可能有多个任期。ProducerID 仅在会话切换时改变,而任期会在每次新的事物初始化时被更新。这样,同一个 TransactionalID 就能作为跨会话的多个独立事务的标识。

接下来,我们从一个事务的完整流程出发讨论客户端也就是生产者和消费者,以及服务端也就是 Kafka 集群在这个流程中扮演了什么角色,执行了什么动作。

初始化事务上下文

逻辑上说,事务总是从生产者提起的。生产者通过调用 initTransactions 方法初始化事务上下文。首要做的事情就是找到 Kafka 集群负责管理当前事务的事务协调者( TransactionCoordinator ),向其申请 ProducerID 资源。初始的 ProducerID 及 epoch 都是未初始化的状态。

生产者一侧的事务管理者( TransactionManager )收到相应的方法调用之后先后发送查找事务协调者的信息和初始化 ProducerID 的信息。事务相关的所有元数据信息都会由客户端即生产者一侧的事务管理者和服务端即 Kafka 集群的一个 Broker 上的事务协调者交互完成。

一开始,生产者并不知道哪个 Broker 上有自己 TransactionalID 关联的事务协调者。逻辑上,所有事务相关的需要持久化的数据最终都会写到一个特殊的主题 __transaction_state 上。这跟前面回答消费位点管理文章中的管理消费者消费位点的特殊主题 __consumer_offsets 构成了目前 Kafka 系统里唯二的特殊主题。

对于一个生产者或者说被 TransactionalID 唯一标识的事务来说,它的事务协调者就是该事务的元数据最终存储在 __transaction_state 主题上对应分区的分区首领。对于一个具体的事务来说,它的元数据将被其 TransactionalID 的哈希值的绝对值模分区数的分区所记录,这也是常见的确定分区的方案。

生产者将查找事务协调者的信息发送到集群的任意一个 Broker 上,由它计算出实际的事务协调者,获取对应的节点信息后返回给生产者。这样,生产者就找到了事务协调者。

随后,生产者会向事务协调者申请一个 ProducerID 资源,这个资源包括 ProducerID 和对应的 epoch 信息。事务协调者收到对应请求后,将会首先判断同一个 TransactionalID 下的事务的状态,以应对好跨会话的事务的管理。

第一步,事务协调者会获取 TransactionalID 对应的事务元数据信息。前面提到,这些元数据信息将被写在特殊主题 __transaction_state 上,这也是事务元数据信息对生产者和 Kafka 集群都容错的需要。

如果获取不到元数据信息,那么就初始化事务元数据信息,包括从获取一个新的 ProducerID 资源,并将它和 TransactionalID 以及分区编号和其他一些配置信息一起打包持久化。

其中,获取一个新的 ProducerID 资源需要 ProducerID 管理器从 ZooKeeper 上申请一个 ProducerID 的号段,在逐一的分配出去。申请号段的手段是修改 ZooKeeper 上 /latest_producer_id_block 节点的信息,流程是读节点上最后一个被申请的 ProducerID 的信息,加上要申请的号段的长度,再更新节点上最后一个被申请的 ProducerID 的信息。由于 ZooKeeper 对节点的更新有版本控制,因此并发的请求将会导致其中若干个请求目标版本失配,并提起重试。ProducerID 的长度是 Long 类型的长度,因此在实际使用过程中几乎不可能用完,Kafka 对号段资源耗尽的情况抛出致命错误并不尝试恢复。

如果获取到了相同 TransactionalID 先前的元数据信息,那么根据事务协调器事务先前的状态采取不同的行为。

如果此时状态转移正在进行,直接返回 CONCURRENT_TRANSACTIONS 异常。注意这里是事务协调器上正在发生并发的状态转移。通常来说,并发的状态转移应该依次执行,直接返回此异常可避免客户端即生产者请求超时,而是让生产者稍后自行重试。这也是一种乐观的加锁策略。
如果此时状态为 PrepareAbort 或 PrepareCommit 则返回 CONCURRENT_TRANSACTIONS 异常。同样的,此时状态即将转换为终结状态,无需强行终止先前的事务,否则将会产生无谓的浪费。
如果此时状态为 Dead 或 PrepareEpochFence 或当前 ProducerID 和 epoch 对不上,直接抛出不可重试的异常。这是由于要么是先前的 Producer 且已经被新的 Producer 替代,要么事务已经超时,无需再次尝试。
如果此时状态为 Ongoing 则事务协调者会将事务转移到 PrepareEpochFence 状态,然后再丢弃当前的事务,并返回 CONCURRENT_TRANSACTIONS 异常。
如果此时状态为 CompleteAbort 或 CompleteCommit 或 Empty 之一那么先将状态转移为 Empty 然后更新 epoch 值。
经过这么一连环的 *** 作,Kafka 就将事务执行的上下文初始化好了。

开始一个事务

初始化事务的流程实际上是生产者和对应的事务协调者就事务状态达成一致,进入到一个可以提起新的事务的状态。此时,生产者可以通过 beginTransaction 方法开始一个事务 *** 作。这个方法只会将本地事务状态转移到 IN_TRANSACTION 状态,在真正的提交事务中的消息之前,不会有跟 Kafka 集群的交互。

生产者将自己标记为开始事务之后,也就是本地事务状态转移到事务进行中的状态之后,就可以开始发送事务中的消息了。

发送事务中的消息

生产者在发送事务中的消息的时候,会将消息对应的分区添加到事务管理器中去,如果这个分区此前没被添加过,那么事务管理器会在下一次发送消息之前插入一条 AddPartitionsToTxnRequest 请求来告诉 Kafka 集群的事务协调者参与事务的分区的信息。事务协调者收到这条信息之后,将会更新事务的元数据,并将元数据持久化到 __transaction_state 中。

对于生产者发送的消息,仍然和一般的消息生产一样采用 ProduceRequest 请求。除了会在请求中带上相应的 TransactionalID 信息和属于事务中的消息的标识符,它跟生产者生产的普通信息别无二致。如果消费者没有配置读已提交的隔离级别,那么这些消息在被 Kafka 集群接受并持久化到主题分区中时,就已经对消费者可见而且可以被消费了。

事务中的消息的顺序性保证也是在发送事务的时候检查的。

生产者此时已经申请到了一个 ProducerID 资源,当它向一个分区发送消息时,内部会有一个消息管理器为每个不同的分区维护一个顺序编号( SequenceNumber )。相应地,Kafka 集群也会为每个 ProducerID 到每个分区的消息生产维护一个顺序编号。

ProducerRequest 请求中包含了顺序编号信息。如果 Kafka 集群看到请求的顺序编号跟自己的顺序编号是连续的,即比自己的顺序编号恰好大一,那么接受这条消息。否则,如果请求的顺序编号大一以上,则说明是一个乱序的消息,直接拒绝并抛出异常。如果请求的顺序编号相同或更小,则说明是一个重复发送的消息,直接忽略并告诉客户端是一个重复消息。

提交事务

在一个事务相关的所有消息都发送完毕之后,生产者就可以调用 commitTransaction 方法来提交整个事务了。对于事务中途发生异常的情形,也可以通过调用 abortTransaction 来丢弃整个事务。这两个 *** 作都是将事务状态转移到终结状态,彼此之间有许多相似点。

无论是提交还是丢弃,生产者都是给事务协调者发送 EndTxnRequest 请求,请求中包含一个字段来判断是提交还是丢弃。事务协调者在收到这个请求后,首先更新事务状态到 PrepareAbort 或 PrepareCommit 并更新状态到 __transaction_state 中。

如果在状态更新成功前事务协调者宕机,那么恢复过来的事务协调者将认为事务在 Ongoing 状态中,此时生产者由于收不到确认回复,会重试 EndTxnRequest 请求,并最终更新事务到 PrepareAbort 或 PrepareCommit 状态。

随后,根据是提交还是丢弃,分别向事务涉及到的所有分区的分区首领发送事务标志( TransactionMarker )。

事务标志是 Kafka 事务机制引入的不同于业务消息的事务控制消息。它的作用主要是标识事务已经完成,这个消息同业务消息一样能够被消费者所消费,并且它和事务中的业务消息能够通过 TransactionalID 关联起来,从而支持配置了读已提交特性的消费者忽略尚未提交的事务消息或被丢弃的事务消息。

如果在事务标志写到涉及到的所有分区的分区首领之前,事务协调者宕机或者分区首领宕机或网络分区,新起来的事务协调者或超时后重试的事务协调者会重新向分区首领写入事务标志。事务标志是幂等的,因此不会影响事务提交的结果。这里我们印证了之前所说的所有声称能够做到恰好一次处理的系统都在某个地方依赖了幂等性。

在当前事务涉及到的所有分区都已经把事务标志信息持久化到主题分区之后,事务协调者才会将这个事务的状态置为提交或丢弃,并持久化到事务日志文件中。在这之后,一个 Kafka 事务才算真正的完成了。事务协调者中缓存的关于当前事务的元数据就可以清理了。

如果在事务协调者回复生产者提交成功之前宕机,在恢复之后生产者再次提交事务时会直接返回事务提交成功。

总的来说,事务的状态以 __transaction_state 主题上持久化的元数据信息为准。

超时过期事务

分布式系统由于天然的网络阻塞或分区等失败原因, *** 作在成功和失败之外还有超时这第三种状态。现实中的分布式系统必须合理地处理超时的状态,否则永久阻塞或等待在任何实际的业务领域中都是不可接受的。

Kafka 事务机制本身可以配置事务超时,在事务管理者和事务协调者交互的各个过程中都会检验事务超时的配置,如果事务已经超时则抛出异常。

但是,在网络分区的情况下,可能 Kafka 集群根本就等不到生产者发送的消息。这个时候,Kafka 集群就需要相应的机制来主动过期。否则永不过期的中间状态事务在生产者宕机且不可恢复或不再恢复的情况下将逐步积累成存储垃圾。

Kafka 集群会周期性的轮询内存中的事务信息。如果发现进行中的事务最后的状态更新时间距今已经超过了配置的集群事务清理时间阈值,则采取丢弃该事务的 *** 作。同时,为了避免 *** 作过程中并发地收到原 Producer 发来事务更新请求,首先更新事务关联的 ProducerID 的 epoch 以将原 Producer 的 epoch 隔离掉。换个角度说,也就是以一个新的有效的身份执行丢弃事务 *** 作,以免分不清到底是谁在丢弃事务。

此外,轮询中还会检查 TransactionalID 最新的事务信息,如果一个 TransactionalID 最后一个事务距今已经已经超过了配置的集群 TransactionalID 清理时间阈值,则将该 TransactionalID 对应的元数据信息都进行清理。

上面的讨论中还有两个重要的主题被忽略了。一个是 Kafka 事务机制支持在同一个事务里进行消息生产和消息消费位点提交,另一个是配置了读已提交的消费者如何在事务未提交以及丢弃事务时正确的读取事务中消息。

前者不是特别复杂,只需要将消费位点提交视作一条事务中的消息,和消息生产以及控制消息同等待遇,在提交的时候也被事务标志所界定即可。

不展开聊是因为这个特性通常只在仅适用 Kafka 搭建流式处理流水线的场景下有用,尤其是 Kafka Streams 解决方案。

对于组合多个系统的流式处理流水线来说,消息从 Kafka 中消费得到是上游,生产到 Kafka 上是下游,中间是另一个例如 Flink 的流式计算系统。在这种场景下,消费位点的管理和事务地生产消息是两个可以分开考虑的事情,可以跟其他系统的一致性方案例如 Flink 的 Checkpoint 机制相结合,而不需要非得在同一个事务里既提交消费位点,又提交新的消息。

后者主要靠 Kafka 集群在管理消费位点拉取请求的时候,通过随事务机制的引入新添加的 LastStableOffset 概念来响应配置为读已提交的消费者的请求。在事务完成之前不会允许读已提交的消费者拉取事务中的消息。显然,这有可能导致消费者拉取新消息时长时间的阻塞。因此在实践中应当尽量避免长时间的事务。

对于丢弃事务的消息,Kafka 集群会维护一个丢弃事务的消息的元数据,从而支持消费者同时拉取消息和丢弃事务的消息的元数据,自行比对筛掉丢弃事务的消息。在正常的业务场景里,丢弃的事务不会太多,从而维护这样的一份元数据以及让消费者自行筛选会是一个能够接受的选择。


欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/yw/13399963.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-07-29
下一篇 2023-07-29

发表评论

登录后才能评论

评论列表(0条)