kafka原理+基本命令+集群搭建教程

kafka原理+基本命令+集群搭建教程,第1张

kafka原理+基本命令+集群搭建教程 ⼀、Kafka介绍

Kafka是最初由linkedin公司开发,是⼀个分布式、⽀持分区的(partition)、多副本的
(replica),基于zookeeper协调的分布式消息系统,它的最⼤的特性就是可以实时的处理
⼤量数据以满⾜各种需求场景:⽐如基于hadoop的批处理系统、低延迟的实时系统、
Storm/Spark流式处理引擎,web/nginx⽇志、访问⽇志,消息服务等等,⽤scala语⾔编
写,linkedin于2010年贡献给了Apache基⾦会并成为顶级开源 项⽬。

1.Kafka的使⽤场景

⽇志收集:⼀个公司可以⽤Kafka收集各种服务的log,通过kafka以统⼀接⼝服务的⽅式
开放给各种consumer,例如hadoop、Hbase、Solr等。消息系统:解耦和⽣产者和消费者、缓存消息等。⽤户活动跟踪:Kafka经常被⽤来记录web⽤户或者app⽤户的各种活动,如浏览⽹⻚、
搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过
订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖
掘。运营指标:Kafka也经常⽤来记录运营监控数据。包括收集各种分布式应⽤的数据,⽣产
各种 *** 作的集中反馈,⽐如报警和报告。 2.Kafka基本概念

kafka是⼀个分布式的,分区的消息(官⽅称之为commit log)服务。它提供⼀个消息系统应该
具备的功能,但是确有着独特的设计。可以这样来说,Kafka借鉴了JMS规范的思想,但是确
并没有完全遵循JMS规范。
⾸先,让我们来看⼀下基础的消息(Message)相关术语:

名称解释Broker消息中间件处理节点,⼀个Kafka节点就是⼀个broker,⼀个或者 多个Broker可以组成⼀个Kafka集群TopicKafka根据topic对消息进⾏归类,发布到Kafka集群的每条消息都需 要指定⼀个topicProducer消息⽣产者,向Broker发送消息的客户端Consumer消息消费者,从Broker读取消息的客户端ConsumerGroup每个Consumer属于⼀个特定的Consumer Group,⼀条消息可以 被多个不同的ConsumerGroup消费,但是⼀个Consumer Group 中只能有⼀个Consumer能够消费该消息Partition物理上的概念,⼀个topic可以分为多个partition,每个partition内 部消息是有序

因此,从⼀个较⾼的层⾯上来看,producer通过⽹络发送消息到Kafka集群,然后consumer
来进⾏消费,如下图:

服务端(brokers)和客户端(producer、consumer)之间通信通过TCP协议来完成。

⼆、kafka基本使⽤ 1.安装前的环境准备

安装jdk安装zk
https://editor.csdn.net/md/?articleId=122386019官⽹下载kafka的压缩包:http://kafka.apache.org/downloads解压缩⾄如下路径

/usr/local/java

修改配置⽂件:/usr/local//java/kafka2.13-3.0/config/server.propertie

#broker.id属性在kafka集群中必须要是唯⼀
broker.id=0
#kafka部署的机器ip和提供服务的端⼝号
listeners=PLAINTEXT://192.168.31.158:9092
#kafka的消息存储⽂件
log.dir==/usr/local/java/kafka/data/kafka-log
#kafka连接zookeeper的地址
zookeeper.connect=192.168.31.190:2181

2.启动kafka服务器

./kafka-server-start.sh -daemon …/config/server.properties

验证是否启动成功:
进⼊到zk中的节点看id是0的broker有没有存在(上线)
参考:https://blog.csdn.net/qq_40749830/article/details/122386019

ls /brokers/ids/

server.properties核⼼配置详解:

