一探究竟,详解Kafka生产者和消费者的工作原理!

一探究竟,详解Kafka生产者和消费者的工作原理!,第1张

对于每个主题,Kafka群集都会维护一个分区日志,如下所示:

每个分区(Partition)都是有序的(所以每一个Partition内部都是有序的),不变的记录序列,这些记录连续地附加到结构化的提交日志中。分区中的每个记录均分配有一个称为偏移的顺序ID号,该ID 唯一地标识分区中的每个记录。

每个消费者保留的唯一元数据是该消费者在日志中的偏移量或位置。此偏移量由使用者控制:通常,使用者在读取记录时会线性地推进其偏移量,但实际上,由于位置是由使用者控制的,因此它可以按喜欢的任何顺序使用记录。例如,使用者可以重置到较旧的偏移量以重新处理过去的数据,或者跳到最近的记录并从“现在”开始使用。(类似于游标指针的方式顺序处理数据,并且该指标可以任意移动)

分区的设计结构

生产者分区策略是 决定生产者将消息发送到哪个分区的算法, 主要有以下几种:

kafka消息的有序性,是采用消息键保序策略来实现的。 一个topic,一个partition(分割),一个consumer,内部单线程消费,写N个内存queue,然后N个线程分别消费一个内存queue。

kafka发送进行消息压缩有两个地方,分别是生产端压缩和Broker端压缩。

生产者端压缩 生产者压缩通常采用的GZIP算法这样 Producer 启动后生产的每个消息集合都是经 GZIP 压缩过的,故而能很好地节省网络传输带宽以及 Kafka Broker 端的磁盘占用。 配置参数:

Broker压缩 大部分情况下 Broker 从 Producer 端接收到消息后仅仅是原封不动地保存而不会对其进行任何修改,但以下情况会引发Broker压缩

消费者端解压 Kafka 会将启用了哪种压缩算法封装进消息集合中,在Consummer中进行解压 *** 作。

kafka提供以下特性来保证其消息的不丢失,从而保证消息的可靠性

生产者确认机制 当 Kafka 的若干个 Broker(根据配置策略,可以是一个,也可以是ALL) 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为“已提交”消息了。 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。

生产者失败回调机制 生产者不要使用 producersend(msg),而要使用 producersend(msg, callback)。记住,一定要使用带有回调通知的 send 方法。producersend(msg, callback) 采用异步的方式,当发生失败时会调用callback方法。

失败重试机制 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。

消费者确认机制 确保消息消费完成再提交。Consumer 端有个参数 enableautocommit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。

副本机制 设置 replicationfactor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。 设置 mininsyncreplicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。 确保 replicationfactor > mininsyncreplicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replicationfactor = mininsyncreplicas + 1。

限定Broker选取Leader机制 设置 uncleanleaderelectionenable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。

由于kafka生产者确认机制、失败重试机制的存在,kafka的消息不会丢失但是存在由于网络延迟等原因造成重复发送的可能性。 所以我们要考虑消息幂等性的设计。 kafka提供了幂等性Producer的方式来保证消息幂等性。使用 的方式开启幂等性。

幂等性 Producer 的作用范围:

Kafka事务 事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。 同样使用 的方式开启事务。

consumer group是kafka提供的可扩展且具有容错性的消费者机制。它是由一个或者多个消费者组成,它们共享同一个Group ID 组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费组内的一个consumer来消费。

consummer group有以下的特性:

消费者位置 消费者位置,即位移。 消费者在消费的过程中需要记录自己消费了多少数据。 位移提交有自动、手动两种方式进行位移提交。

Kafka通过一个内置Topic(__consumer_offsets)来管理消费者位移。

rebalance本质上是一种协议,规定了一个consumer group下的所有consumer如何达成一致来分配订阅topic的每个分区。

Kafka提供了一个角色:coordinator来执行对于consumer group的管理。 Group Coordinator是一个服务,每个Broker在启动的时候都会启动一个该服务。Group Coordinator的作用是用来存储Group的相关Meta信息,并将对应Partition的Offset信息记录到Kafka内置Topic(__consumer_offsets)中。

