虚机:172.31.213.48, 172.31.213.39, 172.31.213.35
软件版本:
kafka: kafka_2.13-2.7.1.tgz
下载链接:https://archive.apache.org/dist/kafka/2.7.1/kafka_2.13-2.7.1.tgz
zookeeper: apache-zookeeper-3.6.3-bin.tar.gz (不要下载source版本,否则需要自行编译后才能启动)
下载链接:https://dlcdn.apache.org/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz
二、zookeeper集群搭建首先,将172.31.213.48的公钥配置到 172.31.213.39和 172.31.213.35, 这样先配置172.31.213.48,然后将修改好的文件scp 到另外两台机器,只需在另外两台机器上修改myid等特定标识,极大的减轻集群搭建的工作量。
将apache-zookeeper-3.6.3-bin.tar.gz 解压到 /tools/mykafka/zookeeper-3.6.3-bin目录下, 进入config目录,
拷贝配置样本 zoo_sample.cfg 为 zoo.cfg 并进行修改:
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/data/zookeeper dataLogDir=/data/zookeeperLog clientPort=2181 # server.1 这个1是服务器的标识,可以是任意有效数字,标识这是第几个服务器节点,这个标识要写到dataDir目录下面myid文件里 # 指名集群间通讯端口和选举端口 server.1=172.31.213.48:2288:3388 server.2=172.31.213.26:2288:3388 server.3=172.31.213.40:2288:3388
配置参数说明:
- tickTime:用于计算的基础时间单元。比如 session 超时:N*tickTime;
- initLimit:用于集群,允许从节点连接并同步到 master 节点的初始化连接时间,以 tickTime 的倍数来表示;
- syncLimit:用于集群, master 主节点与从节点之间发送消息,请求和应答时间长度(心跳机制);
- dataDir:数据存储位置;
- dataLogDir:日志目录;
- clientPort:用于客户端连接的端口,默认 2181
修改完配置后再172.31.213.48 机器上使用 :
scp -r mykakfa nucc@172.31.213.26:/tools
scp -r mykakfa nucc@172.31.213.26:/tools
将文件远程拷贝到另外两台机器。
标识节点分别在三个节点的数据存储目录下新建 myid 文件,并写入对应的节点标识。Zookeeper 集群通过 myid 文件识别集群节点,并通过上文配置的节点通信端口和选举端口来进行节点通信,选举出 leader 节点。
在 dataDir=/data/zookeeper 目录下,创建myid文件,并写入id , 172.31.213.48 写入1, 172.31.213.26写入2,172.31.213.46写入3。
启动集群在三台机器上,分别执行
sh /tools/mykafka/zookeeper-3.6.3-bin/bin/zkServer.sh start
启动zookeeper
集群验证使用 jps 查看进程,并且使用 zkServer.sh status 查看集群各个节点状态。如图三个节点进程均启动成功,并且两个节点为 follower 节点,一个节点为 leader 节点。
二、kafka集群搭建同样先配置172.31.213.48机器,然后将修改后的配置文件远程拷贝到另外两台机器。
修改配置文件将 kafka_2.13-2.7.1.tgz 下载解压到 /tools/mykafka/kafka路径下,修改config目录下的server.properties配置文件:
# The id of the broker. 集群中每个节点的唯一标识 broker.id=1 //与zookeeper的myid保持一致即可。172.31.213.39的broker.id=2,172.31.213.39 的broker.id=3 #允许删除topic delete.topic.enable=true #允许自动创建topic auto.create.topics.enable=true # 监听地址 listeners=PLAINTEXT://172.31.213.48:9092 //写成本机真实的IP #topic中数据保存时间,这里设置10分钟 log.retention.minutes=10 # 数据的存储位置 log.dirs=/data/kafka-logs # Zookeeper连接地址 zookeeper.connect=172.31.213.48:2181,172.31.213.48:2181,172.31.213.48:2181
这里需要说明的是 log.dirs 指的是数据日志的存储位置,确切的说,就是分区数据的存储位置,而不是程序运行日志的位置。程序运行日志的位置是通过同一目录下的 log4j.properties 进行配置的。
同样将修改好的文件scp到另外两台机器上,并修改对应的broker.id
启动集群在三台机器上使用以下命令分别启动kafka , 可以使用 jps 查看进程,每台机器上应该有一个kafka和一个zookeeper进程。
sh /tools/mykafka/kafka/bin/kafka-server-start.sh ../config/server.properties验证
创建测试主题:
sh /tools/mykafka/kafka/bin/kafka-topics.sh --create --bootstrap-server 172.31.213.48:9092 --replication-factor 3 --partitions 10 --topic record-general
创建后可以使用以下命令查看创建的主题信息:
sh /tools/mykafka/kafka/bin/kafka-topics.sh --describe --bootstrap-server 172.31.213.48:9092 --topic record-general三、参考链接:
https://juejin.cn/post/6844903949779091470?share_token=3c9f40c5-7e74-4cc8-ab4d-4a9d466c88ab
四、遇到的问题kafka producer发送消息 Failed to update metadata after 60000ms问题
网上博客一般给出四个原因:
-
/etc/hosts中的主机ip映射的hostname与配置的listeners中的hostname不一致
-
未开放防火墙端口或关闭防火墙
-
未指定kafka对外提供服务入口地址
编辑kafka的config目录下的server.properties,添加 对外提供服务入口地址:
注意:此ip为kafka所在主机的ip地址 listeners=PLAINTEXT://192.168.5.228:9092 12
4.项目中的kafka版本与服务器上安装的kafka版本不一致
修改maven的pom.xml文件,指定对应的kafka坐标,
我安装的是kafka_2.12-2.1.0,对应坐标为:
org.apache.kafka kafka_2.110.10.0.0
经过反复排查,均不是上述原因,通过一下博客内容,对比springboot的kafka配置,发现是生产者kafka配置问题:
Springboot集成kafka
https://blog.csdn.net/weixin_43914685/article/details/113804393?spm=1001.2101.3001.6650.1&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-1.no_search_link&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-1.no_search_link
spring配置文件:
spring.kafka.bootstrap-servers: 192.168.1.108:9092,192.168.1.109:9092,192.168.1.110:9092
#指定kafka server的地址,集群配多个用逗号隔开
注意:bootstrap-servers: IP:9092, 不要写成 http://IP:9092, 否则生产者报 Failed to update metadata after 60000ms 的异常!
五、kafka架构1、结构名词解释:
1)Producer :消息生产者,就是向kafka broker发消息的客户端;
2)Consumer :消息消费者,向kafka broker取消息的客户端;
3)Topic :可以理解为一个队列;
4) Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic;
5)Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic;
6)Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序;
7)Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka。
参考链接:
https://juejin.cn/post/6915285027123265544
https://juejin.cn/post/6965401216599736351
https://juejin.cn/post/6844904050064883725
https://blog.csdn.net/student__software/article/details/81486431
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)