consumer 采用 pull(拉)模式从 broker 中读取数据。
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。
pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中,一直返回空数
据。针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有
数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为 timeout。
分区中的所有副本统称为 AR(Assigned Replicas)。所有与 leader 副本保持一定程度同步的副本(包括 leader 副本在内)组成ISR(In-Sync Replicas),ISR 集合是 AR 集合中的一个子集。
可以通过分区策略体现消息顺序性。分区策略有轮询策略、随机策略、按消息键保序策略。
处理顺序 :拦截器->序列化器->分区器
消息在通过 send() 方法发往 broker 的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往 broker。拦截器一般不是必需的,而序列化器是必需的。消息经过序列化之后就需要确定它发往的分区,如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分区器的作用,因为 partition 代表的就是所要发往的分区号。
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程(发送线程)。
一般来说如果消费者过多,出现了消费者的个数大于分区个数的情况,就会有消费者分配不到任何分区。开发者可以继承AbstractPartitionAssignor实现自定义消费策略,从而实现同一消费组内的任意消费者都可以消费订阅主题的所有分区。
当前消费者需要提交的消费位移是offset+1
在旧消费者客户端中,消费位移是存储在 ZooKeeper 中的。而在新消费者客户端中,消费位移存储在 Kafka 内部的主题__consumer_offsets 中。
Kafka 中的消息是以主题为基本单位进行归类的,各个主题在逻辑上相互独立。每个主题又可以分为一个或多个分区。不考虑多副本的情况,一个分区对应一个日志(Log)。为了防止 Log 过大,Kafka 又引入了日志分段(LogSegment)的概念,将 Log 切分为多个 LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件。
Log 和 LogSegment 也不是纯粹物理意义上的概念,Log 在物理上只以文件夹的形式存储,而每个 LogSegment 对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件。
每个日志分段文件对应了两个索引文件,主要用来提高查找消息的效率。
日志删除(Log Retention):按照一定的保留策略直接删除不符合条件的日志分段。
日志压缩(Log Compaction):针对每个消息的 key 进行整合,对于有相同 key 的不同 value 值,只保留最后一个版本。
在 Kafka 集群中会有一个或多个 broker,其中有一个 broker 会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。当某个分区的 leader 副本出现故障时,由控制器负责为该分区选举新的 leader 副本。当检测到某个分区的 ISR 集合发生变化时,由控制器负责通知所有broker更新其元数据信息。当使用 kafka-topicssh 脚本为某个 topic 增加分区数量时,同样还是由控制器负责分区的重新分配。
Kafka 中有多种延时 *** 作,比如延时生产,还有延时拉取(DelayedFetch)、延时数据删除(DelayedDeleteRecords)等。
延时 *** 作创建之后会被加入延时 *** 作管理器(DelayedOperationPurgatory)来做专门的处理。延时 *** 作有可能会超时,每个延时 *** 作管理器都会配备一个定时器(SystemTimer)来做超时管理,定时器的底层就是采用时间轮(TimingWheel)实现的。
为了实现生产者的幂等性,Kafka 为此引入了 producer id(以下简称 PID)和序列号(sequence number)这两个概念。
Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的 Producer 在
初始化的时候会被分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number。而Broker 端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker 只会持久化一条。
Kafka中的事务可以使应用程序将消费消息、生产消息、提交消费位移当作原子 *** 作来处理,同时成功或失败,即使该生产或消费会跨多个分区。
生产者必须提供唯一的transactionalId,启动后请求事务协调器获取一个PID,transactionalId与PID一一对应。
每次发送数据给<Topic, Partition>前,需要先向事务协调器发送AddPartitionsToTxnRequest,事务协调器会将该<Transaction, Topic, Partition>存于__transaction_state内,并将其状态置为BEGIN。
在处理完 AddOffsetsToTxnRequest 之后,生产者还会发送 TxnOffsetCommitRequest 请求给 GroupCoordinator,从而将本次事务中包含的消费位移信息 offsets 存储到主题 __consumer_offsets 中
一旦上述数据写入 *** 作完成,应用程序必须调用KafkaProducer的commitTransaction方法或者abortTransaction方法以结束当前事务。
在发送延时消息的时候并不是先投递到要发送的真实主题(real_topic)中,而是先投递到一些 Kafka 内部的主题(delay_topic)中,这些内部主题对用户不可见,然后通过一个自定义的服务拉取这些内部主题中的消息,并将满足条件的消息再投递到要发送的真实的主题中,消费者所订阅的还是真实的主题。
Kafka 集群中有一个 broker 会被选举为 Controller,负责管理集群 broker 的上下线,所
有 topic 的分区副本分配和 leader 选举等工作。Controller 的管理工作都是依赖于 Zookeeper 的。
(一)消费者和消费者组
1、消费者:订阅并消费kafka消息,从属于消费者组
2、消费者组:一个群组里的消费者订阅的是同一个主题,每个消费者接受主题一部分分区的消息。
注:同一个消费者可以消费不同的partition,但是同一个partition不能被不同消费者消费。
(二)消费者群组和分区再均衡
1、再均衡:分区的消费所有权从一个消费者转移到另一个消费者称为再均衡,为消费者组带来了高可用性和可伸缩性。
注:分区何时重新分配:加入消费者或者消费者崩溃等
2、如何判断消费者崩溃:消费者通过向群组协调器(某broker,不同群组可以有不同的群组协调器)发送心跳(一般在拉取消息或者提交偏移量的时候)表示自己仍旧存活,如果长时间不发送心跳则协调器认为期死亡并进行再均衡。
注:在0101版本中,心跳行为不再和获取消息和提交偏移量绑定在一起,有一个单独的心跳线程。
3、分配分区:消费者加入消费者组是,会像群组协调器发送请求,第一个加入的成为“群主”。群主从协调器那里获取成员列表,并负责给每一个消费者分配分区。完毕之后,将分配结果发送给协调器,协调器再将消息发送给所有的消费者,每个消费者只能看到自己的分配信息。只有群主知道所有的消费信息。
(三)参数配置
1、bootstrapserver:host:port
2、keyserializer:键序列化器
3、valueserializer:值序列化器
注:以上为必须设置的
4、groupid:从属的消费者组
5、fetchminbytes:消费者从服务器获取记录的最小字节数。
6、fetchmaxwaitms:消费者等待消费消息的最大时间
7、maxpartitionfetchbytes:服务器从每个分区返回给消费者的最大字节数(需要比broker的设置maxmessagesize属性配置大,否则有些消息无法消费)
8、sessiontimeoutms:指定该消费者在被认为死亡之前可以与服务器断开连接的时间,默认3秒
9、heartbeatintervalms:制定了poll方法向协调器发送心跳的频率。
注:一般9是8的三分之一
10、autooffsetreset:消费者在读取一个没有偏移量分区或者无效偏移量分区的情况下如何处理(latest:从最新记录开始读取,earliest:从最早的记录开始读取)
11、enableauthcommit:消费者是否自动提交偏移量,默认为true
12、autocommitintervalms:自动提交偏移量的时间间隔
13、partitionassignmentstrategy:分区分配给消费者的策略:
(1)range:会把主题若干个连续分区分配给消费者
(2)roundRobin:会把主题的所有分区逐个分配给消费者
14、clientid:任意字符串,broker用来区分客户端发来的消息
15:maxpollrecords:控制poll方法返回的最大记录数
16:receivebufferbytes/sendbufferbytes:tcp缓冲池读写大小
(四)订阅主题
consumersubscribe(list)
(五)轮训(消费者API的核心)
1、轮训作用: 只要消费者订阅了主题,轮训就会处理所有的细节(群组协调、分区再均衡、发送心跳、获取数据)
(1)获取数据
(2)第一次执行poll时,负责查找协调器,然后加入群组,接受分配的分区
(3)心跳的发送
(4)再均衡也是在轮训期间进行的
2、方法:poll(),消费者缓冲区没有数据时会发生阻塞,可以传一个阻塞时间,避免无限等待。0表示立即返回。
3、关闭:close(),网络连接随之关闭,立即触发再均衡。
4、线程安全:无法让一个线程运行多个消费者,也无法让多个线程公用一个消费者。
(六)提交和偏移量
1、提交:更新分区当前位置的 *** 作
2、如何提交:消费者往一个特殊主题(_consumer_offset)发送消息,消息中包含每个分区中的偏移量。
3、偏移量:分区数据被消费的位置。
4、偏移量作用:当发生再均衡时,消费者可能会分配到不一样的分区,为了继续工作,消费者需要读取到每个分区最后一次提交的偏移量,然后从偏移量的地方继续处理。
5、提交偏移量的方式
(1)自动提交:经过一个时间间隔,提交上一次poll方法返回的偏移量。每次轮训都会检测是否应该提交偏移量。缺陷:可能导致重复消费
(2)手动提交:commitSysn()提交迁移量,最简单也最可靠,提交由poll方法返回的最新偏移量。缺点:忘了提交可能会丢数据,再均衡可能会重复消费
(3)异步提交:同步提交在提交过程中必须阻塞
(4)同步异步提交组合
(5)提交特定的偏移量
(七)再均衡监听器
(八)从特定偏移量读取数据(seek)
1、从分区开始:seekToBegining
2、从分区结束:seekToEnd
3、ConsumerRebalanceListener和seek结合使用
(九)如何退出
1、前言:wakeup方法是唯一安全退出轮训的方法,从poll方法中退出并抛出wakeupException异常。如果没有碰上轮训,则在下一次poll调用时抛出。
2、退出轮训
(1)另一个线程调用consumerwakeup方法
(2)如果循环在主线程里可以在ShutdownHook里面调用该方法
3、退出之前调用close方法:告知协调器自己要离开,出发再均衡,不必等到超时。
(十)独立消费者(assign为自己分配分区)
Kafka的高吞吐能力、缓存机制能有效的解决高峰流量冲击问题。实践表明,在未将kafka引入系统前,当互联网关发送的数据量较大时,往往会挂起关系数据库,数据常常丢失。在引入kafka后,更新程序能够结合能力自主处理消息,不会引起数据丢失,关系型数据库的压力波动不会发生过于显著的变化,不会出现数据库挂起锁死现象。
依靠kafka的订阅分发机制,实现了一次发布,各分支依据需求自主订阅的功能。避免了各分支机构直接向数据中心请求数据,或者数据中心依次批量向分支机构传输数据以致实时性不足的情况。kafka提高了实时性,减轻了数据中心的压力,提高了效率。为了帮助大家让学习变得轻松、高效,给大家免费分享一大批资料,帮助大家在成为大数据工程师,乃至架构师的路上披荆斩棘。在这里给大家推荐一个大数据学习交流圈:658558542 欢迎大家进群交流讨论,学习交流,共同进步。
当真正开始学习的时候难免不知道从哪入手,导致效率低下影响继续学习的信心。
但最重要的是不知道哪些技术需要重点掌握,学习时频繁踩坑,最终浪费大量时间,所以有有效资源还是很有必要的。
消费者是以consumer group消费者组的方式工作,由一个或者多个消费者组成一个组,共同消费一个topic。每个分区在同一时间只能由group中的一个消费者读取,但是多个group可以同时消费这个partition。在图中,有一个由三个消费者组成的group,有一个消费者读取主题中的两个分区,另外两个分别读取一个分区。某个消费者读取某个分区,也可以叫做某个消费者是某个分区的拥有者。
在这种情况下,消费者可以通过水平扩展的方式同时读取大量的消息。另外,如果一个消费者失败了,那么其他的group成员会自动负载均衡读取之前失败的消费者读取的分区。
消费方式
consumer采用pull(拉)模式从broker中读取数据。
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
对于Kafka而言,pull模式更合适,它可简化broker的设计,consumer可自主控制消费消息的速率,同时consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。
pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直等待数据到达。为了避免这种情况,我们在我们的拉请求中有参数,允许消费者请求在等待数据到达的“长轮询”中进行阻塞(并且可选地等待到给定的字节数,以确保大的传输大小)。
消费者组的偏移量等信息存储在zookeeper中的consumers节点中。
61 Kafka Producer 压力测试
record-size 是一条信息有多大,单位是字节。
num-records 是总共发送多少条信息。
throughput 是每秒多少条信息,设成-1,表示不限流,可测出生产者最大吞吐量。
以上就是关于3分钟带你彻底搞懂 Kafka全部的内容,包括:3分钟带你彻底搞懂 Kafka、Kafka架构及基本原理简析、Kafka总结等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)