Golang中使用kafka消息中间件

Golang中使用kafka消息中间件,第1张

Golang中使用kafka消息中间件

文章目录

参考注意事项一、下载安装zookeeper二、下载安装kafka

参考

官方:kafka 下载地址
在Golang中使用最流行的消息中间件kafka

注意事项

win环境下,kafka需要java环境,安装java-jdk
win环境下安装kafka
先进先出

一、下载安装zookeeper

kafka依赖zookeeper,所以需要下载安装zookeeper

http://archive.apache.org/dist/zookeeper/zookeeper-3.7.0/
tar -zxvf apache-zookeeper-3.7.0.tar.gz   

修改配置文件

mv zoo_sample.cfg zoo.cfg

启动zookeeper

./zkServer.sh start
二、下载安装kafka
https://kafka.apache.org/downloads
tar -zxvf kafka_2.13-3.0.0.tgz

启动kafka

bin/kafka-server-start.sh config/server.properties

创建主题

# quickstart-events 主题名称
# 创建的服务 localhost:9092
bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic quickstart-events --bootstrap-server localhost:9092

发送消息

bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092

接收消息

bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

下载安装kafka Golang客户端

go get github.com/Shopify/sarama

使用Golang创建消息生产者

package main

import (
	"fmt"
	"log"
	"os"
	"time"

	"github.com/Shopify/sarama"
)

var Address = []string{"192.168.18.128:9092"}

func main() {
	syncProducer(Address)
	// aSyncProducer()
}

//同步消息模式
func syncProducer(address []string) {
	// 配置
	config := sarama.NewConfig()
	// 属性设置
	config.Producer.Return.Successes = true
	config.Producer.Timeout = 5 * time.Second
	// 创建生成者
	p, err := sarama.NewSyncProducer(address, config)
	// 判断错误
	if err != nil {
		log.Printf("sarama.NewSyncProducer err, message=%s n", err)
		return
	}
	// 最后关闭生产者
	defer p.Close()
	// 主题名称
	topic := "topic1"
	// 消息
	srcValue := "sync: this is a message. index=%d"
	// 循环发消息
	for i := 0; i < 10; i++ {
		// 格式化消息
		value := fmt.Sprintf(srcValue, i)
		// 创建消息
		msg := &sarama.ProducerMessage{
			Topic: topic,
			Value: sarama.ByteEncoder(value),
		}
		// 发送消息
		part, offset, err := p.SendMessage(msg)
		if err != nil {
			log.Printf("send message(%s) err=%s n", value, err)
		} else {
			fmt.Fprintf(os.Stdout, value+"发送成功,partition=%d, offset=%d n", part, offset)
		}
		// 每隔两秒发送一个消息
		time.Sleep(2 * time.Second)
	}
}

// 异步消息
func aSyncProducer() {

	config := sarama.NewConfig()
	//等待服务器所有副本都保存成功后的响应
	config.Producer.RequiredAcks = sarama.WaitForAll
	//随机向partition发送消息
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	//是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoReponse这里才有用.
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true
	//设置使用的kafka版本,如果低于V0_10_0_0版本,消息中的timestrap没有作用.需要消费和生产同时配置
	//注意,版本设置不对的话,kafka会返回很奇怪的错误,并且无法成功发送消息
	config.Version = sarama.V0_10_0_1

	fmt.Println("start make producer")
	//使用配置,新建一个异步生产者
	producer, e := sarama.NewAsyncProducer([]string{"192.168.18.128:9092"}, config)
	if e != nil {
		fmt.Println(e)
		return
	}
	defer producer.AsyncClose()

	//循环判断哪个通道发送过来数据.
	fmt.Println("start goroutine")
	go func(p sarama.AsyncProducer) {
		for {
			select {
			case <-p.Successes():
				//fmt.Println("offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition)
			case fail := <-p.Errors():
				fmt.Println("err: ", fail.Err)
			}
		}
	}(producer)

	var value string
	for i := 0; ; i++ {
		time.Sleep(500 * time.Millisecond)
		time11 := time.Now()
		value = "this is a message 0606 " + time11.Format("15:04:05")

		// 发送的消息,主题。
		// 注意:这里的msg必须得是新构建的变量,不然你会发现发送过去的消息内容都是一样的,因为批次发送消息的关系。
		msg := &sarama.ProducerMessage{
			Topic: "topic2",
		}

		//将字符串转化为字节数组
		msg.Value = sarama.ByteEncoder(value)
		//fmt.Println(value)

		//使用通道发送
		producer.Input() <- msg
	}
}

命令行客户端接收消息测试

bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

使用Golang创建消息消费者

package main

import (
	"fmt"
	"time"

	"github.com/Shopify/sarama"
	cluster "github.com/bsm/sarama-cluster"
)

var (
	kafkaConsumer *cluster.Consumer
	kafkaBrokers  = []string{"192.168.18.128:9092"}
	kafkaTopic    = "topic1"
	groupId       = "test_1"
)

func init() {
	// 配置
	var err error
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true
	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	config.Consumer.Offsets.Initial = -2
	config.Consumer.Offsets.CommitInterval = 1 * time.Second
	config.Group.Return.Notifications = true
	// 创建消费者
	kafkaConsumer, err = cluster.NewConsumer(kafkaBrokers, groupId, []string{kafkaTopic}, config)
	if err != nil {
		panic(err.Error())
	}
	if kafkaConsumer == nil {
		panic(fmt.Sprintf("consumer is nil. kafka info -> {brokers:%v, topic: %v, group: %v}", kafkaBrokers, kafkaTopic, groupId))
	}
	fmt.Printf("kafka init success, consumer -> %v, topic -> %v, ", kafkaConsumer, kafkaTopic)
}

func main() {
	for {
		select {
		case msg, ok := <-kafkaConsumer.Messages():
			if ok {
				fmt.Printf("kafka 接收到的消息: %s n", msg.Value)
				kafkaConsumer.MarkOffset(msg, "")
			} else {
				fmt.Printf("kafka 监听服务失败")
			}
		case err, ok := <-kafkaConsumer.Errors():
			if ok {
				fmt.Printf("consumer error: %v", err)
			}
		case ntf, ok := <-kafkaConsumer.Notifications():
			if ok {
				fmt.Printf("consumer notification: %v", ntf)
			}
		}
	}
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5705204.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存