本文仅介绍通过docker安装和部署
docker 拉取rabbitmq镜像rabbitmq:managementdocker pull rabbitmq:management
检查镜像是否拉取成功
docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
rabbitmq management 6c3c2a225947 7 days ago 253MB
wurstmeister/kafka latest 72bcc3eef08e 12 days ago 508MB
mysql 5.7 938b57d64674 2 months ago 448MB
redis latest 7faaec683238 2 months ago 113MB
wurstmeister/zookeeper latest 3f43f72cb283 2 years ago 510MB
可以看到拉取的rabbitmq镜像
创建和启动rabbitmq容器命令
docker run -d --hostname rabbit-host --name my-rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 rabbitmq:management
参数说明
-d: 指定容器运行于前台还是后台,默认为false
--hostname: 容器主机名
--name: 容器名字,可以通过容器名字管理容器,links特性需要使用名字
-e: 指定环境变量,容器中可以使用该环境变量
RABBITMQ_DEFAULT_USER: rabbitmq用户名
RABBITMQ_DEFAULT_PASS: rabbitmq密码
-p: 容器所暴露的端口
端口说明:
15672: 管理页面端口
5672: 生产者消费者端口
25672: 集群端口
查看容器是否启动成功
docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
d99adea2cca6 rabbitmq:management "docker-entrypoint.s…" 4 seconds ago Up 3 seconds 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 0.0.0.0:15672->15672/tcp, 15691-15692/tcp, 0.0.0.0:25672->25672/tcp my-rabbitmq
可以看到容器启动成功
通过15672端口打开后端管理界面http://127.0.0.1:15672/
使用创建容器时设置的用户名和密码即可登录到管理后台
至此完成了rabbitmq的安装和部署
// Dial accepts a string in the AMQP URI format and returns a new Connection
// over TCP using PlainAuth. Defaults to a server heartbeat interval of 10
// seconds and sets the handshake deadline to 30 seconds. After handshake,
// deadlines are cleared.
//
// Dial uses the zero value of tls.Config when it encounters an amqps://
// scheme. It is equivalent to calling DialTLS(amqp, nil).
func Dial(url string) (*Connection, error)
参数说明:
url string: rabbitmq 连接url
/*
Channel opens a unique, concurrent server channel to process the bulk of AMQP
messages. Any error from methods on this receiver will render the receiver
invalid and a new Channel should be opened.
*/
func (c *Connection) Channel() (*Channel, error)
交换器类型
rabbitmq 主要词汇介绍
Producer: 生产者,产生消息。
Connect: 连接,生产者与rabbitmq server之间建立的TCP连接。
Channel: 信道,一条连接可以有多条信道,不同信道之间互不干扰(多线程)。
body: 要传递的消息。
exchange: 交换器,负责把消息转发到对应的队列。
exchangeName: 交换器名称,每个交换器对应一个名称,发送消息时会附带交换器名称,根据交换器名称选择对应的交换器。
queue: 队列,用于缓存消息。
BandingKey: 绑定键,一个队列可以有一个到多个绑定键,通过绑定 *** 作可以绑定交换器和队列,交换器会根据绑定键的名称找到对应的队列
RotingKey: 路由键,发送消息时,需要附带一条路由键,交换器会对路由键和绑定键进行匹配,如果匹配成功,则消息会转发到绑定键对应的队列中。
Consumer: 消费者,处理消息。
rabbitmq有四种交换器类型: direct,fanout,topic,headers
headers 匹配 AMQP 消息的 header 而不是路由键,headers和direct完全一致,但是headers性能较差,目前几乎没有使用,所以仅介绍其余三种交换器。
direct
消息中的路由键(routing key)如果和Binding中的binding key一致,交换器将消息发送到对应的队列中。路由键和队列名完全匹配,direct是完全匹配,单播模式。
fanout
每个发送到fanout交换器的消息都会分发到所有绑定的队列中。fanout交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会转发到与该路由器所绑定的队列上。fanout消息转发是最快的。
topic
topic交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用".“隔开。它同样也会识别两个通配符,符号”#“和符号”*"。#匹配0个或多个单词,*匹配不多不少一个单词。
func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error
参数说明
name: 交换器的名称
kind: 交换器类型,direct,fanout,topic
durable: 是否持久化,true(持久化)。把交换器的配置存到磁盘,rabbitmq重启后,会自动加载交换器。
autoDelete: 是否自动删除,true表示是。至少有一条绑定才可以触发自动删除,当所有绑定都交换器解绑后,会自动删除交换器。
internal: 是否为内部, true 表示是。客户端无法直接发送消息到内部交换器,只有交换器可以发送消息到内部交换器。
noWait: 是否非阻塞,true 表示是。阻塞:创建交换器的请求发送后,等待服务器返回消息。非阻塞:不等待服务器返回消息。
args: 填写nil即可。
创建队列
func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)
参数说明
name: 队列名称
durable: 是否持久化,true(持久化)。将队列存到磁盘,rabbitmq重启后,不会丢失队列信息。(宕机时未存盘,还是会丢失数据;持久化会影响性能)
autoDelete: 是否自动删除,true表示是。至少有一个消费者连接时才会触发,所有消费者断开连接时,队列会自动删除。
exclusive: 是否设置排他, true 表示是。设置排他。
noWait: 是否非阻塞,true 表示是。阻塞:创建交换器的请求发送后,等待服务器返回消息。非阻塞:不等待服务器返回消息。
args: 填写nil即可。
队列绑定
func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table) error
参数说明
name: 队列名称
key: 要绑定的键,BindingKey
exchange: 交换器名字。
noWait: 是否非阻塞,true 表示是。阻塞:创建交换器的请求发送后,等待服务器返回消息。非阻塞:不等待服务器返回消息。
args: 填写nil即可。
交换器绑定
func (ch *Channel) ExchangeBind(destination, key, source string, noWait bool, args Table) error
参数说明
源交换器根据路由键和绑定键将消息转发到目的交换器。
destination: 目的交换器,通常是内部交换器。
key: 绑定的键
source: 源交换器。
noWait: 是否非阻塞,true 表示是。阻塞:创建交换器的请求发送后,等待服务器返回消息。非阻塞:不等待服务器返回消息。
args: 填写nil即可。
发送消息
func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error
参数说明
exchange: 消息要发送到的交换器名称
key: 路由键(RoutingKey)
mandatory: true当消息无法通过交换器匹配到队列时,会通过basic.return 告知生产者;false 无法匹配时,舍弃消息。(不建议使用,较复杂)
immediate: true 当消息到达queue后,队列上无消费者时会通过basic.return 告知生产者;false 消息一直缓存在队列中。
msg: 要发送的消息。
接收消息–推
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
参数说明
rabbitmq服务器将消息推送给消费者
queue: 队列名字
consumer: 消费者标签,用于区分不同的消费者
autoAck: 是否自动回复ACK,true表示是。建议实用false手动回复(可控性强)。
exclusive: 设置排他,当前队列只给一个消费者使用
noLocal: true 生产者和消费者不能是同一个连接(connect)
noWait: 是否非阻塞,true 表示是。阻塞:创建交换器的请求发送后,等待服务器返回消息。非阻塞:不等待服务器返回消息。
args: 填写nil即可。
接收消息–拉
func (ch *Channel) Get(queue string, autoAck bool) (msg Delivery, ok bool, err error)
参数说明
queue: 队列名称
autoAck: 是否设置自动回复
手动回复消息
func (ch *Channel) Ack(tag uint64, multiple bool) error
func (ch *Channel) Reject(tag uint64, requeue bool) error
参数说明
multiple: true 回复当前信道内所有未回复的ack,false 回复当前条目的消息
requeue: true 将消息重新加回队列; false丢掉消息
关闭连接
//关闭连接
func (c *Connection) Close() error
//关闭管道
func (ch *Channel) Close() error
amqp基本使用案例
rabbitmq.go
package RabbitMQ
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
//初始化参数
const(
User string = "admin"
Pass string = "admin"
Host string = "127.0.0.1"
Port string = "5672"
)
func Test(){
fmt.Println("Test RabbitMQ")
}
//连接rabbitmq
func Connect() (*amqp.Connection,*amqp.Channel,error){
//构造链接URL
rabUrl := fmt.Sprintf("amqp://%s:%s@%s:%s%s",User, Pass, Host, Port, "/")
fmt.Printf("url:%s\n",rabUrl)
//创建链接
mqConn, err := amqp.Dial(rabUrl)
failOnError(err,"创建链接失败!")
//defer mqConn.Close()
if err != nil{
fmt.Printf("RabbitMQ 打开链接失败:%s\n",err)
return nil, nil, err
}
//打开一个通道
mqChan, _err := mqConn.Channel()
failOnError(_err,"打开管道失败!")
//defer mqChan.Close()
if _err != nil{
fmt.Printf("RabbitMQ 打开管道失败:%s\n",_err)
return nil, nil, _err
}
//mqChan.Ack()
return mqConn,mqChan,nil
}
//关闭RabbitMQ 连接
func Close(mqConn *amqp.Connection,mqChan *amqp.Channel) error {
if mqConn != nil{
err := mqConn.Close()
return err
}
if mqChan != nil{
err := mqChan.Close()
return err
}
return nil
}
//推送消息
//qName 队列名称
//mqChan 信道指针
//body 要发送的消息
func Publish(mqChan *amqp.Channel,qName string, body string) error{
//创建交换器 fanout
err := mqChan.ExchangeDeclare("logs", "fanout", true, false, false, false, nil)
failOnError(err,"failed declare a Exchange")
//创建队列
q, err := mqChan.QueueDeclare(qName,false,false,false,false,nil)
failOnError(err,"failed declare a queue")
//生产者推送消息
err = mqChan.Publish("logs",q.Name,false,false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err,"failed Publish")
fmt.Printf("send:%s\n",body)
return err
}
//创建消费者并打印接收到的消息
//qName 队列名称
//mqChan 信道指针
func Consume(mqChan *amqp.Channel,qName string) error{
//创建交换器 fanout
err := mqChan.ExchangeDeclare("logs","fanout",true, false,false, false, nil)
failOnError(err,"failed declare a Exchange")
//创建队列
q, err := mqChan.QueueDeclare(qName,false,false,false,false,nil)
failOnError(err,"failed declare a queue")
//绑定队列
err = mqChan.QueueBind(q.Name,"","logs",false,nil)
failOnError(err,"Failed to bind a queue")
//创建消费者
msgs, err := mqChan.Consume(q.Name,"",true,false,false,false,nil)
failOnError(err,"create Consume failed")
res := make(chan bool)
go func() {
for m := range msgs{
fmt.Printf("[x]recv:%s\n",m.Body)
}
}()
<-res
return err
}
//错误日志打印
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
send.go
package main
import (
"fmt"
rab "RabbitMQ"
)
func main() {
fmt.Println("测试rabbitmq")
//创建连接并创建一个信道
mqConn,mqChan,err := rab.Connect()
if err != nil{
fmt.Println("connect rabbitmq fail")
return
}
fmt.Println("connect success")
defer mqConn.Close()
defer mqChan.Close()
rab.Publish(mqChan,"test","1234567890")//推送消息
}
recv.go
package main
import (
"fmt"
rab "RabbitMQ"
)
func main() {
fmt.Println("测试rabbitmq")
//创建连接并创建一个信道
mqConn,mqChan,err := rab.Connect()
if err != nil{
fmt.Println("connect rabbitmq fail")
return
}
fmt.Println("connect success")
defer mqConn.Close()
defer mqChan.Close()
rab.Consume(mqChan,"test") //消费者接收推送的消息
}
测试结果
publish 推送消息
go run send.go
url:amqp://admin:admin@127.0.0.1:5672/
connect success
send:hello world
推送完成,启动消费者接收推送信息
go run recv.go
url:amqp://admin:admin@127.0.0.1:5672/
connect success
[x]recv:hello world
在发送消息时能够在管理页面中看到相关的信息,截图如下
点击获取源代码
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)