kafka集群部署

kafka集群部署,第1张

kafka集群只要解压后修改各节点的broker.id,配置zookeeper连接后分别启动各节点kafka即可。

前提:jdk和zookeeper安装

参考jdk安装和zookeeper集群安装

下载并解压
wget https://archive.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz
tar -zxvf kafka_2.11-2.1.1.tgz
修改配置文件各节点的broker.id
cd  kafka_2.11-2.1.1
vi config/server.properties

#broker的全局唯一编号,各节点不能重复
broker.id=1
#删除topic功能开启
delete.topic.enable=true
#kafka运行日志存放的路径
log.dirs=/opt/MPP/kafka_2.11-2.1.1/logs
#配置连接Zookeeper集群地址
zookeeper.connect=10.37.62.95:2181,10.37.62.96:2181,10.37.62.97:2181/kafka
#ip地址为发送消息端连接kafka的ip地址,各节点各自的ip
advertised.listeners=PLAINTEXT://10.37.62.98:9092
advertised.host.name=10.37.62.99
#默认超时时间太短需要修改大一些
zookeeper.connection.timeout.ms=60000

配置环境变量

vi ~/.bash_profile

## KAFKA ##
export KAFKA_HOME=/opt/MPP/kafka_2.11-2.1.1
export PATH=$PATH:$KAFKA_HOME/bin
## KAFKA ##
#立即生效环境变量
source ~/.bash_profile
启动集群:分别配启动个节点

kafka_2.11-2.1.1/bin/kafka-server-start.sh /opt/MPP/kafka_2.11-2.1.1/config/server.properties &

停止kafka

kafka_2.11-2.1.1/bin/kafka-server-stop.sh

查看日志

tail -f kafka_2.11-2.1.1/logs/server.log

测试
1)查看当前服务器中的所有topic
 kafka-topics.sh --zookeeper  10.37.62.95:2181,10.37.62.96:2181,10.37.62.97:2181/kafka --list

2)创建topic

kafka-topics.sh --zookeeper 10.37.62.95:2181,10.37.62.96:2181,10.37.62.97:2181/kafka --create --replication-factor 1 --partitions 1 --topic first
选项说明:
--topic 定义topic名
--replication-factor  定义副本数
--partitions  定义分区数

3)删除topic
	kafka-topics.sh --zookeeper 10.37.62.95:2181,10.37.62.96:2181,10.37.62.97:2181/kafka --delete --topic first
需要server.properties中设置delete.topic.enable=true否则只是标记删除。
 
4)发送消息
 kafka-console-producer.sh --broker-list 10.37.62.99:9092 --topic first

5)消费消息
kafka-console-consumer.sh --bootstrap-server 10.37.62.99:9092 --topic first
# from-beginning:会把主题中以往所有的数据都读取出来。
kafka-console-consumer.sh --bootstrap-server 10.37.62.99:9092 --from-beginning --topic first

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/872152.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-13
下一篇 2022-05-13

发表评论

登录后才能评论

评论列表(0条)

保存