Kafka基础-原理、运维与开发

Kafka基础-原理、运维与开发,第1张

Kafka基础-原理、运维与开发

文章目录

1.kafka基本原理

1.1 消息系统的作用1.2 kafka核心概念1.3 kafka集群架构1.4 kafka高性能高可用原理 2.kafka运维实战

2.1 kafka生产集群部署方案2.2 kafka常用命2.3 kafka监控-Kafkamanager 3.kafka开发实战

3.1 kafka生产者

3.1.1 生产者发消息原理3.1.2 生产者demo代码3.1.3 生产者核心参数 3.2 kafka消费者

3.2.1 消费者消费几个重要概念3.2.2 消费者demo代码3.2.3 消费者重要参数

1.kafka基本原理 1.1 消息系统的作用

解藕:统一接口,进行数据复用和分发,避免所有相关应用耦合关联缓冲:有助于控制消息流经系统的速度,解决数据生产和数据消费速度不一致问题顺序保证:大多数场景下,需要保证数据顺序性(kafka保证partition内消息顺序性)可恢复性:即使部分应用出现故障,也可以通过扩展性:当出现处理能力不足时,可以通过横向扩展进行扩容,增加处理能力 1.2 kafka核心概念

broker:kafka集群的代理服务节点topic:每条消息发送到kafka所属的类别,即kafka是面向topic的partition:为了实现高吞吐,每个topic可被设计为多个partition,每个partition存储topic的一部分数据producer:消息生产者,可以是产生消息的服务或应用consumer:消费者,消息的使用者comsumer group:High-level的API中,topic中的一条消息只能被consumer group中的一个consumer消费(group 共享同一个topic),但可以被多个consumer group 使用replica:partition的副本,保障partition的高可用leader: replica中的主节点角色,producer和consumer只与leader交互follower:replica中的从节点角色,负责复制lead partition的数据controller:kafka集群的总控制组件,包括:broker的上下线、leader partition的选举、topic数据的负载均衡、kafka集群的元数据管理等zookeeper: kafka元数据保存在zk中,kafka broker会监控controller角色,当controller宕机时,其他节点会进行争抢,成为controller,保证kafka服务正常 1.3 kafka集群架构

1.4 kafka高性能高可用原理

顺序写

相比随机写磁盘,顺序写能极大减少寻址等开销,提升写入速度

0拷贝

普通拷贝:涉及到应用程序(用户态)和 *** 作系统(内核态)之间转换,需要多次将数据进行copy

0拷贝:直接将内核态的缓存数据通过网卡发送给制定应用程序,减少中间的切换和数据复制

日志分段存储

每个分区都有自己的目录,如topic: test_topic 有3个分区,则分别命名: test_topic-0 test_topic-1 test_topic-2 三个目录,每个目录下边都会存放log segment file ,形式如下:

00000000000000000000.index 
00000000000000000000.log 
00000000000000000000.timeindex 

00000000000005367851.index 
00000000000005367851.log 
00000000000005367851.timeindex

log是日志文件,记录所有消息,文件名就是partition中的baseOffset

index是位移索引文件,间隔一定数据量会记录一条日志的索引数据,索引数据包括两部分:数据在log中的相对offset和磁盘中的绝对position

timeindex是时间索引文件,间隔一段时间会记录一条日志的时间索引数据,时间索引包括两部分:数据写入的timestamp和数据的相对offset

日志二分查找(稀疏索引)

kafka日志分段存储,同时间隔性将索引数据写入索引文件,那么根据稀疏索引,进行查找,日志查询的时间复杂度就变成O(n)=log2(n),极大提升查询速度

2.kafka运维实战 2.1 kafka生产集群部署方案

方案背景:假设公司每天的总消息量为10亿,在凌晨0点到8点之间几乎无数据,那么按照二八法则,80%的消息(8亿)会在16个小时内涌入,这8亿数据的80%(6.4亿)会在3小时内涌入,那么估算得到QPS峰值约为:640000000万÷(36060)=6万,每条消息按照50KB算,预估磁盘总共:10亿 x 50 KB ~= 50 T ,算上一个副本,大约每天增量100T

硬件估计:

