订阅者-》channel创建者-》消费者
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
//链接到队列 没有则创建
ch.QueueDeclare(
"woshiqueue", // 队列名称
false, // 是否持久化
false, // 是否自动删除 至少有一个链接 都断开才会删除
false, //是否排他
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
//消费队列
msgs, err := ch.Consume(
"woshiqueue", // 队列名称
"", // 消费者名字
true, // 收到消息后,是否不需要回复确认即被认为被消费
false, // 排他消费者,即这个队列只能由一个消费者消费.适用于任务不允许进行并发处理的情况下.比如系统对接
false, //无用
false, // 不返回执行结果,但是如果排他开启的话,则必须需要等待结果的,如果两个一起开就会报错
nil, // 其他参数
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
生产者-》发布
package main
import (
"github.com/streadway/amqp"
"log"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
//链接到队列 没有则创建
ch.QueueDeclare(
"woshiqueue", // 队列名称
false, // 是否持久化
false, // 是否自动删除 至少有一个链接 都断开才会删除
false, //是否排他
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
body := "Hello World!"
err = ch.Publish(
"", // 交换机
"woshiqueue", // channel
false, // mandatory
/*
当mandatory标志位设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,出现上述情况broker会直接将消息丢弃;通俗的讲,mandatory标志告诉broker代理服务器至少将消息route到一个队列中,否则就将消息return给发送者;
*/
false, // immediate
/*
RabbitMQ3.0不再支持immediate标志
当immediate标志位设置为true时,如果exchange在将消息路由到queue(s)时发现对于的queue上么有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或者多个)都没有消费者时,该消息会通过basic.return方法返还给生产者
*/
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
log.Printf(" [x] Sent %s", body)
failOnError(err, "Failed to publish a message")
}
需要安装rabbitMq 服务具体查看安装详情->windos 安装rabbitmqhttps://mp.csdn.net/mp_blog/creation/editor/122492640
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)