1、前置条件
(1)、kafka 3.0官网提示必须要Java 8+的支持,因此这里本文安装Java 9,教程自行查找
(2)、kafka的功能实现依赖zookeeper,故需要先搭建下zookeeper集群,zookeeper集群搭建参考:
Centos7 搭建zookeeper集群Centos7 搭建zookeeper集群https://blog.csdn.net/qq_21875331/article/details/1217555342、节点规划
zookeeper使用四个节点:node01~04
kafka使用:node01~03
3、下载kafka
Kafka Downloads,该文章使用的版本为3.0
4、集群搭建
# 解压压缩包 tar -zxvf kafka_2.13-3.0.0.tgz # 把解压文件夹移动到/usr/local/下 mv kafka_2.13-3.0.0 /usr/local/ # 进入kafka配置目录 cd /usr/local/kafka_2.13-3.0.0/conf # 修改server.properties配置 broker.id=0 # node01node02node03 分别设置为0,1,2 # 打开监听配置,为了创建topic时能监听到 node01,node02,node03都要修改 listeners=PLAINTEXT://node01:9092 log.dirs=/var/kafka-logs # 修改日志保存路径 # zookeeper 使用node02,node03,node04 zookeeper.connect=node02:2181,node03:2181,node04:2181/kafka # 添加环境变量 vi /etc/profile # 新增环境变量配置 export KAFKA_HOME=/usr/local/kafka_2.13-3.0.0 export PATH=$PATH:$KAFKA_HOME/bin # 使得环境变量立刻生效 source /etc/profile
同步node01配置好的文件到其他节点
# 拷贝kafka安装文件目录到node02,node03(当前所在目录/usr/local) scp -r ./kafka_2.13-3.0.0 root@node02:`pwd` scp -r ./kafka_2.13-3.0.0 root@node03:`pwd` # 拷贝设置好的环境变量文件到node2、node3 scp -r /etc/profile root@node02:/etc/ scp -r /etc/profile root@node03:/etc/ # 分别修改node02、node03上kafka的配置文件server.properties vi server.properties broker.id=1 # node02 broker.id=2 # node03 # 分别使得node02,node03环境变量生效 source /etc/profile # 如果开启了防火墙,为保证服务启动能通,分别开放node01~02的2181端口号,没开启略过,我这里没开启 firewall-cmd --zone=public --add-port=2181/tcp --permanent firewall-cmd --reload
启动服务
# 分别启动node02~04节点上的zookeeper服务 zkServer.sh start # 查看下状态确保服务启动正常 zkServer.sh status # 启动kafka服务(node01,node02,node03) kafka-server-start.sh ./server.properties kafka-server-start.sh ./server.properties & # 后台启动 # 停止kafka服务的命令 kafka-server-stop.sh
5、kafaka集群简单使用
# 命令行创建topic,并制定分区为2,以及分区的副本数2(确保server。properties中listens监听的端口打开) kafka-topics.sh --create --bootstrap-server node02:9092,node03:9092 --replication-factor 2 --partitions 2 --topic xxoo # 查看topic列表 kafka-topics.sh --bootstrap-server node01:9092,node02:9092 --list # 查看具体topic描述信息 kafka-topics.sh --bootstrap-server node01:9092,node02:9092 --describe --topic xxoo # 新开窗口,在node03上启动consumer组 kafka-console-consumer.sh --topic xxoo --group ac --from-beginning --bootstrap-server node03:9092 # 新开窗口,在node01,node02上启动producer组(一个生产者生产的消息只能被一个topic下的一个consumer消费) kafka-console-producer.sh --topic xxoo --bootstrap-server node01:9092 kafka-console-producer.sh --topic xxoo --bootstrap-server node02:9092 # 查看组详情 kafka-consumer-groups.sh --bootstrap-server node01:9092 --all-groups --describe # 查看组列表 kafka-consumer-groups.sh --bootstrap-server node01:9092 --all-groups --list # 删除消费组 kafka-consumer-groups.sh --bootstrap-server node01:9092 --delete --group ac
6、安装kafka管理工具
(1)、下载EFAK
(2)、安装
# 创建安装文件,这里放在kafka安装目录下,目录可自定义 mkdir /usr/local/kafka_2.13-3.0.0/efak # 创建数据库存放目录 mkdir /usr/local/kafka_2.13-3.0.0/efak/db # 解压到安装目录 tar -zxvf efak-web-2.0.8-bin.tar.gz -C /usr/local/kafka_2.13-3.0.0/efak/ # 设置环境变量 vi /etc/profile # 新增环境变量 export KE_HOME=/usr/local/kafka_2.13-3.0.0/efak/efak-web-2.0.8 export PATH=$PATH:$KE_HOME/bin # 使得环境变量生效 source /etc/profile
修改efak配置文件,找到conf目录的system-config.properties文件
注意:中文标注点为需要修改的点
###################################### # multi zookeeper & kafka cluster list # Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead ###################################### # 设置集群别名,这里只有一个集群使用一个即可 efak.zk.cluster.alias=cluster1 # 设置zookeeper集群地址,注意要和kafka配置文件中集群配置保持一致 cluster1.zk.list=10.118.241.55:2181,10.118.241.134:2181,10.118.241.85:2181/kafka #cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181 ###################################### # zookeeper enable acl ###################################### cluster1.zk.acl.enable=false cluster1.zk.acl.schema=digest cluster1.zk.acl.username=test cluster1.zk.acl.password=test123 ###################################### # broker size online list ###################################### cluster1.efak.broker.size=20 ###################################### # zk client thread limit ###################################### kafka.zk.limit.size=32 ###################################### # EFAK webui port ###################################### efak.webui.port=8048 ###################################### # kafka jmx acl and ssl authenticate ###################################### cluster1.efak.jmx.acl=false cluster1.efak.jmx.user=keadmin cluster1.efak.jmx.password=keadmin123 cluster1.efak.jmx.ssl=false cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore cluster1.efak.jmx.truststore.password=ke123456 ###################################### # kafka offset storage ###################################### cluster1.efak.offset.storage=kafka #cluster2.efak.offset.storage=zk ###################################### # kafka jmx uri ###################################### cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi ###################################### # kafka metrics, 15 days by default ###################################### efak.metrics.charts=true efak.metrics.retain=15 ###################################### # kafka sql topic records max ###################################### efak.sql.topic.records.max=5000 efak.sql.topic.preview.records.max=10 ###################################### # delete kafka topic token ###################################### efak.topic.token=keadmin ###################################### # kafka sasl authenticate ###################################### cluster1.efak.sasl.enable=false cluster1.efak.sasl.protocol=SASL_PLAINTEXT cluster1.efak.sasl.mechanism=SCRAM-SHA-256 cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle"; cluster1.efak.sasl.client.id= cluster1.efak.blacklist.topics= cluster1.efak.sasl.cgroup.enable=false cluster1.efak.sasl.cgroup.topics= ###################################### # kafka sqlite jdbc driver address ###################################### efak.driver=org.sqlite.JDBC # 配置sqlite数据库存放目录,目录必须存在 efak.url=jdbc:sqlite:/usr/local/kafka_2.13-3.0.0/efak/db/ke.db efak.username=root efak.password=123456 ###################################### # kafka mysql jdbc driver address ###################################### #efak.driver=com.mysql.cj.jdbc.Driver #efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull #efak.username=root #efak.password=123456
修改完成保存退出
# 进入启动命令文件夹 cd /usr/local/kafka_2.13-3.0.0/efak/efak-web-2.0.8/bin # 授权 chmod 777 ke.sh # 启动 服务 ke.sh start # 重启 ke.sh restart # 停止服务 ke.sh stop
控制台打印启动成功信息
浏览器访问显示集群信息
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)