机器:单台物理机支撑QPS大约3-5万(估计),我们通常建议:公司预算充足,尽量是让高峰QPS控制在集群能承载的总QPS的30%左右。峰值6万QPS,按照20万-30万的处理能力算,那么总共的物理机5-7台物理机磁盘:每条消息按照1KB算,预估磁盘总共:10亿 x 50 KB ~= 50 T ,算上一个副本,大约每天增量100T ,因为kafka是顺序写,使用普通sas磁盘即可内存:kafka自身的jvm是用不了过多堆内存的,因为kafka设计就是规避掉用jvm对象来保存数据,避免频繁fullgc导致的问题,所以一般kafka自身的jvm堆内存,分配个10G左右就够了,剩下的内存全部留给os cache。预估有100个Topic,一个topic有5个partition。那么总共会有500个partition。每个partition的大小是1G,我们有2个副本,也就是说要把100个topic的leader partition数据都驻留在内存里需要1000G的内存,实际只需要1/4的lead数据存放缓存(也可以调小),平均到5台机器,每台50G,那么加上其他服务缓存,需要64G内存(128G更好)CPU:过Kafka的Broker的网络设计模型。acceptor线程负责去接入客户端的连接请求,但是他接入了之后其实就会把连接分配给多个processor,默认是3个,但是说实话一般生产环境的话呢 ,建议大家还是多加几个,整体可以提升kafka的吞吐量比如说你可以加到6个,或者是9个。另外就是负责处理请求的线程,是一个线程池,默认是8个线程,在生产集群里,建议大家可以把这块的线程数量稍微多加个2倍~3倍,其实都正常,比如说搞个16个工作线程,24个工作线程。后台会有很多的其他的一些线程,比如说定期清理7天前数据的线程,Controller负责感知和管控整个集群的线程,副本同步拉取数据的线程,这样算下来每个broker起码会有上百个线程。4个Cpu core在几十个线程同时工作基本会被打满,因此预计需要16核(32核更好)网卡:现在的网基本就是千兆网卡(1GB / s),还有万兆网卡(10GB/s) ,每秒消息请求1万条,大约: 50KB x 10000 = 500M,然后同时有一个副本同步,预计网速限制至少1000M,千兆网卡基本达到要求,生产环境需要冗余一些

总结集群配置:

背景:10亿请求,6w/s的吞吐量,276T的数据,5台物理机
硬盘:11(SAS) * 7T,720
内存:64GB/128GB,JVM分配
CPU:16核/32核 
网络:千兆网卡,万兆更好
2.2 kafka常用命

创建主题

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic

test

查看主题

bin/kafka-topics.sh --list --zookeeper localhost:2181

发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

消费消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test –

from-beginning

集群测试

测试生产数据

bin/kafka-producer-perf-test.sh --topic test-topic --num-records 500000 –

record-size 200 --throughput -1 --producer-props

bootstrap.servers=hadoop03:9092,hadoop04:9092,hadoop05:9092 acks=-1

测试消费数据

bin/kafka-consumer-perf-test.sh --broker-list

hadoop03:9092,hadoop04:9092,hadoop53:9092 --fetch-size 2000 --messages 500000 –

topic test-topic

2.3 kafka监控-Kafkamanager 3.kafka开发实战 3.1 kafka生产者 3.1.1 生产者发消息原理

1.每次发送消息都必须先把数据封装成一个ProducerRecord对象,里面包含了要发送的topic,具体在哪个分区,分区key,消息内容,timestamp时间戳,

2.然后这个对象交给序列化器,变成自定义协议格式的数据,接着把数据交给partitioner分区器,对这个数据选择合适的分区,默认就轮询所有分区,或者根据key来hash路由到某个分区,这个topic的分区信息,都是在客户端会有缓存的,当然会提前跟broker去获取。

3.接着这个数据会被发送到producer内部的一块缓冲区里,然后producer内部有一个Sender线程,会从缓冲区里提取消息封装成一个一个的batch,然后每个batch发送给分区的leader副本所在的broker。

3.1.2 生产者demo代码

略(网络查找)

3.1.3 生产者核心参数

常见异常处理

不管是异步还是同步,都可能让你处理异常,常见的异常如下: 
1)LeaderNotAvailableException:这个就是如果某台机器挂了,此时leader副本不可用,会导致你 写入失败,要等待其他follower副本切换为leader副本之后,才能继续写入,此时可以重试发送即可。如果 说你平时重启kafka的broker进程,肯定会导致leader切换,一定会导致你写入报错,是 LeaderNotAvailableException 
2)NotControllerException:这个也是同理,如果说Controller所在Broker挂了,那么此时会有问 题,需要等待Controller重新选举,此时也是一样就是重试即可 
3)NetworkException:网络异常,重试即可 我们之前配置了一个参数,retries,他会自动重试的,但是如果重试几次之后还是不行,就会提供 Exception给我们来处理了。 
参数:retries 默认值是3 
参数:retry.backoff.ms 两次重试之间的时间间隔

提升消息吞吐量

