rabbitmq分为生产者和消费者,生产者是创建消息,消费者是使用消息。
先写生产者。
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
"time"
)
func main() {
//链接rabbitmq服务器
con, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Println(err)
return
}
//创建和服务器的管道
ch, err := con.Channel()
if err != nil {
log.Println(err)
return
}
//在管道中声明队列
que, err := ch.QueueDeclare("hello", true, false, false, false, nil)
if err != nil {
log.Println(err)
return
}
//持续的创建消息
for {
n := time.Now().String()
ch.Publish("", que.Name, false, false, amqp.Publishing{Body: []byte("你好世界" + n), ContentType: "text/plain", DeliveryMode: amqp.Persistent})
time.Sleep(time.Second)
fmt.Println("发送" + n)
}
}
消费者
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
"time"
)
func main() {
//链接服务器
con, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Println(err)
return
}
//和生产者相同,创建管道
ch, err := con.Channel()
if err != nil {
log.Println(err)
return
}
//创建队列
que, err := ch.QueueDeclare("hello", true, false, false, false, nil)
if err != nil {
log.Println(err)
return
}
//从队列中读取到数据,注意:返回的是一个管道,如果生产者在此队列中持续的创建消息,我们可以持续的从管道中读取消息。
msg, err := ch.Consume(que.Name, "", false, false, false, false, nil)
if err != nil {
log.Println(err)
return
}
go func() {
for value := range msg {
fmt.Println(string(value.Body))
value.Ack(false)
}
}()
time.Sleep(time.Hour)
}
一些参数的说明
que, err := ch.QueueDeclare("hello", true, false, false, false, nil)
/*
第一个参数是队列的名称,在创建一个管道时,同一个管道可以创建多个队列。
第二个参数是队列的持久化,false时,如果与服务器断开链接,此时,队列也就会被自动删除,如果为true,队列会保留
*/
msg, err := ch.Consume(que.Name, "", false, false, false, false, nil)
/*
第一个参数是消费者读取的队列
第三个参数是是否自动确认收到,如果为true,则返回ack,服务器端收到后,会自动删除此条数据。如果为false,则需要我们自己手动调用返回ack。
*/
msg, err := ch.Consume(que.Name, "", false, false, false, false, nil)
if err != nil {
log.Println(err)
return
}
go func() {
for value := range msg {
fmt.Println(string(value.Body))
value.Ack(false)
}
}()
/*
注意:我们这里并没有设置为true,自动返回ack,而是手动返回ack。
这里有一个和计网相似的功能,就是ack确认的数量。
如果ack(false),则只确认此条消息的接收。
如果ack(true),则确认此条消息之前的所有的消息。
*/
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)