rabbitmq分为生产者和消费者,生产者是创建消息,消费者是使用消息。
先写生产者。
package main import ( "fmt" "github.com/streadway/amqp" "log" "time" ) func main() { //链接rabbitmq服务器 con, err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Println(err) return } //创建和服务器的管道 ch, err := con.Channel() if err != nil { log.Println(err) return } //在管道中声明队列 que, err := ch.QueueDeclare("hello", true, false, false, false, nil) if err != nil { log.Println(err) return } //持续的创建消息 for { n := time.Now().String() ch.Publish("", que.Name, false, false, amqp.Publishing{Body: []byte("你好世界" + n), ContentType: "text/plain", DeliveryMode: amqp.Persistent}) time.Sleep(time.Second) fmt.Println("发送" + n) } }
消费者
package main import ( "fmt" "github.com/streadway/amqp" "log" "time" ) func main() { //链接服务器 con, err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil { log.Println(err) return } //和生产者相同,创建管道 ch, err := con.Channel() if err != nil { log.Println(err) return } //创建队列 que, err := ch.QueueDeclare("hello", true, false, false, false, nil) if err != nil { log.Println(err) return } //从队列中读取到数据,注意:返回的是一个管道,如果生产者在此队列中持续的创建消息,我们可以持续的从管道中读取消息。 msg, err := ch.Consume(que.Name, "", false, false, false, false, nil) if err != nil { log.Println(err) return } go func() { for value := range msg { fmt.Println(string(value.Body)) value.Ack(false) } }() time.Sleep(time.Hour) }一些参数的说明
que, err := ch.QueueDeclare("hello", true, false, false, false, nil)
msg, err := ch.Consume(que.Name, "", false, false, false, false, nil)
msg, err := ch.Consume(que.Name, "", false, false, false, false, nil) if err != nil { log.Println(err) return } go func() { for value := range msg { fmt.Println(string(value.Body)) value.Ack(false) } }()
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)