1)buffer.memory:设置发送消息的缓冲区,默认值是33554432,就是32MB 如果发送消息出去的速度小于写入消息进去的速度,就会导致缓冲区写满,此时生产消息就会阻塞住,所以说 这里就应该多做一些压测,尽可能保证说这块缓冲区不会被写满导致生产行为被阻塞住
2)compression.type,默认是none,不压缩,但是也可以使用lz4压缩,效率还是不错的,压缩之后可以 减小数据量,提升吞吐量,但是会加大producer端的cpu开销 
3)batch.size,设置每个batch的大小,如果batch太小,会导致频繁网络请求,吞吐量下降;如果 batch太大,会导致一条消息需要等待很久才能被发送出去,而且会让内存缓冲区有很大压力,过多数据缓冲 在内存里 默认值是:16384,就是16kb,也就是一个batch满了16kb就发送出去,一般在实际生产环境,这个batch 的值可以增大一些来提升吞吐量,可以自己压测一下 
4)linger.ms,这个值默认是0,意思就是消息必须立即被发送,但是这是不对的,一般设置一个100毫秒之 类的,这样的话就是说,这个消息被发送出去后进入一个batch,如果100毫秒内,这个batch满了16kb,自 然就会发送出去。但是如果100毫秒内,batch没满,那么也必须把消息发送出去了,不能让消息的发送延迟 时间太长,也避免给内存造成过大的一个压力。

ACK参数

acks参数,其实是控制发送出去的消息的持久化机制的 
1)如果acks=0,那么producer根本不管写入broker的消息到底成功没有,发送一条消息出去,立马就可 以发送下一条消息,这是吞吐量最高的方式,但是可能消息都丢失了,你也不知道的,但是说实话,你如果真 是那种实时数据流分析的业务和场景,就是仅仅分析一些数据报表,丢几条数据影响不大的。会让你的发送吞 吐量会提升很多,你发送弄一个batch出,不需要等待人家leader写成功,直接就可以发送下一个batch 了,吞吐量很大的,哪怕是偶尔丢一点点数据,实时报表,折线图,饼图。 
2)acks=all,或者acks=-1:这个leader写入成功以后,必须等待其他ISR中的副本都写入成功,才可以 返回响应说这条消息写入成功了,此时你会收到一个回调通知 3)acks=1:只要leader写入成功,就认为消息成功了,默认给这个其实就比较合适的,还是可能会导致数 据丢失的,如果刚写入leader,leader就挂了,此时数据必然丢了,其他的follower没收到数据副本,变 成leader 如果要想保证数据不丢失,得如下设置: a)min.insync.replicas = 2,ISR里必须有2个副本,一个leader和一个follower,最最起码的一 个,不能只有一个leader存活,连一个follower都没有了 b)acks = -1,每次写成功一定是leader和follower都成功才可以算做成功,leader挂了,follower 上是一定有这条数据,不会丢失 
c) retries = Integer.MAX_VALUE,无限重试,如果上述两个条件不满足,写入一直失败,就会无限次 重试,保证说数据必须成功的发送给两个副本,如果做不到,就不停的重试,除非是面向金融级的场景,面向 企业大客户,或者是广告计费,跟钱的计算相关的场景下,才会通过严格配置保证数据绝对不丢失

重试乱序

消息重试是可能导致消息的乱序的,因为可能排在你后面试,此时消息就会乱序,所以可以使用“max.in.fligh样可以保证producer同一时间只能发送一条消息
3.2 kafka消费者 3.2.1 消费者消费几个重要概念

offset管理:早期通过zk管理offset元数据,但是zk不太适合做高并发 *** 作,后使用kafka自身内部的topic:consumer_offsets,默认50个分区,会根据提交的offset请求kv(key=groupid+topic+partition value=offset ),对key进行hash取模,放入对应的consumer_offsets的partition中

coordinator角色:每个consumer group都会选择一个broker作为自己的coordinator,他是负责监控这个消费组里的各个消费者的心跳,以及判断是否宕机,然后开启rebalance

rebalance策略(了解):1.range策略 2.round-robin策略 3.sticky策略

3.2.2 消费者demo代码

略(网络查找)

3.2.3 消费者重要参数
【heartbeat.interval.ms】 consumer心跳时间,必须得保持心跳才能知道consumer是否故障了,然后如果故障之后,就会通过心跳下 发rebalance的指令给其他的consumer通知他们进行rebalance的 *** 作 
【session.timeout.ms】 kafka多长时间感知不到一个consumer就认为他故障了,默认是10秒 
【max.poll.interval.ms】 如果在两次poll *** 作之间,超过了这个时间,那么就会认为这个consume处理能力太弱了,会被踢出消费 组,分区分配给别人去消费,一遍来说结合你自己的业务处理的性能来设置就可以了 
【fetch.max.bytes】 获取一条消息最大的字节数,一般建议设置大一些 
【max.poll.records】 一次poll返回消息的最大条数,默认是500条 
【connection.max.idle.ms】 consumer跟broker的socket连接如果空闲超过了一定的时间,此时就会自动回收连接,但是下次消费就要 重新建立socket连接,这个建议设置为-1,不要去回收 
【auto.offset.reset】 earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消 费 topica -> partition0:1000 partitino1:2000 latest当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从当前位置 开始消费nonetopic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的 offset,则抛出异常 注:我们生产里面一般设置的是latest 
【enable.auto.commit】 这个就是开启自动提交唯一 
【auto.commit.ineterval.ms这个指的是多久条件一次偏移量

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5700229.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存