CentOS7搭建Kafka集群

CentOS7搭建Kafka集群,第1张

目录

一、Kafka单机【依赖单机的单节点Zookeeper】

二、KafKa集群搭建 

三、Kafka Manager(web页面管理)的搭建

四、使用本地客户端测试MQ基本 *** 作


首先kafka的配置信息等信息依赖zookeeper,这里可以根据情况选择搭建zookeeper的形式(单机、一台服务器上搭建的集群、以及三台服务器搭建的集群等),可以参考:CentOS7搭建Zookeeper集群https://blog.csdn.net/it_lihongmin/article/details/124634608

一、Kafka单机【依赖单机的单节点Zookeeper】

kafka下载地址为:Apache Kafka,这里选择:kafka_2.13-3.0.0.tgz版本,百度云盘链接: https://pan.baidu.com/s/1ve4Caq0eDtTOnPZHC237iA?pwd=gbfehttps://pan.baidu.com/s/1ve4Caq0eDtTOnPZHC237iA?pwd=gbfe

进入服务器下载:wget https://archive.apache.org/dist/kafka/3.0.0/kafka_2.13-3.0.0.tgz,或者可以直接使用已经下载的文件,进行上传。

 解压:tar -zxvf kafka_2.13-3.0.0.tgz

 进入目录根据情况修改配置信息:

  • zookeeper.connect: zookeeper地址,zookeeper集群多个使用逗号间隔;默认就是本机的zookeeper默认端口2181,如果就只是启动本地的zookeeper都不用修改了
  • zookeeper.connect.timeout.ms: 比较清楚了,链接超时时间,单号毫秒

 更多配置可以参考:

broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
port=19092 #当前kafka对外提供服务的端口默认是9092
host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880  #消息保存的最大值5M
default.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880  #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #设置zookeeper的连接端口

可以使用命令查看zookeeper的2181端口的启动情况:netstat -anp| grep 2181

启动kafka服务:./kafka-server-start.sh ../config/server.properties &

查看kafka进程:ps -ef| grep kafka 

启动后使用自带的客户端根据下面的 *** 作,试着收发MQ。。。

二、KafKa集群搭建 

这里三台服务器分别是(并且zookeeper也是这三台服务器,并且基于上面的Zookeeper集群搭建的博客进行搭建的):47.94.140.208、82.156.54.7、39.107.127.88,与单体服务搭建一样,首先 wget下载 压缩包,或者上传压缩包到想要的位置,然后进行解压。修改配置文件:

【节点一】

roker.id=1
zookeeper.connect=47.94.140.208:2181,82.156.54.7:2181,39.107.127.88:2181

【节点二】

broker.id=2
zookeeper.connect=47.94.140.208:2181,82.156.54.7:2181,39.107.127.88:2181

【节点三】

broker.id=2
zookeeper.connect=47.94.140.208:2181,82.156.54.7:2181,39.107.127.88:2181

然后三台服务都进行启动:./kafka-server-start.sh -daemon ../config/server.properties &

过一会儿查看三台服务器是否还正常(因为启动的时候可能正常,过一会儿心跳等如果有问题则会进程终止掉):ps -ef| grep kafka 

 同样的还是可以使用下面的客户端命令去收发一下MQ

三、Kafka Manager(web页面管理)的搭建

kafka可以个web管理界面可选的有:kafka-manager、kafka-eagle。。。这里选择比较经典的kafka-manager,kafka-manager是目前最受欢迎的kafka集群管理工具,最早由雅虎开源,用户可以在Web界面执行一些简单的集群管理 *** 作,github地址为:https://github.com/yahoo/kafka-manager,可选择的下载包地址为:https://github.com/yahoo/CMAK/releases,这里选择的版本为:3.0.0.2

官方可下载的已编译的版本,都使用的jdk11编译,所以这里也使用jdk11环境

yum install -y java-11-openjdk.x86_64

可以选择的版本有:

如果当前服务器使用的jdk8,则也可以使用早期版本包(解压后启动命令将 cmak换成kafka-manager即可):

上传至服务器位置,这里为:/home/mosty/kafka-manager

解压:unzip cmak-3.0.0.2.zip

 进入解压目录下的 config目录,修改application.conf文件,主要是修改zookeeper地址:

kafka-manager.zkhosts="47.94.140.208:2181,82.156.54.7:2181,39.107.127.88:2181"

 

 启动 kafka-manager:

进入bin目录,执行:./cmak

后台启动执行: nohup cmak &

指定配置文件和端口启动:./cmak -Dconfig.file=/home/mosty/kafka-manager/cmak-3.0.0.2/conf/application.conf -Dhttp.port=9123

 最后可以使用 netstat -anp| grep 9123 查看是否启动端口

 启动后记得关闭防火墙或者开放指定端口,并且如果是阿里云、华为云等云服务器需要在配置中开放 端口,然后访问页面如下:

四、使用本地客户端测试MQ基本 *** 作

1、创建topic

./kafka-topics.sh --bootstrap-server localhost:9092 --create --replication-factor 3 --partitions 6 --topic kfk_kevin_test

说明:

  • --bootstrap-server localhost:9092 指定使用broker的zookeeper;
  • --create --topic kfk_kevin_test 创建指定名称的topic;
  • --replication-factor 1 指定副本数,但是副本数不能大于broker数,否则会报如下的错;
  • --partitions 1 指定分区数;

 2、列出所有的队列

./kafka-topic.sh --bootstrap-server localhost:9092 --list

3、发送MQ

发送mq:./kafka-console-producer.sh -broker-list localhost:9092 --topic kfk_kevin_test

4、接收MQ

接收mq:./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic kfk_kevin_test

 5、查看topic信息

./kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic kfk_kevin_test

6、删除topic

./kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic kfk_kevin_test

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/919991.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-16
下一篇 2022-05-16

发表评论

登录后才能评论

评论列表(0条)

保存