ropertyDefaultDescriptionbroker.id0每个broker都可以⽤⼀个唯⼀的⾮负整数id进⾏标 识;这个id可以作为broker的“名字”,你可以选择任 意你喜欢的数字作为id,只要id是唯⼀的即可。log.dirs/tmp/kafka-logskafka存放数据的路径。这个路径并不是唯⼀的,可 以是多个,路径之间只需要使⽤逗号分隔即可;每 当创建新partition时,都会选择在包含最少 partitions的路径下进⾏。listenersPLAINTEXT://192.168.65.60:9092server接受客户端连接的端⼝,ip配置kafka本机ip 即可zookeeper.connectlocalhost:2181zooKeeper连接字符串的格式为: hostname:port,此处hostname和port分别是 ZooKeeper集群中某个节点的host和port; zookeeper如果是集群,连接⽅式为 hostname1:port1, hostname2:port2, hostname3:port3log.retention.hours168每个⽇志⽂件删除之前保存的时间。默认数据保存 时间对所有topic都⼀样。num.partitions1创建topic的默认分区数default.replication.factor1⾃动创建topic的默认副本数量,建议设置为⼤于等 于2min.insync.replicas1当producer设置acks为-1时,min.insync.replicas 指定replicas的最⼩数⽬(必须确认每⼀个repica的 写数据都是成功的),如果这个数⽬没有达到, producer发送消息会产⽣异常delete.topic.enablefalse是否允许删除主题 3.创建主题topic

topic是什么概念?topic可以实现消息的分类,不同消费者订阅不同的topic

执⾏以下命令创建名为“test”的topic,这个topic只有⼀个partition,并且备份因⼦也设置为1:

//3、正确写法2.2及更好版本 --zookeeper localhost:2181 替换为 --bootstrap-server localhost:9092
./kafka-topics.sh --bootstrap-server 192.168.31.158:9092 --create --topic gwz --partitions 1 --replication-factor 1
4.查看主题详情

查询kafak中所有主题:

./kafka-topics.sh --bootstrap-server 192.168.31.158:9092 --list

查看主题详情:

./kafka-topics.sh --bootstrap-server 192.168.31.158:9092 --describe --topic gwz

删除主题
在server.properties中添加如下配置重启生效
delete.topic.enable=true

./kafka-topics.sh --bootstrap-server 192.168.31.158:9092 --delete --topic gwz
5.控制台生产

我把主机名修改为kafka了,输入命令hostname可查看主机名

 ./kafka-console-producer.sh  --topic gwz --bootstrap-server kafka:9092

6.控制台消费

⽅式⼀:从最后⼀条消息的偏移量+1开始消

 ./kafka-console-consumer.sh  --topic gwz --bootstrap-server kafka:9092

⽅式⼆:从头开始消费

 ./kafka-console-consumer.sh  --topic gwz --bootstrap-server kafka:9092 --from-beginning
7.查看主题消费偏移量
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka:9092 --topic gwz --time -1

几个注意点:

消息会被存储消息是顺序存储消息是有偏移量的消费时可以指明偏移量进⾏消费

其他具体命令参考kafka官网:运营
https://kafka.apache.org/documentation/#operations

三、Kafka中的关键细节 1.消息的顺序存储

消息的发送⽅会把消息发送到broker中,broker会存储消息,消息是按照发送的顺序进⾏存储。因此消费者在消费消息时可以指明主题中消息的偏移量。默认情况下,是从最后⼀个消
息的下⼀个偏移量开始消费。

2. 单播消息的实现

单播消息:⼀个消费组⾥ 只会有⼀个消费者能消费到某⼀个topic中的消息。于是可以创建多个消费者,这些消费者在同⼀个消费组中。

./kafka-console-consumer.sh --bootstrap-server kafka:9092 --
consumer-property group.id=myGroup --topic gwz
3.多播消息的实现

在⼀些业务场景中需要让⼀条消息被多个消费者消费,那么就可以使⽤多播模式。kafka实现多播,只需要让不同的消费者处于不同的消费组即可。

./kafka-console-consumer.sh --bootstrap-server kafka:9092 --
consumer-property group.id=myGroup1 --topic test
./kafka-console-consumer.sh --bootstrap-server kafka:9092 --
consumer-property group.id=myGroup2 --topic test
4.查看消费组及信息
# 查看当前主题下有哪些消费组
./kafka-consumer-groups.sh --bootstrap-server kafka:9092 --list
# 查看消费组中的具体信息:⽐如当前偏移量、最后⼀条消息的偏移量、堆积的消息数量
 ./kafka-consumer-groups.sh --bootstrap-server kafka:9092 --
describe --group myGroup

Currennt-offset: 当前消费组的已消费偏移量Log-end-offset: 主题对应分区消息的结束偏移量(HW)Lag: 当前消费组未消费的消息数 四、主题、分区的概念 1.主题Topic

主题Topic可以理解成是⼀个类别的名称。

2.partition分区


