Kafka总结

Kafka总结,第1张

consumer 采用 pull(拉)模式从 broker 中读取数据

push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。

pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中,一直返回空数

据。针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有

数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为 timeout。

分区中的所有副本统称为 AR(Assigned Replicas)。所有与 leader 副本保持一定程度同步的副本(包括 leader 副本在内)组成ISR(In-Sync Replicas),ISR 集合是 AR 集合中的一个子集。

可以通过分区策略体现消息顺序性。分区策略有轮询策略、随机策略、按消息键保序策略。

处理顺序 :拦截器->序列化器->分区器

消息在通过 send() 方法发往 broker 的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往 broker。拦截器一般不是必需的,而序列化器是必需的。消息经过序列化之后就需要确定它发往的分区,如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分区器的作用,因为 partition 代表的就是所要发往的分区号。

整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程(发送线程)。

一般来说如果消费者过多,出现了消费者的个数大于分区个数的情况,就会有消费者分配不到任何分区。开发者可以继承AbstractPartitionAssignor实现自定义消费策略,从而实现同一消费组内的任意消费者都可以消费订阅主题的所有分区。

当前消费者需要提交的消费位移是offset+1

在旧消费者客户端中,消费位移是存储在 ZooKeeper 中的。而在新消费者客户端中,消费位移存储在 Kafka 内部的主题__consumer_offsets 中。

Kafka 中的消息是以主题为基本单位进行归类的,各个主题在逻辑上相互独立。每个主题又可以分为一个或多个分区。不考虑多副本的情况,一个分区对应一个日志(Log)。为了防止 Log 过大,Kafka 又引入了日志分段(LogSegment)的概念,将 Log 切分为多个 LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件。

Log 和 LogSegment 也不是纯粹物理意义上的概念,Log 在物理上只以文件夹的形式存储,而每个 LogSegment 对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件。

每个日志分段文件对应了两个索引文件,主要用来提高查找消息的效率。

日志删除(Log Retention):按照一定的保留策略直接删除不符合条件的日志分段。

日志压缩(Log Compaction):针对每个消息的 key 进行整合,对于有相同 key 的不同 value 值,只保留最后一个版本。

在 Kafka 集群中会有一个或多个 broker,其中有一个 broker 会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。当某个分区的 leader 副本出现故障时,由控制器负责为该分区选举新的 leader 副本。当检测到某个分区的 ISR 集合发生变化时,由控制器负责通知所有broker更新其元数据信息。当使用 kafka-topicssh 脚本为某个 topic 增加分区数量时,同样还是由控制器负责分区的重新分配。

Kafka 中有多种延时 *** 作,比如延时生产,还有延时拉取(DelayedFetch)、延时数据删除(DelayedDeleteRecords)等。

延时 *** 作创建之后会被加入延时 *** 作管理器(DelayedOperationPurgatory)来做专门的处理。延时 *** 作有可能会超时,每个延时 *** 作管理器都会配备一个定时器(SystemTimer)来做超时管理,定时器的底层就是采用时间轮(TimingWheel)实现的。

为了实现生产者的幂等性,Kafka 为此引入了 producer id(以下简称 PID)和序列号(sequence number)这两个概念。

Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的 Producer 在

初始化的时候会被分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number。而Broker 端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker 只会持久化一条。

Kafka中的事务可以使应用程序将消费消息、生产消息、提交消费位移当作原子 *** 作来处理,同时成功或失败,即使该生产或消费会跨多个分区。

生产者必须提供唯一的transactionalId,启动后请求事务协调器获取一个PID,transactionalId与PID一一对应。

每次发送数据给<Topic, Partition>前,需要先向事务协调器发送AddPartitionsToTxnRequest,事务协调器会将该<Transaction, Topic, Partition>存于__transaction_state内,并将其状态置为BEGIN。

在处理完 AddOffsetsToTxnRequest 之后,生产者还会发送 TxnOffsetCommitRequest 请求给 GroupCoordinator,从而将本次事务中包含的消费位移信息 offsets 存储到主题 __consumer_offsets 中

一旦上述数据写入 *** 作完成,应用程序必须调用KafkaProducer的commitTransaction方法或者abortTransaction方法以结束当前事务。

