参考注意事项:一、下载安装 zookeeper;二、下载安装 kafka
参考官方:kafka 下载地址
在Golang中使用最流行的消息中间件kafka
win环境下,kafka需要java环境,安装java-jdk
win环境下安装kafka
先进先出
kafka依赖zookeeper,所以需要下载安装zookeeper
http://archive.apache.org/dist/zookeeper/zookeeper-3.7.0/ tar -zxvf apache-zookeeper-3.7.0.tar.gz
修改配置文件
mv zoo_sample.cfg zoo.cfg
启动zookeeper
./zkServer.sh start
二、下载安装kafka
https://kafka.apache.org/downloads tar -zxvf kafka_2.13-3.0.0.tgz
启动kafka
bin/kafka-server-start.sh config/server.properties
创建主题
# quickstart-events 为主题名称,localhost:9092 为 Kafka 服务地址
bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic quickstart-events --bootstrap-server localhost:9092
发送消息
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
接收消息
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
下载安装kafka Golang客户端
go get github.com/Shopify/sarama
使用Golang创建消息生产者
// Producer demo: sends messages to Kafka, synchronously by default and
// asynchronously via aSyncProducer, using the Shopify sarama client.
package main

import (
	"fmt"
	"log"
	"os"
	"time"

	"github.com/Shopify/sarama"
)

// Address lists the Kafka broker addresses to connect to.
var Address = []string{"192.168.18.128:9092"}

func main() {
	syncProducer(Address)
	// aSyncProducer()
}

// syncProducer sends ten messages to topic "topic1", one every two
// seconds, blocking on each send until the broker acknowledges it.
func syncProducer(address []string) {
	config := sarama.NewConfig()
	// Return.Successes must be true for a SyncProducer to work at all.
	config.Producer.Return.Successes = true
	config.Producer.Timeout = 5 * time.Second

	p, err := sarama.NewSyncProducer(address, config)
	if err != nil {
		// FIX: format string was "%s n" (backslash lost) in the original.
		log.Printf("sarama.NewSyncProducer err, message=%s\n", err)
		return
	}
	// Close the producer when this function returns.
	defer p.Close()

	topic := "topic1"
	srcValue := "sync: this is a message. index=%d"
	for i := 0; i < 10; i++ {
		value := fmt.Sprintf(srcValue, i)
		msg := &sarama.ProducerMessage{
			Topic: topic,
			Value: sarama.ByteEncoder(value),
		}
		part, offset, err := p.SendMessage(msg)
		if err != nil {
			// FIX: restored "\n" that was garbled to " n".
			log.Printf("send message(%s) err=%s\n", value, err)
		} else {
			// FIX: pass value as an argument instead of concatenating it
			// into the format string (it could contain '%'), and restore
			// the trailing newline.
			fmt.Fprintf(os.Stdout, "%s发送成功,partition=%d, offset=%d\n", value, part, offset)
		}
		// Throttle: one message every two seconds.
		time.Sleep(2 * time.Second)
	}
}

// aSyncProducer sends messages to topic "topic2" asynchronously, forever,
// draining the Successes/Errors channels in a separate goroutine.
func aSyncProducer() {
	config := sarama.NewConfig()
	// Wait until all in-sync replicas have persisted each message.
	config.Producer.RequiredAcks = sarama.WaitForAll
	// Send each message to a randomly chosen partition.
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// When enabled, both channels must be consumed or the producer blocks;
	// they only matter when RequiredAcks is not NoResponse.
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true
	// Versions below V0_10_0_0 ignore message timestamps; producer and
	// consumer must be configured with the same version, otherwise kafka
	// returns confusing errors and sends fail.
	config.Version = sarama.V0_10_0_1

	fmt.Println("start make producer")
	producer, e := sarama.NewAsyncProducer([]string{"192.168.18.128:9092"}, config)
	if e != nil {
		fmt.Println(e)
		return
	}
	defer producer.AsyncClose()

	fmt.Println("start goroutine")
	// Drain acks and errors so sends on Input() never block.
	go func(p sarama.AsyncProducer) {
		for {
			select {
			case <-p.Successes():
				//fmt.Println("offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition)
			case fail := <-p.Errors():
				fmt.Println("err: ", fail.Err)
			}
		}
	}(producer)

	var value string
	for i := 0; ; i++ {
		time.Sleep(500 * time.Millisecond)
		now := time.Now()
		value = "this is a message 0606 " + now.Format("15:04:05")
		// NOTE: a fresh ProducerMessage must be built per send — messages
		// are batched, so reusing one struct would resend identical content.
		msg := &sarama.ProducerMessage{
			Topic: "topic2",
		}
		// Convert the string payload to bytes.
		msg.Value = sarama.ByteEncoder(value)
		//fmt.Println(value)
		// Hand the message to the async pipeline.
		producer.Input() <- msg
	}
}
命令行客户端接收消息测试
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
使用Golang创建消息消费者
// Consumer demo: consumes topic "topic1" as part of consumer group
// "test_1" using the bsm/sarama-cluster client.
package main

import (
	"fmt"
	"time"

	"github.com/Shopify/sarama"
	cluster "github.com/bsm/sarama-cluster"
)

var (
	kafkaConsumer *cluster.Consumer
	kafkaBrokers  = []string{"192.168.18.128:9092"}
	kafkaTopic    = "topic1"
	groupId       = "test_1"
)

// init builds the cluster consumer; it panics on failure because the
// program cannot do anything useful without one.
func init() {
	var err error
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	// FIX: this assignment was duplicated later in the original; set once.
	config.Group.Return.Notifications = true
	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	// -2 == sarama.OffsetOldest: start from the earliest available offset.
	config.Consumer.Offsets.Initial = -2
	config.Consumer.Offsets.CommitInterval = 1 * time.Second

	kafkaConsumer, err = cluster.NewConsumer(kafkaBrokers, groupId, []string{kafkaTopic}, config)
	if err != nil {
		panic(err.Error())
	}
	if kafkaConsumer == nil {
		panic(fmt.Sprintf("consumer is nil. kafka info -> {brokers:%v, topic: %v, group: %v}", kafkaBrokers, kafkaTopic, groupId))
	}
	fmt.Printf("kafka init success, consumer -> %v, topic -> %v, ", kafkaConsumer, kafkaTopic)
}

func main() {
	// Multiplex over messages, errors, and rebalance notifications forever.
	for {
		select {
		case msg, ok := <-kafkaConsumer.Messages():
			if ok {
				// FIX: format string was "%s n" (backslash lost) in the original.
				fmt.Printf("kafka 接收到的消息: %s\n", msg.Value)
				// Mark the message as processed so its offset gets committed.
				kafkaConsumer.MarkOffset(msg, "")
			} else {
				fmt.Printf("kafka 监听服务失败")
			}
		case err, ok := <-kafkaConsumer.Errors():
			if ok {
				fmt.Printf("consumer error: %v", err)
			}
		case ntf, ok := <-kafkaConsumer.Notifications():
			if ok {
				fmt.Printf("consumer notification: %v", ntf)
			}
		}
	}
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)