Win10配置Kafka以及Zookeeper环境(run out of available brokers错误解决)

Win10配置Kafka以及Zookeeper环境(run out of available brokers错误解决),第1张

Win10配置Kafka以及Zookeeper环境(run out of available brokers错误解决) 背景

在某实验项目中,需要部署一个日志采集系统,其中需要用到Java环境、kafka、zookeeper中间件,其中kafka、zookeeper均部署到本地,本文主要记录环境配置过程。

  • 版本信息:
    • Java:jdk1.8.0_151
    • kafka:kafka_2.12-0.11.0.0
    • zookeeper:zookeeper-3.3.6
Java环境准备
  • 为提高下载速度,推挤国内镜像站下载,本文使用华为云镜像站:

    • java 镜像地址:https://repo.huaweicloud.com/java/jdk/
  • 配置系统环境变量

    • 建议不要装默认地址C:Program FilesJavajdk1.8.0_171,因为其带有空格
      • (不过也有博文说不用care这一条)
    • 配置JAVA_HOME:C:Javajdk1.8.0_151
    • 配置ClassPath环境变量:.;%JAVA_HOME%libdt.jar;%JAVA_HOME%libtools.jar;
      • 注意前面有“.”和“;”一定是英文状态下的符号
    • 在Path中添加:
      • C:Javajdk1.8.0_151bin
      • C:Javajre1.8.0_151bin
  • Java环境测试

    • 点击键盘Windows+r键,打开运行提示框输入cmd按回车键打开命令框,输入 java -version
    • 若能正常显示,则表示安装成功。
安装 Zookeeper

历史版本检索:https://archive.apache.org/dist/zookeeper/

注意,此处下载的均是免安装版的,windows 和 Linux 均有编译好的程序,可直接运行

本文下载的是:zookeeper-3.3.6.tar.gz

下载好后,还不能直接使用,需要配置一下配置文件

  • 先将zoo_sample.cfg拷贝一份,并命名为zoo.cfg,修改:zoo.cfg

     # The number of milliseconds of each tick
     tickTime=2000
     # The number of ticks that the initial 
     # synchronization phase can take
     initLimit=10
     # The number of ticks that can pass between 
     # sending a request and getting an acknowledgement
     syncLimit=5
     # the directory where the snapshot is stored.
     # dataDir=/tmp/zookeeper
     # 添加本地日志文件目录  <--------------------------- 只用修改这里
     dataDir=C:/ProgramData/ToolLog/zookeeper/data
     dataLogDir=C:/ProgramData/ToolLog/zookeeper/log
     # the port at which the clients will connect
     clientPort=2181
    
  • 修改保存后,在zookeeperbin目录下,双击即可运行zkServer.cmd

安装 Kafka_2.12-0.11.0.0

下载链接:https://kafka.apache.org/downloads.html

下载好后,修改配置文件:config目录下的server.properties

修改log.dirs为D:kafka_logs(自定义目录完整路径)

返回到解压文件夹下,执行:

  • .binwindowskafka-server-start.bat .configserver.properties

至此,环境配置完毕。

测试程序

本测试程序基于Go语言开发,程序功能为向kafka中的某个话题发送数据消息。

需要说明的是:在创建生产者对象时,需要指明环境中Kafka版本号,否则会导致创建失败!!!!!!!

  • 关键语句

    	config := sarama.NewConfig()
    	config.Producer.RequiredAcks = sarama.WaitForAll
    	config.Producer.Partitioner = sarama.NewRandomPartitioner
    	config.Producer.Return.Successes = true
    	 
    	config.Version = sarama.V0_11_0_0
    
  • 错误提示

    producer close, err: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)

  • 完整测试程序

    package main
    
    import (
    	"fmt"
    	"github.com/Shopify/sarama"
    )
    
    func main() {
    	config := sarama.NewConfig()
    	config.Producer.RequiredAcks = sarama.WaitForAll
    	config.Producer.Partitioner = sarama.NewRandomPartitioner
    	config.Producer.Return.Successes = true
    	config.Version = sarama.V0_11_0_0   //  <-------  注意这里需要选择自己环境中kafka版本,否则程序会运行失败
    
    	msg := &sarama.ProducerMessage{}
    	msg.Topic = "nginx_log"
    	msg.Value = sarama.StringEncoder("this is a good test, my message is good")
    
    	client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
    	if err != nil {
    		fmt.Println("producer close, err:", err)
    		return
    	}
    
    	defer client.Close()
    
    	pid, offset, err := client.SendMessage(msg)
    	if err != nil {
    		fmt.Println("send message failed,", err)
    		return
    	}
    
    	fmt.Printf("pid:%v offset:%vn", pid, offset)
    }
    

参考文献:
https://blog.csdn.net/weixin_41402352/article/details/84325136
https://www.cnblogs.com/wangzhaobo/p/14345281.html

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5572677.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存