Rebalance 过程分为两步:Join 和 Sync。 Join 顾名思义就是加入组。这一步中,所有成员都向coordinator发送JoinGroup请求,请求加入消费组。一旦所有成员都发送了JoinGroup请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader——注意leader和coordinator不是一个概念。leader负责消费分配方案的制定。

Sync,这一步leader开始分配消费方案,即哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案封装进SyncGroup请求中发给coordinator,非leader也会发SyncGroup请求,只是内容为空。coordinator接收到分配方案之后会把方案塞进SyncGroup的response中发给各个consumer。这样组内的所有成员就都知道自己应该消费哪些分区了。

    Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。

JMS(Java Message Service)是Java提供的一套技术规范

    用来异构系统 集成通信,缓解系统瓶颈,提高系统的伸缩性增强系统用户体验,使得系统模块化和组件化变得可行并更加灵活

(1) 点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)

    点对点模型通常是一个基于拉取或者轮询的消息传送模型,这种模型从队列中请求信息,而不是将消息推送到客户端。这个模型的特点是发送到队列的消息被一个且只有一个接收者接收处理,即使有多个消息监听者也是如此。

(2) 发布/订阅模式(一对多,数据生产后,推送给所有订阅者)

    发布订阅模型则是一个基于推送的消息传送模型。发布订阅模型可以有多种不同的订阅者,临时订阅者只在主动监听主题时才接收消息,而持久订阅者则监听主题的所有消息,即使当前订阅者不可用,处于离线状态。

    kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume *** 作。

kafka集群支持热扩展

消息被持久化到本地磁盘,并且支持数据备份防止数据丢失

允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)

支持数千个客户端同时读写

    一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。

    解耦和生产者和消费者、缓存消息等。

    Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。

    Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种 *** 作的集中反馈,比如报警和报告。

    比如spark streaming和storm

    Kafka每个主题的多个分区日志分布式地存储在Kafka集群上,同时为了故障容错,每个分区都会以副本的方式复制到多个消息代理节点上。其中一个节点会作为主副本(Leader),其他节点作为备份副本(Follower,也叫作从副本)。主副本会负责所有的客户端读写 *** 作,备份副本仅仅从主副本同步数据。当主副本出现故障时,备份副本中的一个副本会被选择为新的主副本。因为每个分区的副本中只有主副本接受读写,所以每个服务器端都会作为某些分区的主副本,以及另外一些分区的备份副本,这样Kafka集群的所有服务端整体上对客户端是负载均衡的。

    Kafka的生产者和消费者相对于服务器端而言都是客户端。

    Kafka生产者客户端发布消息到服务端的指定主题,会指定消息所属的分区。生产者发布消息时根据消息是否有键,采用不同的分区策略。消息没有键时,通过轮询方式进行客户端负载均衡;消息有键时,根据分区语义(例如hash)确保相同键的消息总是发送到同一分区。

    Kafka的消费者通过订阅主题来消费消息,并且每个消费者都会设置一个消费组名称。因为生产者发布到主题的每一条消息都只会发送给消费者组的一个消费者。所以,如果要实现传统消息系统的“队列”模型,可以让每个消费者都拥有相同的消费组名称,这样消息就会负责均衡到所有的消费者;如果要实现“发布-订阅”模型,则每个消费者的消费者组名称都不相同,这样每条消息就会广播给所有的消费者。

    分区是消费者现场模型的最小并行单位。如下图(图1)所示,生产者发布消息到一台服务器的3个分区时,只有一个消费者消费所有的3个分区。在下图(图2)中,3个分区分布在3台服务器上,同时有3个消费者分别消费不同的分区。假设每个服务器的吞吐量时300MB,在下图(图1)中分摊到每个分区只有100MB,而在下图(图2)中,集群整体的吞吐量有900MB。可以看到,增加服务器节点会提升集群的性能,增加消费者数量会提升处理性能。

    同一个消费组下多个消费者互相协调消费工作,Kafka会将所有的分区平均地分配给所有的消费者实例,这样每个消费者都可以分配到数量均等的分区。Kafka的消费组管理协议会动态地维护消费组的成员列表,当一个新消费者加入消费者组,或者有消费者离开消费组,都会触发再平衡 *** 作。

    Kafka的消费者消费消息时,只保证在一个分区内的消息的完全有序性,并不保证同一个主题汇中多个分区的消息顺序。而且,消费者读取一个分区消息的顺序和生产者写入到这个分区的顺序是一致的。比如,生产者写入“hello”和“Kafka”两条消息到分区P1,则消费者读取到的顺序也一定是“hello”和“Kafka”。如果业务上需要保证所有消息完全一致,只能通过设置一个分区完成,但这种做法的缺点是最多只能有一个消费者进行消费。一般来说,只需要保证每个分区的有序性,再对消息假设键来保证相同键的所有消息落入同一分区,就可以满足绝大多数的应用。

