在本文中,您将学习:
- 使用docker启动本地环境kafka
- 使用go-kafka链接本地kafka进行消息生产和消费
https://github.com/sl40/go-babysit
git clone https://github.com/sl40/go-babysit.git启动本地环境kafka
要在开发设置中使用 Kafka,请创建以下docker-compose.yml文件
docker-compose.yml
version: "3" services: zookeeper: image: 'bitnami/zookeeper:latest' ports: - '2181:2181' environment: - ALLOW_ANONYMOUS_LOGIN=yes kafka: image: 'bitnami/kafka:latest' ports: - '9092:9092' - '9093:9093' environment: - KAFKA_BROKER_ID=1 - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 - KAFKA_CFG_ZOOKEEPER_ConNECT=zookeeper:2181 - ALLOW_PLAINTEXT_LISTENER=yes - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093 - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093 - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT depends_on: - zookeeper
要部署kafka,请在docker-compose.yml文件所在的目录中运行以下命令:
docker-compose up -d使用脚本kafka脚本测试本地环境
你可以使用kafka客户端进行测试,也可以跳过这个环境直接使用go进行测试
kafka客户端下可以查看官方文档 https://kafka.apache.org/downloads
打开2个命令行窗口,分别运行下面两个命令:
启动生产者
kafka-console-producer.sh --broker-list kafka:9093 --topic test
启动消费者
kafka-console-consumer.sh --bootstrap-server kafka:9093 --topic test
现在可以在生产者窗口输入
one twe three
查看消费者窗口,会显示生产者发送的消息。恭喜你,本地环境搭建完成!
下面将演示go如何链接kafka并且生产和消费消息
package main import ( "context" "fmt" kafka "github.com/segmentio/kafka-go" "log" ) func main() { r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9093"}, GroupID: "consumer-group-id", Topic: "test", MinBytes: 10e3, // 10KB MaxBytes: 10e6, // 10MB }) for { m, err := r.ReadMessage(context.Background()) if err != nil { break } fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %sn", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value)) } if err := r.Close(); err != nil { log.Fatal("failed to close reader:", err) } }Go消费者代码
package main import ( "context" kafka "github.com/segmentio/kafka-go" "log" ) func main() { // make a writer that produces to test, using the least-bytes distribution w := &kafka.Writer{ Addr: kafka.TCP("localhost:9093"), Topic: "test", Balancer: &kafka.LeastBytes{}, } err := w.WriteMessages(context.Background(), kafka.Message{ Key: []byte("Key-A"), Value: []byte("Hello World!"), }, kafka.Message{ Key: []byte("Key-B"), Value: []byte("One!"), }, kafka.Message{ Key: []byte("Key-C"), Value: []byte("Two!"), }, ) if err != nil { log.Fatal("failed to write messages:", err) } if err := w.Close(); err != nil { log.Fatal("failed to close writer:", err) } }运行Go代码
在kafka目录下使用go mod下载依赖
go mod download
在consumer目录下运行消费者
go run mian.go message at topic/partition/offset test/0/18: Key-A = Hello World! message at topic/partition/offset test/0/19: Key-B = One! message at topic/partition/offset test/0/20: Key-C = Two!
在producer目录下运行生产者
go run mian.go
消费者窗口会打印结果,恭喜你使用go完成了kafka最基础的 *** 作!
- 本文go使用的库 kafka-go
- kafka的基本概念
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)