在发送延时消息的时候并不是先投递到要发送的真实主题(real_topic)中,而是先投递到一些 Kafka 内部的主题(delay_topic)中,这些内部主题对用户不可见,然后通过一个自定义的服务拉取这些内部主题中的消息,并将满足条件的消息再投递到要发送的真实的主题中,消费者所订阅的还是真实的主题。

Kafka 集群中有一个 broker 会被选举为 Controller,负责管理集群 broker 的上下线,所

有 topic 的分区副本分配和 leader 选举等工作。Controller 的管理工作都是依赖于 Zookeeper 的。

由于kafka吞吐量特别大,所以先考虑集群服务器的自身瓶颈,因为现在测试的是单机所以只会涉及到磁盘IO以及cpu,但是对于kafka来说对于cpu的使用还是可以忽略不计的,

11磁盘IO写入瓶颈

使用以下命令测试磁盘IO的写入瓶颈

sync;time -p bash -c "(dd if=/dev/zero of=testdd bs=1M count=20000)"

说明: 在当前目录下创建一个testdd的文件,写入20000个1M的数据

磁盘写入IO的结果

可以看到平均就是187MB/s

12 使用iostat命令监测磁盘io情况

使用命令

# iostat -x 1

说明: 扩展查看io性能,每秒刷新一次

注意: 如果没有iostat,请执行 yum install sysstat -y 进行安装 iostat命令

关注wkB/s和%util两个参数

wkB/s:每秒写入设备的数据量(单位:KB)

%util:消耗在I/O请求中的CPU时间百分比(设备带宽利用率)。如果该值接近100%说明设备出现了瓶颈。

如图现在这台机器的磁盘IO极限值为187MB/s

13 单机版测试kafka性能

因为测试的次数比较多,也没有去找kafka中数据存储设置,所以就使用docker部署单机版的kafka (因为测试的数据比较多,也就多次的删除了容器,重新启动镜像)

新建目录:

mkdir /usr/local/kafka_test

dockerfile

runsh

sourceslist

目录结构如下:

生成镜像

docker build -t kafka_test /usr/local/kafka_test

启动kafka

docker run -d -it kafka_test

测试结果

从表格中可以看出来五个分区就已经是极限了

结果分析

这中间并没有设置条数/每秒,所以就是按照kafka 就会按照量级自动的吞入数据,如果我们需要对于消息的即时性做控制,还需要再重新测试一下,按照业务的延迟找到最合适的数量(单机版,然后再部署集群,测试适合的数量)

集群测试:

部署就不再这里说明了

本次测试的是三台机器集群

测试结果:

之后还测试了9个分区的topic 因为空间不足所以就没有继续测下去,但是看部分数据还超过了500MB/s还是有上升空间的

13 磁盘IO 读取瓶颈

使用一下命令测试磁盘IO的读取瓶颈

hdparm -tT --direct /dev/vda

说明: hdparm命令是显示与设定硬盘的参数, -t参数为评估硬盘的读取效率(不经过磁盘cache), -T参数为评估硬盘的读取效率(经过磁盘cache)

Kafka is a distributed,partitioned,replicated commit logservice。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。

入门请参照: >

1、定位:分布式的消息队列系统,同时提供数据分布式缓存功能(默认7天)

2、消息持久化到磁盘,达到O(1)访问速度,预读和后写,对磁盘的顺序访问(比内存访问还要快)

3、Storm(分布式的实时计算框架)

Kafka目标成为队列平台

4、基本组件:

Broker:每一台机器是一个Broker

Producer:日志消息生产者,主要写数据

Consumer:日志消息消费者,主要读数据

Topic:是虚拟概念,不同的consumer去指定的topic去读数据,不同producer可以往不同的topic去写

Partition:是实际概念,文件夹,是在Topic的基础上做了进一步分层

5、Partition功能:负载均衡,需要保证消息的顺序性

顺序性的保证:订阅消息是从头往后读取的,写消息是尾部追加,所以整体消息是顺序的

如果有多个partiton存在,可能会出现顺序不一致的情况,原因:每个Partition相互独立

6、Topic:逻辑概念

一个或多个Partition组成一个Topic

7、Partition以文件夹的形式存在

8、Partition有两部分组成:

(1)index log:(存储索引信息,快速定位segment文件)

(2)message log:(真实数据的所在)

9、HDFS多副本的方式来完成数据高可用

如果设置一个Topic,假设这个Topic有5个Partition,3个replication

