go-rabbitmq

go-rabbitmq,第1张

简单模式 编码
package RabbitMQ

import (
	"fmt"
	"log"

	"github.com/streadway/amqp"
)

const RABBITMQ_PATH = "amqp://irismall:[email protected]:5672/iris-mall"

type RabbitMQ struct {
	conn      *amqp.Connection
	channel   *amqp.Channel
	QueueName string
	Exchange  string
	Key       string
	Path      string
}

//实例化
func NewRabbitMQ(queueName string, exchange string, key string) *RabbitMQ {
	rabbitmq := &RabbitMQ{
		QueueName: queueName,
		Exchange:  exchange,
		Key:       key,
		Path:      RABBITMQ_PATH,
	}
	var err error

	rabbitmq.conn, err = amqp.Dial(rabbitmq.Path)
	rabbitmq.GetError(err, "RabbitMQ连接不正确!")
	rabbitmq.channel, err = rabbitmq.conn.Channel()
	rabbitmq.GetError(err, "RabbitMQ连接channel不正确!")
	return rabbitmq
}

//销毁连接
func (r *RabbitMQ) Destory() {
	r.channel.Close()
	r.conn.Close()
}

//获取错误
func (r *RabbitMQ) GetError(err error, message string) {
	if err != nil {
		log.Fatalf("%s:%s", err, message)
		panic(fmt.Sprintf("%s:%s", err, message))
	}
}

//简单模式
func NewSimpleRabbitMQ(queueName string) *RabbitMQ {
	return NewRabbitMQ(queueName, "", "")
}

//简单模式生产者
func (r *RabbitMQ) SimplePublish(message string) {
	_, err := r.channel.QueueDeclare(
		r.QueueName,
		false, // durable  是否持久化数据
		false, // autoDelete 是否自动删除
		false, // exclusive 排他性,权限私有
		false, // noWaite 是否阻塞
		nil,
	)

	if err != nil {
		fmt.Println(err)
	}
	r.channel.Publish(
		r.Exchange,
		r.QueueName,
		false, //mandatory 如果是true,会根据exchange和routekey规则,如果无法找到符合条件的队列会把消息的消息返回给发送者
		false, //immediate true 表示当exchange将消息发送到队列后发现队列没有绑定消费者,则会把消息发回给发送者
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
		},
	)
}

//简单模式消费者
func (r *RabbitMQ) SimpleConsume() {
	_, err := r.channel.QueueDeclare(
		r.QueueName,
		false, // durable  是否持久化数据
		false, // autoDelete 是否自动删除
		false, // exclusive 排他性,权限私有
		false, // noWaite 是否阻塞
		nil,
	)

	if err != nil {
		fmt.Println(err)
	}

	message, err := r.channel.Consume(
		r.QueueName,
		"",    //区分多个消费者
		true,  //autoAck 是否自动应答
		false, //exclusive 是否排它性
		false, //noLocal true表示不能将同一个connection中发送的消息传递给这个connection中的消费者
		false,
		nil,
	)

	if err != nil {
		fmt.Println(err)
	}

	channelWait := make(chan bool)
	go func() {
		for msg := range message {
			fmt.Println(string(msg.Body))
		}
	}()

	log.Printf("[*] waiting......")
	<-channelWait
}

生产者
package main

import (
	"fmt"
	"tools/rabbitmq"
)

func main() {
	rb := RabbitMQ.NewSimpleRabbitMQ("mallproduct")
	rb.SimplePublish("这个就是个消息生产者!!!")
	fmt.Println("发送成功......")
}

消费者
package main

import "tools/rabbitmq"

func main() {
	bq := RabbitMQ.NewSimpleRabbitMQ("mallproduct")
	bq.SimpleConsume()
}

工作模式

一个消息只能被一个消费者消费
>

编码

和简单模式一样,只是多了几个消费端

生产者
package main

import (
	"fmt"
	"strconv"
	"tools/rabbitmq"
)

func main() {
	rb := RabbitMQ.NewSimpleRabbitMQ("mallproduct")

	for i:=0;i<100;i++{
		rb.SimplePublish("这个就是个消息生产者!!!第"+strconv.Itoa(i)+"个消费者")
	}

	fmt.Println("发送成功......")
}

消费者

多个消费者,每个消息只给其中一个消费者消费一次,其它的消费者就不能获取此条消息

package main

import "tools/rabbitmq"