Consumer消费消息之后不需要手动提交,consumer客户端会自动提交已经消费的消息的offset。

调用栈为:

KafkaConsumer#assign

ConsumerCoordinator#maybeAutoCommitOffsetsNow

调用栈为:

KafkaConsumer#poll

KafkaConsumer#pollOnce

ConsumerCoordinator#poll

ConsumerCoordinator#maybeAutoCommitOffsetsAsync

调用栈为:

KafkaConsumer#poll

KafkaConsumer#pollOnce

ConsumerCoordinator#poll

AbstractCoordinator#ensureActiveGroup

AbstractCoordinator#joinGroupIfNeeded

AbstractCoordinator#onJoinPrepare

ConsumerCoordinator#maybeAutoCommitOffsetsSync

只要开启了自动提交,此处是一定会向协调者同步提交位移,因为需要重新rebalance,消费者组中各个消费者的分区既有可能会发生改变,重新消费之前一定要获取最新的唯一,尽最大努力避免重复消费。

调用栈为:

KafkaConsumer#close

ConsumerCoordinator#close

关闭的时候肯定是要同步提交消费位移的。

consumer 采用 pull(拉)模式从 broker 中读取数据。

push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。

pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中,一直返回空数

据。针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有

数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为 timeout。

分区中的所有副本统称为 AR(Assigned Replicas)。所有与 leader 副本保持一定程度同步的副本(包括 leader 副本在内)组成ISR(In-Sync Replicas),ISR 集合是 AR 集合中的一个子集。

可以通过分区策略体现消息顺序性。分区策略有轮询策略、随机策略、按消息键保序策略。

处理顺序 :拦截器->序列化器->分区器

消息在通过 send() 方法发往 broker 的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往 broker。拦截器一般不是必需的,而序列化器是必需的。消息经过序列化之后就需要确定它发往的分区,如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分区器的作用,因为 partition 代表的就是所要发往的分区号。

整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程(发送线程)。

一般来说如果消费者过多,出现了消费者的个数大于分区个数的情况,就会有消费者分配不到任何分区。开发者可以继承AbstractPartitionAssignor实现自定义消费策略,从而实现同一消费组内的任意消费者都可以消费订阅主题的所有分区。

当前消费者需要提交的消费位移是offset+1

在旧消费者客户端中,消费位移是存储在 ZooKeeper 中的。而在新消费者客户端中,消费位移存储在 Kafka 内部的主题__consumer_offsets 中。

Kafka 中的消息是以主题为基本单位进行归类的,各个主题在逻辑上相互独立。每个主题又可以分为一个或多个分区。不考虑多副本的情况,一个分区对应一个日志(Log)。为了防止 Log 过大,Kafka 又引入了日志分段(LogSegment)的概念,将 Log 切分为多个 LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件。

Log 和 LogSegment 也不是纯粹物理意义上的概念,Log 在物理上只以文件夹的形式存储,而每个 LogSegment 对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件。

每个日志分段文件对应了两个索引文件,主要用来提高查找消息的效率。

日志删除(Log Retention):按照一定的保留策略直接删除不符合条件的日志分段。

