Golang 实现 rabbitMq 和生产消费

Golang 实现 rabbitMq 和生产消费,第1张

订阅者-》channel创建者-》消费者



package main

import (
"log"
"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}


func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	//链接到队列 没有则创建
	ch.QueueDeclare(
		"woshiqueue", // 队列名称
		false,   // 是否持久化
		false,   // 是否自动删除 至少有一个链接 都断开才会删除
		false,   //是否排他
		false,   // no-wait
		nil,     // arguments
	)
	failOnError(err, "Failed to declare a queue")

	//消费队列
	msgs, err := ch.Consume(
		"woshiqueue", // 队列名称
		"",     // 消费者名字
		true,   // 收到消息后,是否不需要回复确认即被认为被消费
		false,  // 排他消费者,即这个队列只能由一个消费者消费.适用于任务不允许进行并发处理的情况下.比如系统对接
		false,  //无用
		false,  // 不返回执行结果,但是如果排他开启的话,则必须需要等待结果的,如果两个一起开就会报错
		nil,    // 其他参数
	)
	failOnError(err, "Failed to register a consumer")

	forever := make(chan bool)

	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
		}
	}()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever
}

生产者-》发布

package main

import (
	"github.com/streadway/amqp"
	"log"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}



func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	//链接到队列 没有则创建
	ch.QueueDeclare(
		"woshiqueue", // 队列名称
		false,   // 是否持久化
		false,   // 是否自动删除 至少有一个链接 都断开才会删除
		false,   //是否排他
		false,   // no-wait
		nil,     // arguments
	)
	failOnError(err, "Failed to declare a queue")

	body := "Hello World!"
	err = ch.Publish(
		"",     // 交换机
		"woshiqueue", // channel
		false,  // mandatory
		/*
		当mandatory标志位设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,出现上述情况broker会直接将消息丢弃;通俗的讲,mandatory标志告诉broker代理服务器至少将消息route到一个队列中,否则就将消息return给发送者;
		*/
		false,  // immediate
		/*
		RabbitMQ3.0不再支持immediate标志
		当immediate标志位设置为true时,如果exchange在将消息路由到queue(s)时发现对于的queue上么有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或者多个)都没有消费者时,该消息会通过basic.return方法返还给生产者
		*/
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	log.Printf(" [x] Sent %s", body)
	failOnError(err, "Failed to publish a message")
}

需要安装rabbitMq 服务具体查看安装详情->windos 安装rabbitmqhttps://mp.csdn.net/mp_blog/creation/editor/122492640

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/994013.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存