⼀个主题中的消息量是⾮常⼤的,因此可以通过分区的设置,来分布式存储这些消息。⽐如⼀个topic创建了3个分区。那么topic中的消息就会分别存放在这三个分区中。

./kafka-topics.sh --create --bootstrap-server kafka:2181 --partitions 2 --
topic test1

分区的作用:

可以分布式存储可以并⾏写

实际上是存在data/kafka-logs/test-0 和 test-1中0000000.log⽂件中
小细节:
定期将⾃⼰消费分区的offset提交给kafka内部topic:__consumer_offsets,提交过去的时候,key是consumerGroupId+topic+分区号,value就是当前offset的值,kafka会定期清理topic⾥的消息,最后就保留最新的那条数据

因为__consumer_offsets可能会接收⾼并发的请求,kafka默认给其分配50个分区(可以通过offsets.topic.num.partitions设置),这样可以通过加机器的⽅式抗⼤并发。

通过如下公式可以选出consumer消费的offset要提交到__consumer_offsets的哪个分区
公式:hash(consumerGroupId) % __consumer_offsets主题的分区数

五、Kafka集群及副本的概念 1.搭建kafka集群,3个broker

准备3个server.properties⽂件
每个⽂件中的这些内容要调整

server.properties:

broker.id=0
//kafka服务器所在地址
listeners=PLAINTEXT://192.168.65.60:9092
//数据文件存放位置
log.dir=/usr/local/data/kafka-logs

server1.properties:

broker.id=1
//kafka服务器所在地址
listeners=PLAINTEXT://192.168.65.60:9093
//数据文件存放位置
log.dir=/usr/local/data/kafka-logs-1

server2.properties:

broker.id=2
//kafka服务器所在地址
listeners=PLAINTEXT://192.168.65.60:9094
//数据文件存放位置
log.dir=/usr/local/data/kafka-logs-2

使⽤如下命令来启动3台服务器

./kafka-server-start.sh -daemon …/config/server0.properties
./kafka-server-start.sh -daemon …/config/server1.properties
./kafka-server-start.sh -daemon …/config/server2.properties

搭建完后通过查看zk中的/brokers/ids 看是否启动成功

2.副本的概念

副本是对分区的备份。在集群中,不同的副本会被部署在不同的broker上。下⾯例⼦:创建1
个主题,2个分区、3个副本。

./kafka-topics.sh --create --zookeeper kafka:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topi


通过查看主题信息,其中的关键数据:

replicas:
当前副本存在的broker节点leader:副本⾥的概念
每个partition都有⼀个broker作为leader。
消息发送⽅要把消息发给哪个broker?就看副本的leader是在哪个broker上⾯。副本⾥
的leader专⻔⽤来接收消息。
接收到消息,其他follower通过poll的⽅式来同步数据。follower:leader处理所有针对这个partition的读写请求,⽽follower被动复制leader,
不提供读写(主要是为了保证多副本数据与消费的⼀致性),如果leader所在的broker
挂掉,那么就会进⾏新leader的选举,⾄于怎么选,在之后的controller的概念中介绍。

通过kill掉leader后再查看主题情况

# kill掉leader
ps -aux | grep server.properties
kill 17631
# 查看topic情况
./kafka-topics.sh --describe --zookeeper 172.16.253.35:2181 --topic myreplicated-topic

isr:
可以同步的broker节点和已同步的broker节点,存放在isr集合中。 3.broker、主题、分区、副本

kafka集群中由多个broker组成⼀个broker中存放⼀个topic的不同partition——副本
4.kafka集群消息的发送

./kafka-console-producer.sh --broker-list
192.168.31.158:9092,192.168.31.158:9093,192.168.31.158:9094 --topic my-replicated-topic
5.kafka集群消息的消费
./kafka-console-consumer.sh --bootstrap-server
192.168.31.158:9092,192.168.31.158:9093,192.168.31.158:9094 --from-beginning --topic my-replicated-topic
6.关于分区消费组消费者的细节


图中Kafka集群有两个broker,每个broker中有多个partition。⼀个partition只能被⼀个消费组⾥的某⼀个消费者消费,从⽽保证消费顺序。Kafka只在partition的范围内保证消息消费的局部顺序性,不能在同⼀个topic中的多个partition中保证总的消费顺序性。⼀个消费者可以消费多个partition。

消费组中消费者的数量不能⽐⼀个topic中的partition数量多,否则多出来的消费者消费不到
消息。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5705336.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存