consumer 采用 pull(拉)模式从 broker 中读取数据。
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。
pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中,一直返回空数
据。针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有
数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为 timeout。
分区中的所有副本统称为 AR(Assigned Replicas)。所有与 leader 副本保持一定程度同步的副本(包括 leader 副本在内)组成ISR(In-Sync Replicas),ISR 集合是 AR 集合中的一个子集。
可以通过分区策略体现消息顺序性。分区策略有轮询策略、随机策略、按消息键保序策略。
处理顺序 :拦截器->序列化器->分区器
消息在通过 send() 方法发往 broker 的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往 broker。拦截器一般不是必需的,而序列化器是必需的。消息经过序列化之后就需要确定它发往的分区,如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分区器的作用,因为 partition 代表的就是所要发往的分区号。
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程(发送线程)。
一般来说如果消费者过多,出现了消费者的个数大于分区个数的情况,就会有消费者分配不到任何分区。开发者可以继承AbstractPartitionAssignor实现自定义消费策略,从而实现同一消费组内的任意消费者都可以消费订阅主题的所有分区。
当前消费者需要提交的消费位移是offset+1
在旧消费者客户端中,消费位移是存储在 ZooKeeper 中的。而在新消费者客户端中,消费位移存储在 Kafka 内部的主题__consumer_offsets 中。
Kafka 中的消息是以主题为基本单位进行归类的,各个主题在逻辑上相互独立。每个主题又可以分为一个或多个分区。不考虑多副本的情况,一个分区对应一个日志(Log)。为了防止 Log 过大,Kafka 又引入了日志分段(LogSegment)的概念,将 Log 切分为多个 LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件。
Log 和 LogSegment 也不是纯粹物理意义上的概念,Log 在物理上只以文件夹的形式存储,而每个 LogSegment 对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件。
每个日志分段文件对应了两个索引文件,主要用来提高查找消息的效率。
日志删除(Log Retention):按照一定的保留策略直接删除不符合条件的日志分段。
日志压缩(Log Compaction):针对每个消息的 key 进行整合,对于有相同 key 的不同 value 值,只保留最后一个版本。
在 Kafka 集群中会有一个或多个 broker,其中有一个 broker 会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。当某个分区的 leader 副本出现故障时,由控制器负责为该分区选举新的 leader 副本。当检测到某个分区的 ISR 集合发生变化时,由控制器负责通知所有broker更新其元数据信息。当使用 kafka-topicssh 脚本为某个 topic 增加分区数量时,同样还是由控制器负责分区的重新分配。
Kafka 中有多种延时 *** 作,比如延时生产,还有延时拉取(DelayedFetch)、延时数据删除(DelayedDeleteRecords)等。
延时 *** 作创建之后会被加入延时 *** 作管理器(DelayedOperationPurgatory)来做专门的处理。延时 *** 作有可能会超时,每个延时 *** 作管理器都会配备一个定时器(SystemTimer)来做超时管理,定时器的底层就是采用时间轮(TimingWheel)实现的。
为了实现生产者的幂等性,Kafka 为此引入了 producer id(以下简称 PID)和序列号(sequence number)这两个概念。
Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的 Producer 在
初始化的时候会被分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number。而Broker 端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker 只会持久化一条。
Kafka中的事务可以使应用程序将消费消息、生产消息、提交消费位移当作原子 *** 作来处理,同时成功或失败,即使该生产或消费会跨多个分区。
生产者必须提供唯一的transactionalId,启动后请求事务协调器获取一个PID,transactionalId与PID一一对应。
每次发送数据给<Topic, Partition>前,需要先向事务协调器发送AddPartitionsToTxnRequest,事务协调器会将该<Transaction, Topic, Partition>存于__transaction_state内,并将其状态置为BEGIN。
在处理完 AddOffsetsToTxnRequest 之后,生产者还会发送 TxnOffsetCommitRequest 请求给 GroupCoordinator,从而将本次事务中包含的消费位移信息 offsets 存储到主题 __consumer_offsets 中
一旦上述数据写入 *** 作完成,应用程序必须调用KafkaProducer的commitTransaction方法或者abortTransaction方法以结束当前事务。
在发送延时消息的时候并不是先投递到要发送的真实主题(real_topic)中,而是先投递到一些 Kafka 内部的主题(delay_topic)中,这些内部主题对用户不可见,然后通过一个自定义的服务拉取这些内部主题中的消息,并将满足条件的消息再投递到要发送的真实的主题中,消费者所订阅的还是真实的主题。
Kafka 集群中有一个 broker 会被选举为 Controller,负责管理集群 broker 的上下线,所
有 topic 的分区副本分配和 leader 选举等工作。Controller 的管理工作都是依赖于 Zookeeper 的。
Kafka到底是个啥?用来干嘛的?
官方定义如下:
翻译过来,大致的意思就是,这是一个实时数据处理系统,可以横向扩展,并高可靠!
实时数据处理 ,从名字上看,很好理解,就是将数据进行实时处理,在现在流行的微服务开发中,最常用实时数据处理平台有 RabbitMQ、RocketMQ 等消息中间件。
这些中间件,最大的特点主要有两个:
在早期的 web 应用程序开发中,当请求量突然上来了时候,我们会将要处理的数据推送到一个队列通道中,然后另起一个线程来不断轮训拉取队列中的数据,从而加快程序的运行效率。
但是随着请求量不断的增大,并且队列通道的数据一致处于高负载,在这种情况下,应用程序的内存占用率会非常高,稍有不慎,会出现内存不足,造成程序内存溢出,从而导致服务不可用。
随着业务量的不断扩张,在一个应用程序内,使用这种模式已然无法满足需求,因此之后,就诞生了各种消息中间件,例如 ActiveMQ、RabbitMQ、RocketMQ等中间件。
采用这种模型,本质就是将要推送的数据,不在存放在当前应用程序的内存中,而是将数据存放到另一个专门负责数据处理的应用程序中,从而实现服务解耦。
消息中间件 :主要的职责就是保证能接受到消息,并将消息存储到磁盘,即使其他服务都挂了,数据也不会丢失,同时还可以对数据消费情况做好监控工作。
应用程序 :只需要将消息推送到消息中间件,然后启用一个线程来不断从消息中间件中拉取数据,进行消费确认即可!
引入消息中间件之后,整个服务开发会变得更加简单,各负其责。
Kafka 本质其实也是消息中间件的一种,Kafka 出自于 LinkedIn 公司,与 2010 年开源到 github。
LinkedIn 的开发团队,为了解决数据管道问题,起初采用了 ActiveMQ 来进行数据交换,大约是在 2010 年前后,那时的 ActiveMQ 还远远无法满足 LinkedIn 对数据传递系统的要求,经常由于各种缺陷而导致消息阻塞或者服务无法正常访问,为了能够解决这个问题,LinkedIn 决定研发自己的消息传递系统, Kafka 由此诞生 。
在 LinkedIn 公司,Kafka 可以有效地处理每天数十亿条消息的指标和用户活动跟踪,其强大的处理能力,已经被业界所认可,并成为大数据流水线的首选技术。
先来看一张图, 下面这张图就是 kafka 生产与消费的核心架构模型 !
如果你看不懂这些概念没关系,我会带着大家一起梳理一遍!
简而言之,kafka 本质就是一个消息系统,与大多数的消息系统一样,主要的特点如下:
与 ActiveMQ、RabbitMQ、RocketMQ 不同的地方在于,它有一个分区 Partition 的概念。
这个分区的意思就是说,如果你创建的 topic 有5个分区,当你一次性向 kafka 中推 1000 条数据时,这 1000 条数据默认会分配到 5 个分区中,其中每个分区存储 200 条数据。
这样做的目的,就是方便消费者从不同的分区拉取数据,假如你启动 5 个线程同时拉取数据,每个线程拉取一个分区,消费速度会非常非常快!
这是 kafka 与其他的消息系统最大的不同!
和其他的中间件一样,kafka 每次发送数据都是向 Leader 分区发送数据,并顺序写入到磁盘,然后 Leader 分区会将数据同步到各个从分区 Follower ,即使主分区挂了,也不会影响服务的正常运行。
那 kafka 是如何将数据写入到对应的分区呢?kafka中有以下几个原则:
与生产者一样,消费者主动的去kafka集群拉取消息时,也是从 Leader 分区去拉取数据。
这里我们需要重点了解一个名词: 消费组 !
考虑到多个消费者的场景,kafka 在设计的时候,可以由多个消费者组成一个消费组,同一个消费组者的消费者可以消费同一个 topic 下不同分区的数据,同一个分区只会被一个消费组内的某个消费者所消费,防止出现重复消费的问题!
但是不同的组,可以消费同一个分区的数据!
你可以这样理解,一个消费组就是一个客户端,一个客户端可以由很多个消费者组成,以便加快消息的消费能力。
但是,如果一个组下的消费者数量大于分区数量,就会出现很多的消费者闲置。
如果分区数量大于一个组下的消费者数量,会出现一个消费者负责多个分区的消费,会出现消费性能不均衡的情况。
因此,在实际的应用中,建议消费者组的 consumer 的数量与 partition 的数量保持一致!
光说理论可没用,下面我们就以 centos7 为例,介绍一下 kafka 的安装和使用。
kafka 需要 zookeeper 来保存服务实例的元信息,因此在安装 kafka 之前,我们需要先安装 zookeeper。
zookeeper 安装环境依赖于 jdk,因此我们需要事先安装 jdk
下载zookeeper,并解压文件包
创建数据、日志目录
配置zookeeper
重新配置 dataDir 和 dataLogDir 的存储路径
最后,启动 Zookeeper 服务
到官网 >Kafka 是一个高度可扩展的分布式消息系统,在海量数据处理生态中占据着重要的地位。
数据处理的一个关键特性是数据的一致性。具体到 Kafka 的领域中,也就是生产者生产的数据和消费者消费的数据之间一对一的一致性。在各种类型的失败普遍存在的分布式系统环境下,保证业务层面一个整体的消息集合被原子的发布和恰好一次处理,是数据一致性在 Kafka 生态系统的实际要求。
本文介绍了 Kafka 生态中的事务机制的概念和流程。
Kafka 事务机制的概念
Kafka 从 011 版本开始支持了事务机制。Kafka 事务机制支持了跨分区的消息原子写功能。具体来说,Kafka 生产者在同一个事务内提交到多个分区的消息,要么同时成功,要么同时失败。这一保证在生产者运行时出现异常甚至宕机重启之后仍然成立。
此外,同一个事务内的消息将以生产者发送的顺序,唯一地提交到 Kafka 集群上。也就是说,事务机制从某种层面上保证了消息被恰好一次地提交到 Kafka 集群。众所周知,恰好一次送达在分布式系统中是不可能实现的。这个论断有一些微妙的名词重载问题,但大抵没错,所有声称能够做到恰好一次处理的系统都在某个地方依赖了幂等性。
Kafka 的事务机制被广泛用于现实世界中复杂业务需要保证一个业务领域中原子的概念被原子地提交的场景。
例如,一次下单流水包括订单生成消息和库存扣减消息,如果这两个消息在历史上由两个主题分管,那么它们在业务上的原子性就要求 Kafka 要利用事务机制原子地提交到 Kafka 集群上。
还有,对于复杂的流式处理系统,Kafka 生产者的上游可能是另一个流式处理系统,这个系统可能有着自己的一致性方案。为了跟上游系统的一致性方案协调,Kafka 就需要提供一个尽可能通用且易于组合的一致性机制,即灵活的事务机制,来帮助实现端到端的一致性。
Kafka 事务机制的流程
分布式系统的数据一致性是难的。要想理解一个系统提供何种程度的数据一致性保证,以及这样的保证对应用程序提出了什么样的要求,再及在哪些情况下一致性保证会出现什么方面的回退,细究其一致性机制的实现是必须的。
上面我们提到,事务机制的核心特征是能跨越多个分区原子地提交消息集合,甚至这些分区从属于不同的主题。同时,被提交的消息集合中的消息每条仅被提交一次,并保持它们在生产者应用中被生产的顺序写入到 Kafka 集群的消息日志中。此外,事务能够容忍生产者运行时出现异常甚至宕机重启。
实现事务机制最关键的概念就是事务的唯一标识符( TransactionalID ),Kafka 使用 TransactionalID 来关联进行中的事务。TransactionalID 由用户提供,这是因为 Kafka 作为系统本身无法独立的识别出宕机前后的两个不同的进程其实是要同一个逻辑上的事务。
对于同一个生产者应用前后进行的多个事务,TransactionalID 并不需要每次都生成一个新的。这是因为 Kafka 还实现了 ProducerID 以及 epoch 机制。这个机制在事务机制中的用途主要是用于标识不同的会话,同一个会话 ProducerID 的值相同,但有可能有多个任期。ProducerID 仅在会话切换时改变,而任期会在每次新的事物初始化时被更新。这样,同一个 TransactionalID 就能作为跨会话的多个独立事务的标识。
接下来,我们从一个事务的完整流程出发讨论客户端也就是生产者和消费者,以及服务端也就是 Kafka 集群在这个流程中扮演了什么角色,执行了什么动作。
初始化事务上下文
逻辑上说,事务总是从生产者提起的。生产者通过调用 initTransactions 方法初始化事务上下文。首要做的事情就是找到 Kafka 集群负责管理当前事务的事务协调者( TransactionCoordinator ),向其申请 ProducerID 资源。初始的 ProducerID 及 epoch 都是未初始化的状态。
生产者一侧的事务管理者( TransactionManager )收到相应的方法调用之后先后发送查找事务协调者的信息和初始化 ProducerID 的信息。事务相关的所有元数据信息都会由客户端即生产者一侧的事务管理者和服务端即 Kafka 集群的一个 Broker 上的事务协调者交互完成。
一开始,生产者并不知道哪个 Broker 上有自己 TransactionalID 关联的事务协调者。逻辑上,所有事务相关的需要持久化的数据最终都会写到一个特殊的主题 __transaction_state 上。这跟前面回答消费位点管理文章中的管理消费者消费位点的特殊主题 __consumer_offsets 构成了目前 Kafka 系统里唯二的特殊主题。
对于一个生产者或者说被 TransactionalID 唯一标识的事务来说,它的事务协调者就是该事务的元数据最终存储在 __transaction_state 主题上对应分区的分区首领。对于一个具体的事务来说,它的元数据将被其 TransactionalID 的哈希值的绝对值模分区数的分区所记录,这也是常见的确定分区的方案。
生产者将查找事务协调者的信息发送到集群的任意一个 Broker 上,由它计算出实际的事务协调者,获取对应的节点信息后返回给生产者。这样,生产者就找到了事务协调者。
随后,生产者会向事务协调者申请一个 ProducerID 资源,这个资源包括 ProducerID 和对应的 epoch 信息。事务协调者收到对应请求后,将会首先判断同一个 TransactionalID 下的事务的状态,以应对好跨会话的事务的管理。
第一步,事务协调者会获取 TransactionalID 对应的事务元数据信息。前面提到,这些元数据信息将被写在特殊主题 __transaction_state 上,这也是事务元数据信息对生产者和 Kafka 集群都容错的需要。
如果获取不到元数据信息,那么就初始化事务元数据信息,包括从获取一个新的 ProducerID 资源,并将它和 TransactionalID 以及分区编号和其他一些配置信息一起打包持久化。
其中,获取一个新的 ProducerID 资源需要 ProducerID 管理器从 ZooKeeper 上申请一个 ProducerID 的号段,在逐一的分配出去。申请号段的手段是修改 ZooKeeper 上 /latest_producer_id_block 节点的信息,流程是读节点上最后一个被申请的 ProducerID 的信息,加上要申请的号段的长度,再更新节点上最后一个被申请的 ProducerID 的信息。由于 ZooKeeper 对节点的更新有版本控制,因此并发的请求将会导致其中若干个请求目标版本失配,并提起重试。ProducerID 的长度是 Long 类型的长度,因此在实际使用过程中几乎不可能用完,Kafka 对号段资源耗尽的情况抛出致命错误并不尝试恢复。
如果获取到了相同 TransactionalID 先前的元数据信息,那么根据事务协调器事务先前的状态采取不同的行为。
如果此时状态转移正在进行,直接返回 CONCURRENT_TRANSACTIONS 异常。注意这里是事务协调器上正在发生并发的状态转移。通常来说,并发的状态转移应该依次执行,直接返回此异常可避免客户端即生产者请求超时,而是让生产者稍后自行重试。这也是一种乐观的加锁策略。
如果此时状态为 PrepareAbort 或 PrepareCommit 则返回 CONCURRENT_TRANSACTIONS 异常。同样的,此时状态即将转换为终结状态,无需强行终止先前的事务,否则将会产生无谓的浪费。
如果此时状态为 Dead 或 PrepareEpochFence 或当前 ProducerID 和 epoch 对不上,直接抛出不可重试的异常。这是由于要么是先前的 Producer 且已经被新的 Producer 替代,要么事务已经超时,无需再次尝试。
如果此时状态为 Ongoing 则事务协调者会将事务转移到 PrepareEpochFence 状态,然后再丢弃当前的事务,并返回 CONCURRENT_TRANSACTIONS 异常。
如果此时状态为 CompleteAbort 或 CompleteCommit 或 Empty 之一那么先将状态转移为 Empty 然后更新 epoch 值。
经过这么一连环的 *** 作,Kafka 就将事务执行的上下文初始化好了。
开始一个事务
初始化事务的流程实际上是生产者和对应的事务协调者就事务状态达成一致,进入到一个可以提起新的事务的状态。此时,生产者可以通过 beginTransaction 方法开始一个事务 *** 作。这个方法只会将本地事务状态转移到 IN_TRANSACTION 状态,在真正的提交事务中的消息之前,不会有跟 Kafka 集群的交互。
生产者将自己标记为开始事务之后,也就是本地事务状态转移到事务进行中的状态之后,就可以开始发送事务中的消息了。
发送事务中的消息
生产者在发送事务中的消息的时候,会将消息对应的分区添加到事务管理器中去,如果这个分区此前没被添加过,那么事务管理器会在下一次发送消息之前插入一条 AddPartitionsToTxnRequest 请求来告诉 Kafka 集群的事务协调者参与事务的分区的信息。事务协调者收到这条信息之后,将会更新事务的元数据,并将元数据持久化到 __transaction_state 中。
对于生产者发送的消息,仍然和一般的消息生产一样采用 ProduceRequest 请求。除了会在请求中带上相应的 TransactionalID 信息和属于事务中的消息的标识符,它跟生产者生产的普通信息别无二致。如果消费者没有配置读已提交的隔离级别,那么这些消息在被 Kafka 集群接受并持久化到主题分区中时,就已经对消费者可见而且可以被消费了。
事务中的消息的顺序性保证也是在发送事务的时候检查的。
生产者此时已经申请到了一个 ProducerID 资源,当它向一个分区发送消息时,内部会有一个消息管理器为每个不同的分区维护一个顺序编号( SequenceNumber )。相应地,Kafka 集群也会为每个 ProducerID 到每个分区的消息生产维护一个顺序编号。
ProducerRequest 请求中包含了顺序编号信息。如果 Kafka 集群看到请求的顺序编号跟自己的顺序编号是连续的,即比自己的顺序编号恰好大一,那么接受这条消息。否则,如果请求的顺序编号大一以上,则说明是一个乱序的消息,直接拒绝并抛出异常。如果请求的顺序编号相同或更小,则说明是一个重复发送的消息,直接忽略并告诉客户端是一个重复消息。
提交事务
在一个事务相关的所有消息都发送完毕之后,生产者就可以调用 commitTransaction 方法来提交整个事务了。对于事务中途发生异常的情形,也可以通过调用 abortTransaction 来丢弃整个事务。这两个 *** 作都是将事务状态转移到终结状态,彼此之间有许多相似点。
无论是提交还是丢弃,生产者都是给事务协调者发送 EndTxnRequest 请求,请求中包含一个字段来判断是提交还是丢弃。事务协调者在收到这个请求后,首先更新事务状态到 PrepareAbort 或 PrepareCommit 并更新状态到 __transaction_state 中。
如果在状态更新成功前事务协调者宕机,那么恢复过来的事务协调者将认为事务在 Ongoing 状态中,此时生产者由于收不到确认回复,会重试 EndTxnRequest 请求,并最终更新事务到 PrepareAbort 或 PrepareCommit 状态。
随后,根据是提交还是丢弃,分别向事务涉及到的所有分区的分区首领发送事务标志( TransactionMarker )。
事务标志是 Kafka 事务机制引入的不同于业务消息的事务控制消息。它的作用主要是标识事务已经完成,这个消息同业务消息一样能够被消费者所消费,并且它和事务中的业务消息能够通过 TransactionalID 关联起来,从而支持配置了读已提交特性的消费者忽略尚未提交的事务消息或被丢弃的事务消息。
如果在事务标志写到涉及到的所有分区的分区首领之前,事务协调者宕机或者分区首领宕机或网络分区,新起来的事务协调者或超时后重试的事务协调者会重新向分区首领写入事务标志。事务标志是幂等的,因此不会影响事务提交的结果。这里我们印证了之前所说的所有声称能够做到恰好一次处理的系统都在某个地方依赖了幂等性。
在当前事务涉及到的所有分区都已经把事务标志信息持久化到主题分区之后,事务协调者才会将这个事务的状态置为提交或丢弃,并持久化到事务日志文件中。在这之后,一个 Kafka 事务才算真正的完成了。事务协调者中缓存的关于当前事务的元数据就可以清理了。
如果在事务协调者回复生产者提交成功之前宕机,在恢复之后生产者再次提交事务时会直接返回事务提交成功。
总的来说,事务的状态以 __transaction_state 主题上持久化的元数据信息为准。
超时过期事务
分布式系统由于天然的网络阻塞或分区等失败原因, *** 作在成功和失败之外还有超时这第三种状态。现实中的分布式系统必须合理地处理超时的状态,否则永久阻塞或等待在任何实际的业务领域中都是不可接受的。
Kafka 事务机制本身可以配置事务超时,在事务管理者和事务协调者交互的各个过程中都会检验事务超时的配置,如果事务已经超时则抛出异常。
但是,在网络分区的情况下,可能 Kafka 集群根本就等不到生产者发送的消息。这个时候,Kafka 集群就需要相应的机制来主动过期。否则永不过期的中间状态事务在生产者宕机且不可恢复或不再恢复的情况下将逐步积累成存储垃圾。
Kafka 集群会周期性的轮询内存中的事务信息。如果发现进行中的事务最后的状态更新时间距今已经超过了配置的集群事务清理时间阈值,则采取丢弃该事务的 *** 作。同时,为了避免 *** 作过程中并发地收到原 Producer 发来事务更新请求,首先更新事务关联的 ProducerID 的 epoch 以将原 Producer 的 epoch 隔离掉。换个角度说,也就是以一个新的有效的身份执行丢弃事务 *** 作,以免分不清到底是谁在丢弃事务。
此外,轮询中还会检查 TransactionalID 最新的事务信息,如果一个 TransactionalID 最后一个事务距今已经已经超过了配置的集群 TransactionalID 清理时间阈值,则将该 TransactionalID 对应的元数据信息都进行清理。
上面的讨论中还有两个重要的主题被忽略了。一个是 Kafka 事务机制支持在同一个事务里进行消息生产和消息消费位点提交,另一个是配置了读已提交的消费者如何在事务未提交以及丢弃事务时正确的读取事务中消息。
前者不是特别复杂,只需要将消费位点提交视作一条事务中的消息,和消息生产以及控制消息同等待遇,在提交的时候也被事务标志所界定即可。
不展开聊是因为这个特性通常只在仅适用 Kafka 搭建流式处理流水线的场景下有用,尤其是 Kafka Streams 解决方案。
对于组合多个系统的流式处理流水线来说,消息从 Kafka 中消费得到是上游,生产到 Kafka 上是下游,中间是另一个例如 Flink 的流式计算系统。在这种场景下,消费位点的管理和事务地生产消息是两个可以分开考虑的事情,可以跟其他系统的一致性方案例如 Flink 的 Checkpoint 机制相结合,而不需要非得在同一个事务里既提交消费位点,又提交新的消息。
后者主要靠 Kafka 集群在管理消费位点拉取请求的时候,通过随事务机制的引入新添加的 LastStableOffset 概念来响应配置为读已提交的消费者的请求。在事务完成之前不会允许读已提交的消费者拉取事务中的消息。显然,这有可能导致消费者拉取新消息时长时间的阻塞。因此在实践中应当尽量避免长时间的事务。
对于丢弃事务的消息,Kafka 集群会维护一个丢弃事务的消息的元数据,从而支持消费者同时拉取消息和丢弃事务的消息的元数据,自行比对筛掉丢弃事务的消息。在正常的业务场景里,丢弃的事务不会太多,从而维护这样的一份元数据以及让消费者自行筛选会是一个能够接受的选择。
所谓的副本机制(Replication),也可以称之为备份机制,通常是指分布式系统在多台网络互联的机器上保存有相同的数据拷贝。副本机制有什么好处呢?
这些优点都是在分布式系统教科书中最常被提及的,但是有些遗憾的是,对于 Apache Kafka 而言,目前只能享受到副本机制带来的第 1 个好处,也就是提供数据冗余实现高可用性和高持久性。我会在这一讲后面的内容中,详细解释 Kafka 没能提供第 2 点和第 3 点好处的原因。
不过即便如此,副本机制依然是 Kafka 设计架构的核心所在,它也是 Kafka 确保系统高可用和消息高持久性的重要基石。
在讨论具体的副本机制之前,我们先花一点时间明确一下副本的含义。
我们之前谈到过,Kafka 是有主题概念的,而每个主题又进一步划分成若干个分区。副本的概念实际上是在分区层级下定义的,每个分区配置有若干个副本。
所谓副本(Replica),本质就是一个只能追加写消息的提交日志 。根据 Kafka 副本机制的定义,同一个分区下的所有副本保存有相同的消息序列,这些副本分散保存在不同的 Broker 上,从而能够对抗部分 Broker 宕机带来的数据不可用。
在实际生产环境中,每台 Broker 都可能保存有各个主题下不同分区的不同副本,因此,单个 Broker 上存有成百上千个副本的现象是非常正常的。
接下来我们来看一张图,它展示的是一个有 3 台 Broker 的 Kafka 集群上的副本分布情况。从这张图中,我们可以看到,主题 1 分区 0 的 3 个副本分散在 3 台 Broker 上,其他主题分区的副本也都散落在不同的 Broker 上,从而实现数据冗余。
既然分区下能够配置多个副本,而且这些副本的内容还要一致,那么很自然的一个问题就是:我们该如何确保副本中所有的数据都是一致的呢?特别是对 Kafka 而言,当生产者发送消息到某个主题后,消息是如何同步到对应的所有副本中的呢?针对这个问题,最常见的解决方案就是采用 基于领导者(Leader-based)的副本机制 。Apache Kafka 就是这样的设计。
基于领导者的副本机制的工作原理如下图所示,我来简单解释一下这张图里面的内容。
第一,在 Kafka 中,副本分成两类:领导者副本(Leader Replica)和追随者副本(Follower Replica)。每个分区在创建时都要选举一个副本,称为领导者副本,其余的副本自动称为追随者副本。
第二,Kafka 的副本机制比其他分布式系统要更严格一些。在 Kafka 中,追随者副本是不对外提供服务的。这就是说,任何一个追随者副本都不能响应消费者和生产者的读写请求。所有的请求都必须由领导者副本来处理,或者说,所有的读写请求都必须发往领导者副本所在的 Broker,由该 Broker 负责处理。追随者副本不处理客户端请求,它唯一的任务就是从领导者副本 异步拉取 消息,并写入到自己的提交日志中,从而实现与领导者副本的同步。
第三,当领导者副本挂掉了,或者说领导者副本所在的 Broker 宕机时,Kafka 依托于 ZooKeeper 提供的监控功能能够实时感知到,并立即开启新一轮的领导者选举,从追随者副本中选一个作为新的领导者。老 Leader 副本重启回来后,只能作为追随者副本加入到集群中。
你一定要特别注意上面的第二点,即 追随者副本是不对外提供服务的 。还记得刚刚我们谈到副本机制的好处时,说过 Kafka 没能提供读 *** 作横向扩展以及改善局部性吗?具体的原因就在于此。
对于客户端用户而言,Kafka 的追随者副本没有任何作用,它既不能像 MySQL 那样帮助领导者副本“抗读”,也不能实现将某些副本放到离客户端近的地方来改善数据局部性。
既然如此,Kafka 为什么要这样设计呢?其实这种副本机制有两个方面的好处。
1 方便实现“Read-your-writes” 。
所谓 Read-your-writes,顾名思义就是,当你使用生产者 API 向 Kafka 成功写入消息后,马上使用消费者 API 去读取刚才生产的消息。
举个例子,比如你平时发微博时,你发完一条微博,肯定是希望能立即看到的,这就是典型的 Read-your-writes 场景。如果允许追随者副本对外提供服务,由于副本同步是异步的,因此有可能出现追随者副本还没有从领导者副本那里拉取到最新的消息,从而使得客户端看不到最新写入的消息。
2 方便实现单调读(Monotonic Reads) 。
什么是单调读呢?就是对于一个消费者用户而言,在多次消费消息时,它不会看到某条消息一会儿存在一会儿不存在。
如果允许追随者副本提供读服务,那么假设当前有 2 个追随者副本 F1 和 F2,它们异步地拉取领导者副本数据。倘若 F1 拉取了 Leader 的最新消息而 F2 还未及时拉取,那么,此时如果有一个消费者先从 F1 读取消息之后又从 F2 拉取消息,它可能会看到这样的现象:第一次消费时看到的最新消息在第二次消费时不见了,这就不是单调读一致性。但是,如果所有的读请求都是由 Leader 来处理,那么 Kafka 就很容易实现单调读一致性。
我们刚刚反复说过,追随者副本不提供服务,只是定期地异步拉取领导者副本中的数据而已。既然是异步的,就存在着不可能与 Leader 实时同步的风险。在探讨如何正确应对这种风险之前,我们必须要精确地知道同步的含义是什么。或者说,Kafka 要明确地告诉我们,追随者副本到底在什么条件下才算与 Leader 同步。
基于这个想法,Kafka 引入了 In-sync Replicas,也就是所谓的 ISR 副本集合。ISR 中的副本都是与 Leader 同步的副本,相反,不在 ISR 中的追随者副本就被认为是与 Leader 不同步的。那么,到底什么副本能够进入到 ISR 中呢?
我们首先要明确的是,Leader 副本天然就在 ISR 中。也就是说, ISR 不只是追随者副本集合,它必然包括 Leader 副本。甚至在某些情况下,ISR 只有 Leader 这一个副本 。
另外,能够进入到 ISR 的追随者副本要满足一定的条件。至于是什么条件,我先卖个关子,我们先来一起看看下面这张图。
图中有 3 个副本:1 个领导者副本和 2 个追随者副本。Leader 副本当前写入了 10 条消息,Follower1 副本同步了其中的 6 条消息,而 Follower2 副本只同步了其中的 3 条消息。现在,请你思考一下,对于这 2 个追随者副本,你觉得哪个追随者副本与 Leader 不同步?
答案是,要根据具体情况来定。换成英文,就是那句著名的“It depends”。看上去好像 Follower2 的消息数比 Leader 少了很多,它是最有可能与 Leader 不同步的。的确是这样的,但仅仅是可能。
事实上,这张图中的 2 个 Follower 副本都有可能与 Leader 不同步,但也都有可能与 Leader 同步。也就是说,Kafka 判断 Follower 是否与 Leader 同步的标准,不是看相差的消息数,而是另有“玄机”。
这个标准就是 Broker 端参数 replicalagtimemaxms 参数值 。这个参数的含义是 Follower 副本能够落后 Leader 副本的最长时间间隔,当前默认值是 10 秒。这就是说,只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。
我们在前面说过,Follower 副本唯一的工作就是不断地从 Leader 副本拉取消息,然后写入到自己的提交日志中。如果这个同步过程的速度持续慢于 Leader 副本的消息写入速度,那么在 replicalagtimemaxms 时间后,此 Follower 副本就会被认为是与 Leader 副本不同步的,因此不能再放入 ISR 中。此时,Kafka 会自动收缩 ISR 集合,将该副本“踢出”ISR。
值得注意的是,倘若该副本后面慢慢地追上了 Leader 的进度,那么它是能够重新被加回 ISR 的。这也表明,ISR 是一个动态调整的集合,而非静态不变的。
既然 ISR 是可以动态调整的,那么自然就可以出现这样的情形:ISR 为空。因为 Leader 副本天然就在 ISR 中,如果 ISR 为空了,就说明 Leader 副本也“挂掉”了,Kafka 需要重新选举一个新的 Leader。可是 ISR 是空,此时该怎么选举新 Leader 呢?
Kafka 把所有不在 ISR 中的存活副本都称为非同步副本 。通常来说,非同步副本落后 Leader 太多,因此,如果选择这些副本作为新 Leader,就可能出现数据的丢失。毕竟,这些副本中保存的消息远远落后于老 Leader 中的消息。在 Kafka 中,选举这种副本的过程称为 Unclean 领导者选举。 Broker 端参数 uncleanleaderelectionenable 控制是否允许 Unclean 领导者选举 。
开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean 领导者选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。
如果你听说过 CAP 理论的话,你一定知道,一个分布式系统通常只能同时满足一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)中的两个。显然,在这个问题上,Kafka 赋予你选择 C 或 A 的权利。
你可以根据你的实际业务场景决定是否开启 Unclean 领导者选举。不过,我强烈建议你 不要 开启它,毕竟我们还可以通过其他的方式来提升高可用性。如果为了这点儿高可用性的改善,牺牲了数据一致性,那就非常不值当了。
kafka是一个高性能、低延迟的分布式发布-订阅消息系统,适用于在线、离线消息消费,为了防止数据丢失,kafka将消息持久化到磁盘上并在集群内复制在深入了解kafka之前,先介绍kafka体系架构中的一些组件,包括Topic、Producer、Consumer、Consumer Group、Broker、Partition、Leader、Follower。
Topic
消息被发送到kafak中都有一个类别,这个类别叫做Topic,kafka中的消息都是通过主题进行组织的,一个Topic可以有1个或多个Partition。
Producer
生产者,即是将消息发送到kafka的过程,发送消息是需要指定Topic,并且可以指定Partition。Broker接收到消息后,将消息存放在partition中。
Consumer
消费者,从broker topic中读取消息,可以订阅一个或多个topic。
Consumer Group
消费者组由一个或多个消费者组成,消费者组中的消费者共同消费一个主题的分区,主题中每个分区只能被同一个消费者组中的一个消费者消费。
Broker
kafka集群包括一个或多个节点,每个节点就叫做Broker。
Partition
Topic中的数据可以分割为一个或多个Partition,Partition在底层逻辑是log文件,每个Partition由多个Segment组成,任何发送到kafka的消息最终都是会被追加到log文件的尾部。
Leader
Topic的Partition允许有若干个副本,每个分区都一个leader和多个follower,leader负责所有的读写 *** 作。
Follower
Follower追随Leader,所有的读写请求都是通过Leader路由,请求会广播给所有的Follower,Follower和Leader保持数据同步。如果Leader失效,通过Follower选举一个新的Leader
下面通过一张简单的UML图简要说明组件之间的交互和关联关系
主要关系说明如下
- kafka集群可以有1个或多个Broker
- Broker 可以包含多个副本(每个分区可以包含多个副本,通常每个分区副本数不会多于Broker数量,一个broker中包含很多个分区)
- Topic可以有1个或多个分区
- broker中的每个partition可以有0个或1个副本
- 一个partition有一个leader副本和0个或多个follower副本
- partition的每个副本都必须位于单独的broker上
- 每个partition副本位于一个broker上,并且一个partition不能划分多个broker。
Kafka架构
下面重点介绍Producer、Topic、Consumer的关系,一个简单生产消费的过程例子如下图所示
在这个例子中,一个生产者将消息发送给单个topic
上面这个图中,1个生产者发布消息到1个topic,一个消费者消费1个Topic,如上图中的Producer 1和Consumer 1;一个Topic可以是由多个生产者发布消息,如Topic4;1个消费者可以消费多个Topic,如图中的Consumer 2。
如上图的例子,一个生产者可以给多个Topic发布消息。一个消费者同一时间只能给一个topic发布消息,但是可以使用异步发布消息,生产者可以一次将多个消息发送给多个Topic
生产者负责将每条消息发送到分区,默认分区由消息key通过hash算法确定,如果没有指定消息key,则通过循环轮询来确定分区。但是在实际业务场景中,默认的分区行为并不能满足业务需要,比如需要确保消息的顺序或需要将消息平均分配给消费者等等。因此,生产者在发布消息的时候可以使用自定义分区方式,为消息指定分区key、重写分区算法或手动设置分区等方式将消息发布到特定分区。
kafka内部运作的基本逻辑大概为:每个主题都有1个或多个分区,这些分区分不在1个或多个Broker上,为了提高消息的可靠性不会丢失,可以配置多个副本因子,这样每个分区可以被复制到一个或多个Broker上,每个分区对应一个log文件,只能被一个消费组中的一个消费者消费,用于提高Topic的并发性。因此一般将消费组消费者数量设置为小于或者等于topic的分区数量,如果要增加消费者也相应的增加对应的分区数量。
同一个分区内的消息是由顺序的,每个分区仅能被同一个消费组中的一个消费者顺序消费。但是,当多个消费组订阅了同一个topic,则每个组中的消费者都会收到消息。
下面例子说明多分区情况下,消费者组和消费者消费的几种情况。
分区数和消费者数相同,如下图所示
这种情况,同一个消费组的每个消费者只消费一个分区。
另外一种情况,消费组中的消费者数量多于分区数,如下图所示
消费者数量多于分区数,则某些消费者就处于空闲状态,当有消费者down掉或添加新的分区情况时,空闲消费者将发挥作用。
另外一种情况,消费者数比分区数少,如下图所示
这种情况,导致某些分区需要负责更多的分区,处理更多的消息。
最后,多个消费组消费了同一个topic
topic消息被广播到每个消费组,每个消费组都可以接受同一个消息。这是kafka实现一个Topic广播的方式,一个Topic可以被多个Conumse Group的消费者同时消费;同一个消息只能被一个消费者组中的一个消费者消费。
在 kafka 中, topic 是一个存储消息的逻辑概念,可以认为是一个消息集合。每条消息发送到 kafka 集群的消息都有一个topic。物理上来说,不同的 topic 的消息是分开存储的,每个 topic 可以有多个生产者向它发送消息,也可以有多个消费者去消费其中的消息。
partition分区是topic的进一步拆分,每个topic可以拆分为多个partition分区,类似于数据库中表的水平拆分,每条消息都会分到某一个分区当中,分区内部会给消息分配一个offset来表示消息的顺序所在。
多个生产者可以向topic发送消息,消息可以按照一定规则均匀分布在各个partition里面,由于各个partition物理上也是隔离存储的,这点就类似于数据库对于表做水平拆分来提高性能。
扩展资料
Kafka它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。
对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
参考资料来源:百度百科-Kafka
下面我们来介绍下Kafka rebalance的原理。kafka的consumer group有多个consumer实例,这些consumer实例是怎么分配topic分区的呢,这就需要kafka rebalance来实现这个功能。rebalance是怎么触发的,有3个条件,可以触发kafka的rebalance流程。1组成员发生变更。当有新的consumer加入组,或者已有consumer崩溃时则触发rebalance流程。
2组订阅的topic数发生变更。当consumer group订阅了新的topic时,也会触发rebalance流程。
3组订阅的topic的分区数发生变更。当consumer group订阅的topic的分区数变更时,也会触发rebalance流程。
最多的情况就是第一个条件发生时,比如consumer实例崩溃,或者设置的consumer参数requesttimeoutms,maxpollrecords和maxpollintervalms不合理时,则会触发rebalance流程。
rebalance分区分配时,使用的是rebalance协议,下面介绍一下这个协议。kafka提供了5个协议来处理rebalance *** 作。
JoinGroup请求: consumer请求加入组。
SyncGroup请求: group leader把分配方案同步更新到组内所有成员中。
LeaveGroup请求: consumer即将离开组的请求。
DescribeGroup请求: 查看组的所有成员信息,包括成员信息,协议信息,分配方案以及订阅信息等。
Heartbeat请求: consumer发送心跳请求。
consumer group在执行rebalance之前必须确定coordinator所在的broker,并创建与该broker通信的连接。成功连接coordinator之后就可以执行rebalance *** 作。目前rebalance主要分为两步:加入组和同步更新分配方案。
加入组:这一步中组内所有consumer向coordinator发送JoinGroup请求。当所有JoinGroup请求都发送完成后,coordinator会从中选择一个consumer成为group的leader,并把所有成员信息以及它们的订阅信息发送给leader。
同步更新分配方案。这一步中leader开始制定分配方案,即根据分配策略决定每个consumer都负责哪些topic分区。一旦分配完成,leader会把这个分配方案放入到SyncGroup请求并发送给coordinator。而coordinator接收到分配方案后把属于每个consumer的方案放入SyncGroup请求的response返回给consumer。
发送SyncGroup请求会同步每个consumer的状态信息,在所有成员都接受到SyncGroup的response后,每个成员按照coordinator的方案进行工作。
consumer group的分配方案是在consumer端执行的。这样即使以后分区策略发生了变更,也只需要重启consumer实例即可,不需要重启kafka broker。
到这里,我们介绍了kafka rebalance的原理,kafka rebalance的原理我们就讲到这里了。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)