func main() {
	bq := RabbitMQ.NewSimpleRabbitMQ("mallproduct")
	bq.SimpleConsume()
}

订阅模式

编码
package RabbitMQ

import (
	"fmt"
	"log"

	"github.com/streadway/amqp"
)

const RABBITMQ_PATH = "amqp://irismall:[email protected]:5672/iris-mall"

type RabbitMQ struct {
	conn      *amqp.Connection
	channel   *amqp.Channel
	QueueName string
	Exchange  string
	Key       string
	Path      string
}

//实例化
func NewRabbitMQ(queueName string, exchange string, key string) *RabbitMQ {
	rabbitmq := &RabbitMQ{
		QueueName: queueName,
		Exchange:  exchange,
		Key:       key,
		Path:      RABBITMQ_PATH,
	}
	var err error

	rabbitmq.conn, err = amqp.Dial(rabbitmq.Path)
	rabbitmq.GetError(err, "RabbitMQ连接不正确!")
	rabbitmq.channel, err = rabbitmq.conn.Channel()
	rabbitmq.GetError(err, "RabbitMQ连接channel不正确!")
	return rabbitmq
}

//销毁连接
func (r *RabbitMQ) Destory() {
	r.channel.Close()
	r.conn.Close()
}

//获取错误
func (r *RabbitMQ) GetError(err error, message string) {
	if err != nil {
		log.Fatalf("%s:%s", err, message)
		panic(fmt.Sprintf("%s:%s", err, message))
	}
}

//订阅模式创建实例化
func NewPubSubRabbitMQ(exchangeName string)*RabbitMQ{
	var err error
	rabbitmq:= NewRabbitMQ("",exchangeName,"")
	rabbitmq.conn,err= amqp.Dial(rabbitmq.Path)
	rabbitmq.GetError(err,"创建conn失败")
	rabbitmq.channel,err=rabbitmq.conn.Channel()
	rabbitmq.GetError(err,"创建channel失败")
	return rabbitmq
}

//生产者
func(r *RabbitMQ)PublishPub(message string){
	  err:=r.channel.ExchangeDeclare(
	  		r.Exchange,
	  		"fanout",
	  		true,
	  		false,
			false,
			false,
			nil,
	  	)
	  r.GetError(err,"RabbitMQ创建exchange失败!")

	  err=r.channel.Publish(
	  	r.Exchange,
	  	"",
	  	false,
	  	false,
	  	amqp.Publishing{
	  		ContentType: "text/plain",
	  		Body: []byte(message),
		},
	  	)
	  r.GetError(err,"消费者创建失败")
}

//消费者
func(r *RabbitMQ)RecieveSub(){
	//创建交换机
	err:=r.channel.ExchangeDeclare(
		r.Exchange,
		"fanout",
		true,
		false,
		false,
		false,
		nil,
	)
	r.GetError(err,"RabbitMQ创建exchange失败!")

	//创建队列
	q,err := r.channel.QueueDeclare(
		"",
		false,
		false,
		true,
		false,
		nil,
		)
	r.GetError(err,"创建队列失败")

	//交换机和队列绑定
	err=r.channel.QueueBind(
		//这里指定的是队列名称,由于上面设置的队列是空的代表随机,这里要获取名字,只能从q中取
		q.Name,
		"",
		r.Exchange,
		false,
		nil,
		)

	//消费者
	messages,err :=r.channel.Consume(
		q.Name,
		"",
		true,
		false,
		false,
		false,
		nil,
		)
	waitChannel:=make(chan bool)
	go func(){
		for msg:=range(messages){
			fmt.Println(string(msg.Body))
		}
	}()
	<-waitChannel
}


生产者
package main

import (
	"fmt"
	"strconv"
	"time"
	"tools/rabbitmq"
)

func main() {
	rb := RabbitMQ.NewPubSubRabbitMQ("iris-mall")

	for i:=0;i<100;i++{
		rb.PublishPub("订阅模式生产的第"+strconv.Itoa(i)+"条数据")
		fmt.Println("订阅模式生产的第"+strconv.Itoa(i)+"条数据")
		time.Sleep(1*time.Second)
	}

	fmt.Println("发送成功......")
}


消费者
package main

import "tools/rabbitmq"

func main() {
	bq := RabbitMQ.NewPubSubRabbitMQ("iris-mall")
	bq.RecieveSub()
}

路由模式

编码
package RabbitMQ

import (
	"fmt"
	"log"

	"github.com/streadway/amqp"
)