Kafka分配replication的算法:

假设:

将第i个Partition分配到(i % N)个Broker上

将第i个Partition的第j个replication分配到( (i+j) % N)个Broker上

虽然Partition里面有多个replication

如果里面有M个replication,其中有一个是Leader,其他M-1个follower

10、zookeeper包系统的可用性,zk中会保存一些meta信息(topic)

11、物理上,不同的topic的消息肯定是分开存储的

12、偏移量——offset:用来定位数据读取的位置

13、kafka内部最基本的消息单位——message

14、传输最大消息message的size不能超过1M,可以通过配置来修改

15、Consumer Group

16、传输效率:zero-copy

0拷贝:减少Kernel和User模式上下文的切换

直接把disk上的data传输给socket,而不是通过应用程序来传输

17、Kafka的消息是无状态的,消费者必须自己维护已消费的状态信息(offset)

减轻Kafka的实现难度

18、Kafka内部有一个时间策略:SLA——消息保留策略(消息超过一定时间后,会自动删除)

19、交付保证:

at least once:至少一次(会有重复、但不丢失)

at most once:最多发送一次(不重复、但可能丢失)

exactly once:只有一次(最理想),目前不支持,只能靠客户端维护

20、Kafka集群里面,topic内部由多个partition(包含多个replication),达到高可用的目的:

日志副本:保证可靠性

角色:主、从

ISR:是一个集合,只有在集合中的follower,才有机会被选为leader

如何让leader知道follower是否成功接收数据(心跳,ack)

如果心跳正常,代表节点活着

21、怎么算“活着”

(1)心跳

(2)如果follower能够紧随leader的更新,不至于被落的太远

如果一旦挂掉,从ISR集合把该节点删除掉

前提:需要把zookeeper提前启动好

一、单机版

1、启动进程:

]# /bin/kafka-server-startsh config/serverproperties

2、查看topic列表:

]# /bin/kafka-topicssh --list --zookeeper localhost:2181

3、创建topic:

]# /bin/kafka-topicssh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic newyear_test

4、查看topic描述:

]# /bin/kafka-topicssh --describe --zookeeper localhost:2181 --topic newyear_test

5、producer发送数据:

]# /bin/kafka-console-producersh --broker-list localhost:9092 --topic newyear_test

6、consumer接收数据:

]# /bin/kafka-console-consumersh --zookeeper localhost:2181 --topic newyear_test --from-beginning

7、删除topic:

]# /bin/kafka-topicssh --delete --zookeeper localhost:2181 --topic newyear_test

二、集群版

在slave1和slave2上的brokerid一定设置不同

分别在slave1和slave2上开启进程:

/bin/kafka-server-startsh config/serverproperties

创建topic:

]# /bin/kafka-topicssh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic newyear_many_test

1、实现一个consumer group

首先在不同的终端分别开启consumer,保证groupid一致

]# python consumer_kafkapy

执行一次producer:

]# python producer_kafkapy

2、指定partition发送数据

]# python producer_kafka_2py

3、指定partition读出数据

]# python consumer_kafka_2py

consumer_kafkapy:

producer_kafkapy:

consumer_kafka_2py:

producer_kafka_2py:

1新建/conf/kafka_test/flume_kafkaconf

2启动flume:

]# flume-ng agent -c conf -f /conf/kafka_test/flume_kafkaconf -n a1 -Dflumerootlogger=INFO,console

启动成功,如下图:

3测试:

11flume监控产生的数据:

]# for i in seq 1 100 ; do echo '====> '$i >> 1log ; done

12kafka消费数据:

]# /bin/kafka-console-consumersh --zookeeper localhost:2181 --topic topic_1013 --from-beginning

消费结果如下图:

通常情况下,kafka集群中越多的partition会带来越高的吞吐量。但是,我们必须意识到集群的partition总量过大或者单个broker节点partition过多,都会对系统的可用性和消息延迟带来潜在的影响。未来,我们计划对这些限制进行一些改进,让kafka在分区数量方面变得更加可扩展。

以上就是关于Kafka总结全部的内容,包括:Kafka总结、kafka 单机/集群压力测试、Kafka相关内容总结(Kafka集群搭建手记)等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/web/9802388.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-02
下一篇 2023-05-02

发表评论

登录后才能评论

评论列表(0条)

保存