kafka系列一:集群搭建

kafka系列一:集群搭建,第1张

kafka系列一:集群搭建 kafka集群 搭建记录 一、环境配置

虚机:172.31.213.48, 172.31.213.39, 172.31.213.35

软件版本:

kafka: kafka_2.13-2.7.1.tgz

下载链接:https://archive.apache.org/dist/kafka/2.7.1/kafka_2.13-2.7.1.tgz

zookeeper: apache-zookeeper-3.6.3-bin.tar.gz (不要下载source版本,否则需要自行编译后才能启动)

下载链接:https://dlcdn.apache.org/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz

二、zookeeper集群搭建

首先,将172.31.213.48的公钥配置到 172.31.213.39和 172.31.213.35, 这样先配置172.31.213.48,然后将修改好的文件scp 到另外两台机器,只需在另外两台机器上修改myid等特定标识,极大的减轻集群搭建的工作量。

将apache-zookeeper-3.6.3-bin.tar.gz 解压到 /tools/mykafka/zookeeper-3.6.3-bin目录下, 进入config目录,

拷贝配置样本 zoo_sample.cfg 为 zoo.cfg 并进行修改:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
dataLogDir=/data/zookeeperLog
clientPort=2181

# server.1 这个1是服务器的标识,可以是任意有效数字,标识这是第几个服务器节点,这个标识要写到dataDir目录下面myid文件里
# 指名集群间通讯端口和选举端口
server.1=172.31.213.48:2288:3388
server.2=172.31.213.26:2288:3388
server.3=172.31.213.40:2288:3388

配置参数说明:

  • tickTime:用于计算的基础时间单元。比如 session 超时:N*tickTime;
  • initLimit:用于集群,允许从节点连接并同步到 master 节点的初始化连接时间,以 tickTime 的倍数来表示;
  • syncLimit:用于集群, master 主节点与从节点之间发送消息,请求和应答时间长度(心跳机制);
  • dataDir:数据存储位置;
  • dataLogDir:日志目录;
  • clientPort:用于客户端连接的端口,默认 2181

修改完配置后再172.31.213.48 机器上使用 :

scp -r mykakfa nucc@172.31.213.26:/tools

scp -r mykakfa nucc@172.31.213.26:/tools

将文件远程拷贝到另外两台机器。

标识节点

分别在三个节点的数据存储目录下新建 myid 文件,并写入对应的节点标识。Zookeeper 集群通过 myid 文件识别集群节点,并通过上文配置的节点通信端口和选举端口来进行节点通信,选举出 leader 节点。

在 dataDir=/data/zookeeper 目录下,创建myid文件,并写入id , 172.31.213.48 写入1, 172.31.213.26写入2,172.31.213.46写入3。

启动集群

在三台机器上,分别执行

sh  /tools/mykafka/zookeeper-3.6.3-bin/bin/zkServer.sh start

启动zookeeper

集群验证

使用 jps 查看进程,并且使用 zkServer.sh status 查看集群各个节点状态。如图三个节点进程均启动成功,并且两个节点为 follower 节点,一个节点为 leader 节点。

二、kafka集群搭建

同样先配置172.31.213.48机器,然后将修改后的配置文件远程拷贝到另外两台机器。

修改配置文件

将 kafka_2.13-2.7.1.tgz 下载解压到 /tools/mykafka/kafka路径下,修改config目录下的server.properties配置文件:

# The id of the broker. 集群中每个节点的唯一标识
broker.id=1 
//与zookeeper的myid保持一致即可。172.31.213.39的broker.id=2,172.31.213.39 的broker.id=3

#允许删除topic
delete.topic.enable=true 
    
#允许自动创建topic
auto.create.topics.enable=true       
    
# 监听地址
listeners=PLAINTEXT://172.31.213.48:9092     //写成本机真实的IP

#topic中数据保存时间,这里设置10分钟
log.retention.minutes=10
    
# 数据的存储位置
log.dirs=/data/kafka-logs

# Zookeeper连接地址
zookeeper.connect=172.31.213.48:2181,172.31.213.48:2181,172.31.213.48:2181

这里需要说明的是 log.dirs 指的是数据日志的存储位置,确切的说,就是分区数据的存储位置,而不是程序运行日志的位置。程序运行日志的位置是通过同一目录下的 log4j.properties 进行配置的。

同样将修改好的文件scp到另外两台机器上,并修改对应的broker.id

启动集群

在三台机器上使用以下命令分别启动kafka , 可以使用 jps 查看进程,每台机器上应该有一个kafka和一个zookeeper进程。

sh  /tools/mykafka/kafka/bin/kafka-server-start.sh  ../config/server.properties
验证

创建测试主题:

sh  /tools/mykafka/kafka/bin/kafka-topics.sh --create --bootstrap-server 172.31.213.48:9092 --replication-factor 3 --partitions 10 --topic record-general

创建后可以使用以下命令查看创建的主题信息:

sh  /tools/mykafka/kafka/bin/kafka-topics.sh --describe --bootstrap-server 172.31.213.48:9092 --topic record-general
三、参考链接:

https://juejin.cn/post/6844903949779091470?share_token=3c9f40c5-7e74-4cc8-ab4d-4a9d466c88ab

四、遇到的问题

kafka producer发送消息 Failed to update metadata after 60000ms问题

网上博客一般给出四个原因:

  1. /etc/hosts中的主机ip映射的hostname与配置的listeners中的hostname不一致

  2. 未开放防火墙端口或关闭防火墙

  3. 未指定kafka对外提供服务入口地址

    编辑kafka的config目录下的server.properties,添加 对外提供服务入口地址:

    注意:此ip为kafka所在主机的ip地址
    listeners=PLAINTEXT://192.168.5.228:9092
    12
    

4.项目中的kafka版本与服务器上安装的kafka版本不一致
修改maven的pom.xml文件,指定对应的kafka坐标,
我安装的是kafka_2.12-2.1.0,对应坐标为:


      org.apache.kafka
      kafka_2.11
      0.10.0.0

经过反复排查,均不是上述原因,通过一下博客内容,对比springboot的kafka配置,发现是生产者kafka配置问题:

Springboot集成kafka

https://blog.csdn.net/weixin_43914685/article/details/113804393?spm=1001.2101.3001.6650.1&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-1.no_search_link&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-1.no_search_link

spring配置文件:

spring.kafka.bootstrap-servers: 192.168.1.108:9092,192.168.1.109:9092,192.168.1.110:9092

#指定kafka server的地址,集群配多个用逗号隔开

注意:bootstrap-servers:  IP:9092,   不要写成 http://IP:9092,  否则生产者报  Failed to update metadata after 60000ms 的异常!

五、kafka架构

1、结构名词解释:
1)Producer :消息生产者,就是向kafka broker发消息的客户端;

2)Consumer :消息消费者,向kafka broker取消息的客户端;

3)Topic :可以理解为一个队列;

4) Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic;

5)Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic;

6)Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序;

7)Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka。

参考链接:

https://juejin.cn/post/6915285027123265544

https://juejin.cn/post/6965401216599736351

https://juejin.cn/post/6844904050064883725

https://blog.csdn.net/student__software/article/details/81486431

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5654477.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存