- kafka环境搭建
- 单节点
- 集群
测试环境
centos 7.8.2003
JDK安装
下载地址: https://www.oracle.com/java/technologies/downloads/
当前下载路径:https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz
下载安装:
$ curl -O https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz $ tar -xvf jdk-17_linux-x64_bin.tar.gz #jdk-17.0.1 $ echo "export JAVA_HOME=`pwd`/jdk-17.0.1" >>/etc/profile $ echo 'export CLASSPATH=$:CLASSPATH:$JAVA_HOME/lib/' >>/etc/profile $ echo 'export PATH=$PATH:$JAVA_HOME/bin' >>/etc/profile $ source /etc/profile $ java -version java version "17.0.1" 2021-10-19 LTS Java(TM) SE Runtime Environment (build 17.0.1+12-LTS-39) Java HotSpot(TM) 64-Bit Server VM (build 17.0.1+12-LTS-39, mixed mode, sharing)单节点
下载安装
#下载地址:http://kafka.apache.org/downloads #当前版本:2.8.1 $ curl -O https://archive.apache.org/dist/kafka/2.8.1/kafka_2.13-2.8.1.tgz $ tar -xvf kafka_2.13-2.8.1.tgz #进入kafka目录 $ cd kafka_2.13-2.8.1
配置文件修改
$ vi config/server.properties #添加监听的ip端口配置 listeners=PLAINTEXT://:9092 #日志存储目录 log.dirs= /kafka-logs
启动
#先启动zookeeper $ ./bin/zookeeper-server-start.sh ./config/zookeeper.properties #然后启动kafka $ ./bin/kafka-server-start.sh ./config/server.properties
zookeeper查看节点信息
$ ./bin/zookeeper-shell.sh localhost:2181 #输入如下命令(id为kafka配置文件中的broker.id): #不知道id可以先查看 #跟多命令可以使用help查看 > ls /brokers/ids [0] > get /brokers/ids/0 {"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://a4b96b9307b2:9092"],"jmx_port":-1,"port":9092,"host":"a4b96b9307b2","version":5,"timestamp":"1634886207746"} # 查看topic分区信息 > ls /brokers/topics//partitions
基于脚本的kafka *** 作
所有脚本可在下载的bin目录下找到
官网介绍:https://kafka.apache.org/documentation/#configuration
#主题(topic) #测试主题名test #查看topic列表 $ ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092 #创建topic, --partitions: 分区数,--replication-factor: 副本数,不能超过broker数量 $ ./bin/kafka-topics.sh --bootstrap-server:9092 --create --topic test --partitions 20 --replication-factor 3 --config x=y #修改topic配置和分区配置 #分区只能增加,不支持减少 $ ./bin/kafka-topics.sh --bootstrap-server :9092 --alter --topic test --partitions 30 #修改配置 #增加配置 $ ./bin/kafka-configs.sh --bootstrap-server :9092 --entity-type topics --entity-name test --alter --add-config x=y #删除配置 $ ./bin/kafka-configs.sh --bootstrap-server :9092 --entity-type topics --entity-name test --alter --delete-config x #删除主题 $ ./bin/kafka-topics.sh --bootstrap-server :9092 --delete --topic test #获取分区最新offset $ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list :9092 --topic test
消费测试
#启动消费者,消费消息 ./bin/kafka-console-consumer.sh --bootstrap-server集群:9092 --topic test1 --group group1 #启动生产者,生产消息 ./bin/kafka-console-producer.sh --bootstrap-server :9092 --topic test1
zookeeper使用kafka包里自带的,集群最少需要三台服务器,且官方强烈建议使用奇数台服务器。
zookeeper配置:
更多配置参考官网(版本3.7.0):https://zookeeper.apache.org/doc/r3.7.0/zookeeperStarted.html
$ vi config/zookeeper.properties #注释可能不准确,原文解释可查看官网 #zookeeper使用的基本时间单位,单位是ms,用于心跳,最小会话超时将是tickTime 的两倍 tickTime=2000 #简单理解为保存数据的目录 dataDir=/var/lib/zookeeper #监听端口 clientPort=2181 #连接到主节点的超时时间,使用tickTime为时间单位,即超时时间为2000 * 5 = 10000ms = 10s initLimit=5 #从节点与主节点同步的超时时间,使用tickTime为时间单位,即超时时间为2000 * 2 = 4000ms = 4s syncLimit=2 #集群信息,第一个ip用于从节点与主节点进行通信,第二个ip用于选举主节点,都是tcp通信 server.1=:2888:3888 server.2= :2888:3888 server.3= :2888:3888 #创建zookeeper集群需要的myid文件(文件路径为配置文件中的 ), 然后在里面添加一个数字来标识此节点,与kafka broker id类似 $ echo 1 >> /myid
kafka配置:
$ vi config/server.properties #broker id,每个节点需设置不同的值 broker.id=0 #添加zookeeper集群信息 zookeeper.connect=:2181, :2181, :2181
kafka官方建议需配置项:
# ZooKeeper zookeeper.connect=[list of ZooKeeper servers] # Log configuration num.partitions=8 default.replication.factor=3 # 不能大于集群数量 log.dir=[List of directories. Kafka should have its own dedicated disk(s) or SSD(s).] # Other configurations broker.id=[An integer. Start with 0 and increment by 1 for each new broker.] listeners=[list of listeners] auto.create.topics.enable=false min.insync.replicas=2 queued.max.requests=[number of concurrent requests]
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)