日志压缩(Log Compaction):针对每个消息的 key 进行整合,对于有相同 key 的不同 value 值,只保留最后一个版本。

在 Kafka 集群中会有一个或多个 broker,其中有一个 broker 会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。当某个分区的 leader 副本出现故障时,由控制器负责为该分区选举新的 leader 副本。当检测到某个分区的 ISR 集合发生变化时,由控制器负责通知所有broker更新其元数据信息。当使用 kafka-topicssh 脚本为某个 topic 增加分区数量时,同样还是由控制器负责分区的重新分配。

Kafka 中有多种延时 *** 作,比如延时生产,还有延时拉取(DelayedFetch)、延时数据删除(DelayedDeleteRecords)等。

延时 *** 作创建之后会被加入延时 *** 作管理器(DelayedOperationPurgatory)来做专门的处理。延时 *** 作有可能会超时,每个延时 *** 作管理器都会配备一个定时器(SystemTimer)来做超时管理,定时器的底层就是采用时间轮(TimingWheel)实现的。

为了实现生产者的幂等性,Kafka 为此引入了 producer id(以下简称 PID)和序列号(sequence number)这两个概念。

Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的 Producer 在

初始化的时候会被分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number。而Broker 端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker 只会持久化一条。

Kafka中的事务可以使应用程序将消费消息、生产消息、提交消费位移当作原子 *** 作来处理,同时成功或失败,即使该生产或消费会跨多个分区。

生产者必须提供唯一的transactionalId,启动后请求事务协调器获取一个PID,transactionalId与PID一一对应。

每次发送数据给<Topic, Partition>前,需要先向事务协调器发送AddPartitionsToTxnRequest,事务协调器会将该<Transaction, Topic, Partition>存于__transaction_state内,并将其状态置为BEGIN。

在处理完 AddOffsetsToTxnRequest 之后,生产者还会发送 TxnOffsetCommitRequest 请求给 GroupCoordinator,从而将本次事务中包含的消费位移信息 offsets 存储到主题 __consumer_offsets 中

一旦上述数据写入 *** 作完成,应用程序必须调用KafkaProducer的commitTransaction方法或者abortTransaction方法以结束当前事务。

在发送延时消息的时候并不是先投递到要发送的真实主题(real_topic)中,而是先投递到一些 Kafka 内部的主题(delay_topic)中,这些内部主题对用户不可见,然后通过一个自定义的服务拉取这些内部主题中的消息,并将满足条件的消息再投递到要发送的真实的主题中,消费者所订阅的还是真实的主题。

Kafka 集群中有一个 broker 会被选举为 Controller,负责管理集群 broker 的上下线,所

有 topic 的分区副本分配和 leader 选举等工作。Controller 的管理工作都是依赖于 Zookeeper 的。

# flink消费kafka细节

Apache kafka connector提供对Kafka服务的事件流的访问。Flink提供了特殊的Kafka连接器,用于从Kafka主题读写数据。 Flink Kafka Consumer与Flink的检查点机制集成在一起,以提供一次精确的处理语义。 为此,Flink不仅仅依赖于Kafka的消费者群体偏移量跟踪,还内部跟踪和检查这些偏移量。

请为您的用例和环境选择一个包(Maven项目ID)和类名。 对于大多数用户来说,FlinkKafkaConsumer08(flink-connector-kafka的一部分)是合适的。

| Maven Dependency                | Supported since | Consumer and Producer Class name            | Kafka version | Notes                                                        |

| :------------------------------ | :-------------- | :------------------------------------------ | :------------ | :----------------------------------------------------------- |

| flink-connector-kafka-08_211  | 100          | FlinkKafkaConsumer08 FlinkKafkaProducer08  | 08x        | Uses the [SimpleConsumer](>

以上就是关于一探究竟,详解Kafka生产者和消费者的工作原理!全部的内容,包括:一探究竟,详解Kafka生产者和消费者的工作原理!、Kafka-概述、Kafka Consumer自动提交机制等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/sjk/10187465.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-06
下一篇 2023-05-06

发表评论

登录后才能评论

评论列表(0条)

保存