kafka集群只要解压后修改各节点的broker.id,配置zookeeper连接后分别启动各节点kafka即可。
前提:jdk和zookeeper安装参考jdk安装和zookeeper集群安装
下载并解压wget https://archive.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz
tar -zxvf kafka_2.11-2.1.1.tgz
修改配置文件各节点的broker.id
cd kafka_2.11-2.1.1
vi config/server.properties
#broker的全局唯一编号,各节点不能重复
broker.id=1
#删除topic功能开启
delete.topic.enable=true
#kafka运行日志存放的路径
log.dirs=/opt/MPP/kafka_2.11-2.1.1/logs
#配置连接Zookeeper集群地址
zookeeper.connect=10.37.62.95:2181,10.37.62.96:2181,10.37.62.97:2181/kafka
#ip地址为发送消息端连接kafka的ip地址,各节点各自的ip
advertised.listeners=PLAINTEXT://10.37.62.98:9092
advertised.host.name=10.37.62.99
#默认超时时间太短需要修改大一些
zookeeper.connection.timeout.ms=60000
vi ~/.bash_profile
## KAFKA ##
export KAFKA_HOME=/opt/MPP/kafka_2.11-2.1.1
export PATH=$PATH:$KAFKA_HOME/bin
## KAFKA ##
#立即生效环境变量
source ~/.bash_profile
启动集群:分别配启动个节点
kafka_2.11-2.1.1/bin/kafka-server-start.sh /opt/MPP/kafka_2.11-2.1.1/config/server.properties &
停止kafkakafka_2.11-2.1.1/bin/kafka-server-stop.sh
查看日志tail -f kafka_2.11-2.1.1/logs/server.log
测试1)查看当前服务器中的所有topic
kafka-topics.sh --zookeeper 10.37.62.95:2181,10.37.62.96:2181,10.37.62.97:2181/kafka --list
2)创建topic
kafka-topics.sh --zookeeper 10.37.62.95:2181,10.37.62.96:2181,10.37.62.97:2181/kafka --create --replication-factor 1 --partitions 1 --topic first
选项说明:
--topic 定义topic名
--replication-factor 定义副本数
--partitions 定义分区数
3)删除topic
kafka-topics.sh --zookeeper 10.37.62.95:2181,10.37.62.96:2181,10.37.62.97:2181/kafka --delete --topic first
需要server.properties中设置delete.topic.enable=true否则只是标记删除。
4)发送消息
kafka-console-producer.sh --broker-list 10.37.62.99:9092 --topic first
5)消费消息
kafka-console-consumer.sh --bootstrap-server 10.37.62.99:9092 --topic first
# from-beginning:会把主题中以往所有的数据都读取出来。
kafka-console-consumer.sh --bootstrap-server 10.37.62.99:9092 --from-beginning --topic first
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)