分布式的基于发布订阅模式的消息队列1、概念 1)Producer
消息生产者,就是向 kafka broker 发消息的客户端2)Consumer
消息消费者,向 kafka broker 取消息的客户端3)Consumer Group (CG)
消费者组,由多个 consumer 组成。 消费者组内每个消费者负责消费不同分区的数据, 一个分区只能由一个组内消费者消费;消费者组之间互不影响。 所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者4)Broker
一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。 一个 broker可以容纳多个 topic5)Topic
可以理解为一个队列, 生产者和消费者面向的都是一个 topic6)Partition
为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上, 一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列7)Replica
副本,为保证集群中的某个节点发生故障时, 该节点上的 partition 数据不丢失, 且 kafka 仍然能够继续工作, kafka 提供了副本机制, 一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower8)leader
每个分区多个副本的“主”,生产者发送数据的对象, 以及消费者消费数据的对象都是 leader9)follower
每个分区多个副本中的“从”,实时从 leader 中同步数据, 保持和 leader 数据的同步。 leader 发生故障时,某个 follower 会成为新的 leader , follower与leader必定不在同一个broke上,消息的生产、消费只连接leader, 不会连接Follower10)Topic与partition的不同
是逻辑上概念,partition是物理上的概念。11)Kafka采用分片分区的方式
(防止log过大导致数据定位效率下降),每个partition分为多个segment, 每个segment由1个index和log组成 xxx.log 是存放实际数据的 --log.segment.bytes是配置大小的,默认一个日志片段是1G。 文件命名为当前文件的最小偏移量 xxx.index 是存放当前log索引(每条消息的偏移量和对应大小), 用于快速定位消息位置每个索引长度固定2、Partition作用 1)对topic的负载均衡能力 2)提高读写并发量 3)方便在集群中扩展
一个partition只能被一个consumer group中的一个consumer消费, 但是可以同时存在多个CG Topic partition的副本数<可用broker数二、Zookeeper 作用: 1)集群工作依赖ZK, 2) 0.9版本之前,消费者消费的位置信息会存储在ZK(内存中也会有一份) 3) 0.9版本之后offset存储在kafka系统的固定topic中,也可以存储在ZK中 4)选举
Kafka集群中有一个broker会被选举为controller,负责管理集群broker的上下线, 所有topic的分区副本分配和leader选举等工作, Controller的管理工作依赖ZK,controller文件中的brokerid(抢资源)。三、配置 *** 作 1)Kafka的服务配置文件
Server.properties Broker.id 要求全局唯一整数2)常用命令
Kafka-topic.sh topic的增删改查(删除需参数delete.topic.enable=true ) --partitions 定义分区数 --topic topic名 --replication-factor 定义副本数 --config retention.bytes=xxx 设置partition大小3)集群日志
server.log4)Producer命令
Kafka-console-producer.sh –-topic topic_name –broker-list :9092(broker端口)5)Consumer命令
(是0.9版本前的命令)Kafka-console-consumer.sh --topic topic_name –-zookeeper 端口 ##0.9版本后,offset值不存储在ZK中,故customer直接连接broker (0.9版本后) Kafka-console-consumer.sh --topic topic_name –-bootstrap-server :9092(端口),也可以连接ZK消费数据 –from-beginning 表示从头消费,不加参数表示实时消费 0.9版本后offset存储在__consumer_offsets这个topic中,50个partitions6)Procucer生产数据写入partition
默认是轮询写入的四、Kafka preducer生产者 1、分区的原因 1) 方便在集群中扩展 2) 可以提高并发 2、生产者写入方式 1)可以指定发送的partition 2) 不指定partition,
有key时,按照key的hash值与topic的partition数量取余得到partition的值,再顺序发送3) 既不指定partition,又无key时
第一次调用时随机生成一个整数,这个值与topic的可用partition数取余得到partition的值, 后续递增,即round-robin轮询3、数据可靠性 1)每个partition(leader)收到producer消息后,向producer发送ack。Producer收到ack后,进行下一轮消息发送 2)副本同步策略
要求全部同步完成,才返回ack,原因:选取新的leader时, 容忍n台节点故障,只需要n+1个副本即可,缺点,延时会高些3)ISR
leader维护了一个动态ISR,即和leader保持同步的follower集合, 当ISR中的follower完成数据同步后,leader就会给follower发送ack。 如果follower长时间未向leader同步数据,则该follower会被剔除出ISR, 时间阈值replica.lag.time.max.ms设置,leader故障后,会从ISR中选取新leader (假设5副本,则ISR中可以有3个follower,即3个副本同步完成后,即发送ack)4)Acks参数配置
0:producer不等待broker的ack,最低延时,但可能数据丢失 1:producer 只等待leader的ack,不等待follower, 如果follower同步完成之前,leader发生故障,会数据丢失 -1(all):producer等待partition的leader和follower(ISR中)全部落盘后返回ack。 缺点,同步完成后,broker发送ack之前,leader故障,会有数据重复。 {当ISR中只有leader时,leader故障,也会有数据丢失(极端情况)}5)数据一致性问题(消费、存储的一致性)
LEO(log end offset):每个副本的最后一个offset,即每个副本最大的一个offset HW:所有副本最小的LEO,即消费者能见到的最大的offset,ISR队列中最 小的LEO HW(high watermark)之前的数据才对consumer可见 Leader故障后,会从ISR选出新的leader,为保证副本之前的数据一致性, 其余follower会将各自log文件高于HW的部分截掉, 从新的leader同步数据(只能保证数据一致,不能保证数据丢失或者重复) Follower故障后,会被踢出ISR,待恢复后,follower会读取本地磁盘记录的上次的HW, 并将高于HW的log部分截掉,从HW处开始向leader同步, 等待follower的LEO大于等于该partition的HW,就可以重新加入ISR6)Exactly Once语义,即要求数据既不丢失也不重复
Kafka 0.11版本后引入幂等性概念 At Least Once(ACK=-1) + 幂等性=Exaclty once 启用幂等性 ,将pruducer参数enable.idompotence设置为true,即在broker段会对数据去重 幂等性原理:producer在初始化时会被分配一个PID,发往同一个partition的消息会附带Sequence Number。 Broker端会对四、事务(0.11版本后)做缓存,当具有相同主键的消息提交时, Broker只会持久化一条 幂等性不足:produce PID重启会发生变化,同时不同的partition具有不同的主键, 所以无法保证跨分区会话的Exactly Once。只能解决单次会话单个分区的数据去重
保证kafka在Exactly Once语义基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败1) producer事务
引入Transaction ID(客户端提供,全局唯一)并将producer获得的ID与该ID绑定, 这样,preducer重启后就可以通过Transaction ID 获取原来producer ID,保证幂等性 引入新的组件Transaction Coordinator,producer通过和该组件交互获取Transaction ID对应的任务状态。 该组件还负责将事务所有写入kafka的一个内部topic,这样即使服务重启, 进行中的事务状态可以得到恢复五、kafka customer 消费者 1、消费方式
采用拉取的方式 不足:如果kafka中没有对应topic数据,customer就会陷入循环中,一直返回空数据。 弥补方式:消费者在消费数据时,会传入一个时间参数timeout,如果当前没有数据可消费, customer会间隔一段时间再次返回,减少资源浪费 request.timeout.ms2、分区分配策略
(customer数量发生变化时,就会触发重分配)1)roundRobin 轮询 (按组划分)
将多个topic当作一个整体对待,构造TopicAndPartition类,通过hash值进行排序,对类进行轮询 优点:customer之间差值分区相差小 缺点:使用时要保证customer group内订阅topic一致2)Range 范围(默认使用)(按照主题划分)
可能或有消费者数据不对等问题3、offset维护
组成:group+customer+partition来确定offset1) ZK中保存
目录结构:customergroupoffset partition2)本地保存(0.9版本后)默认保存在kafka固定topic中(__consumer_offsets中)
要消费该topic,需要先修改配置文件(consumer.properties) Exclude.internal.topics=false六、高效读写 1、顺序读写磁盘 2、零复制技术 七,常用命令(此处使用华为集群,所以端口号有变化) 1. 查看当前集群Topic列表。
bin/kafka-topics.sh --list --zookeeper2. 查看单个Topic详细信息。
bin/kafka-topics.sh --describe --zookeeper--topic
kafka-broker-info.sh --zookeeper3. 删除Topic
bin/kafka-topics.sh --delete --zookeeper--topic
bin/kafka-topics.sh --create --zookeeper
partitions 6 --replication-factor 2 --topic
服务端“allow.everyone.if.no.acl.found”配置为True。 bin/kafka-console-producer.sh --broker-list--topic 称> --old-producer -sync
6、消费数据服务端“allow.everyone.if.no.acl.found”配置为True。 bin/kafka-console-consumer.sh --zookeeper--topic 7、赋Consumer权限命令
--from-beginning 由管理员用户 *** 作。 bin/kafka-acls.sh --authorizer-properties zookeeper.connect=24002/kafka > --add --allow-principal User:<用户名> --consumer --topic
8、赋Producer权限命令称> --group <消费者组名称> 由管理员用户 *** 作。 bin/kafka-acls.sh --authorizer-properties zookeeper.connect=24002/kafka > --add --allow-principal User:<用户名> --producer --topic
9、New Producer API生产消息 需要拥有该Topic生产者权限。 bin/kafka-console-producer.sh --broker-list--topic 称> --producer.config config/producer.properties
10、New Consumer API消费数据需要拥有该Topic的消费者权限 bin/kafka-console-consumer.sh --topic--bootstrap-server IP:21007> --new-consumer --consumer.config config/consumer.properties
11、recovery-point-offset-checkpoint##记录topic已经被写入磁盘的offset,可能比HW小,因为很可能到内存中还没flush到磁盘12、replication-offset-checkpoint##记录已经被复制到别的topic上的文件,也就是HW13、修改topic存储时间./kafka-topics.sh --zookeeper 166.0.1.11:24002/kafka --topic tb_sub_all_sf_fusion_stat_new --alter --config retention.ms=345600000欢迎分享,转载请注明来源:内存溢出
评论列表(0条)