const RABBITMQ_PATH = "amqp://irismall:[email protected]:5672/iris-mall"

type RabbitMQ struct {
	conn      *amqp.Connection
	channel   *amqp.Channel
	QueueName string
	Exchange  string
	Key       string
	Path      string
}

//实例化
func NewRabbitMQ(queueName string, exchange string, key string) *RabbitMQ {
	rabbitmq := &RabbitMQ{
		QueueName: queueName,
		Exchange:  exchange,
		Key:       key,
		Path:      RABBITMQ_PATH,
	}
	var err error

	rabbitmq.conn, err = amqp.Dial(rabbitmq.Path)
	rabbitmq.GetError(err, "RabbitMQ连接不正确!")
	rabbitmq.channel, err = rabbitmq.conn.Channel()
	rabbitmq.GetError(err, "RabbitMQ连接channel不正确!")
	return rabbitmq
}

//销毁连接
func (r *RabbitMQ) Destory() {
	r.channel.Close()
	r.conn.Close()
}

//获取错误
func (r *RabbitMQ) GetError(err error, message string) {
	if err != nil {
		log.Fatalf("%s:%s", err, message)
		panic(fmt.Sprintf("%s:%s", err, message))
	}
}

//订阅模式创建实例化
func NewRoutingRabbitMQ(exchangeName string,routingKey string)*RabbitMQ{
	var err error
	rabbitmq:= NewRabbitMQ("",exchangeName,routingKey)
	rabbitmq.conn,err= amqp.Dial(rabbitmq.Path)
	rabbitmq.GetError(err,"创建conn失败")
	rabbitmq.channel,err=rabbitmq.conn.Channel()
	rabbitmq.GetError(err,"创建channel失败")
	return rabbitmq
}

//生产者
func(r *RabbitMQ)PublishRouting(message string){
	  err:=r.channel.ExchangeDeclare(
	  		r.Exchange,
	  		"direct",
	  		true,
	  		false,
			false,
			false,
			nil,
	  	)
	  r.GetError(err,"RabbitMQ创建exchange失败!")

	  err=r.channel.Publish(
	  	r.Exchange,
	  	r.Key,
	  	false,
	  	false,
	  	amqp.Publishing{
	  		ContentType: "text/plain",
	  		Body: []byte(message),
		},
	  	)
	  r.GetError(err,"消费者创建失败")
}

//消费者
func(r *RabbitMQ)RecieveRouting(){
	//创建交换机
	err:=r.channel.ExchangeDeclare(
		r.Exchange,
		"direct",
		true,
		false,
		false,
		false,
		nil,
	)
	r.GetError(err,"RabbitMQ创建exchange失败!")

	//创建队列
	q,err := r.channel.QueueDeclare(
		"",
		false,
		false,
		true,
		false,
		nil,
		)
	r.GetError(err,"创建队列失败")

	//交换机和队列绑定
	err=r.channel.QueueBind(
		//这里指定的是队列名称,由于上面设置的队列是空的代表随机,这里要获取名字,只能从q中取
		q.Name,
		r.Key,
		r.Exchange,
		false,
		nil,
		)

	//消费者
	messages,err :=r.channel.Consume(
		q.Name,
		"",
		true,
		false,
		false,
		false,
		nil,
		)
	waitChannel:=make(chan bool)
	go func(){
		for msg:=range(messages){
			fmt.Println(string(msg.Body))
		}
	}()
	<-waitChannel
}

生产者
package main

import (
	"fmt"
	"strconv"
	"tools/rabbitmq"
)

func main() {
	rb := RabbitMQ.NewRoutingRabbitMQ("amq.direct","first")
	rb1 := RabbitMQ.NewRoutingRabbitMQ("amq.direct","second")

	for i:=0;i<100;i++{
		rb.PublishRouting("订阅模式"+rb.Key+"生产的第"+strconv.Itoa(i)+"条数据")
		rb1.PublishRouting("订阅模式"+rb1.Key+"生产的第"+strconv.Itoa(i)+"条数据")
	}

	fmt.Println("发送成功......")
}


消费者
package main

import "tools/rabbitmq"

func main() {
	bq := RabbitMQ.NewRoutingRabbitMQ("amq.direct","first")
	bq.RecieveRouting()
}

package main

import "tools/rabbitmq"

func main() {
	bq := RabbitMQ.NewRoutingRabbitMQ("amq.direct","second")
	bq.RecieveRouting()
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/993960.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存