RabbitMQ *** 作(go)

RabbitMQ *** 作(go),第1张

go *** 作rabbitmq rabbitmq 安装和部署

本文仅介绍通过docker安装和部署

docker 拉取rabbitmq镜像rabbitmq:management
docker pull rabbitmq:management
检查镜像是否拉取成功
docker images

REPOSITORY               TAG          IMAGE ID       CREATED        SIZE
rabbitmq                 management   6c3c2a225947   7 days ago     253MB
wurstmeister/kafka       latest       72bcc3eef08e   12 days ago    508MB
mysql                    5.7          938b57d64674   2 months ago   448MB
redis                    latest       7faaec683238   2 months ago   113MB
wurstmeister/zookeeper   latest       3f43f72cb283   2 years ago    510MB
可以看到拉取的rabbitmq镜像
创建和启动rabbitmq容器命令
docker run -d --hostname rabbit-host --name my-rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672  rabbitmq:management
参数说明
-d: 指定容器运行于前台还是后台,默认为false
--hostname: 容器主机名
--name: 容器名字,可以通过容器名字管理容器,links特性需要使用名字
-e: 指定环境变量,容器中可以使用该环境变量
RABBITMQ_DEFAULT_USER: rabbitmq用户名
RABBITMQ_DEFAULT_PASS: rabbitmq密码
-p: 容器所暴露的端口  
端口说明: 
15672: 管理页面端口
5672: 生产者消费者端口
25672: 集群端口
查看容器是否启动成功
docker ps

CONTAINER ID   IMAGE                 COMMAND                  CREATED         STATUS          PORTS                                                                                                                        NAMES
d99adea2cca6   rabbitmq:management   "docker-entrypoint.s…"   4 seconds ago   Up 3 seconds    4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 0.0.0.0:15672->15672/tcp, 15691-15692/tcp, 0.0.0.0:25672->25672/tcp   my-rabbitmq

可以看到容器启动成功
通过15672端口打开后端管理界面http://127.0.0.1:15672/
使用创建容器时设置的用户名和密码即可登录到管理后台

至此完成了rabbitmq的安装和部署

go *** 作rabbitmq 主要接口说明 创建连接
// Dial accepts a string in the AMQP URI format and returns a new Connection
// over TCP using PlainAuth.  Defaults to a server heartbeat interval of 10
// seconds and sets the handshake deadline to 30 seconds. After handshake,
// deadlines are cleared.
//
// Dial uses the zero value of tls.Config when it encounters an amqps://
// scheme.  It is equivalent to calling DialTLS(amqp, nil).
func Dial(url string) (*Connection, error)

参数说明:
url string: rabbitmq 连接url

创建管道
/*
Channel opens a unique, concurrent server channel to process the bulk of AMQP
messages.  Any error from methods on this receiver will render the receiver
invalid and a new Channel should be opened.
*/
func (c *Connection) Channel() (*Channel, error) 
交换器类型

rabbitmq 主要词汇介绍

Producer: 生产者,产生消息。
Connect: 连接,生产者与rabbitmq server之间建立的TCP连接。
Channel: 信道,一条连接可以有多条信道,不同信道之间互不干扰(多线程)。
body: 要传递的消息。
exchange: 交换器,负责把消息转发到对应的队列。
exchangeName: 交换器名称,每个交换器对应一个名称,发送消息时会附带交换器名称,根据交换器名称选择对应的交换器。
queue: 队列,用于缓存消息。
BandingKey: 绑定键,一个队列可以有一个到多个绑定键,通过绑定 *** 作可以绑定交换器和队列,交换器会根据绑定键的名称找到对应的队列
RotingKey: 路由键,发送消息时,需要附带一条路由键,交换器会对路由键和绑定键进行匹配,如果匹配成功,则消息会转发到绑定键对应的队列中。
Consumer: 消费者,处理消息。

rabbitmq有四种交换器类型: direct,fanout,topic,headers
headers 匹配 AMQP 消息的 header 而不是路由键,headers和direct完全一致,但是headers性能较差,目前几乎没有使用,所以仅介绍其余三种交换器。
direct
消息中的路由键(routing key)如果和Binding中的binding key一致,交换器将消息发送到对应的队列中。路由键和队列名完全匹配,direct是完全匹配,单播模式。
fanout
每个发送到fanout交换器的消息都会分发到所有绑定的队列中。fanout交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会转发到与该路由器所绑定的队列上。fanout消息转发是最快的。
topic
topic交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用".“隔开。它同样也会识别两个通配符,符号”#“和符号”*"。#匹配0个或多个单词,*匹配不多不少一个单词。

创建交换器
func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error 

参数说明

name: 交换器的名称
kind: 交换器类型,direct,fanout,topic
durable: 是否持久化,true(持久化)。把交换器的配置存到磁盘,rabbitmq重启后,会自动加载交换器。
autoDelete: 是否自动删除,true表示是。至少有一条绑定才可以触发自动删除,当所有绑定都交换器解绑后,会自动删除交换器。
internal: 是否为内部, true 表示是。客户端无法直接发送消息到内部交换器,只有交换器可以发送消息到内部交换器。
noWait: 是否非阻塞,true 表示是。阻塞:创建交换器的请求发送后,等待服务器返回消息。非阻塞:不等待服务器返回消息。
args: 填写nil即可。
创建队列
func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)

参数说明

name: 队列名称
durable: 是否持久化,true(持久化)。将队列存到磁盘,rabbitmq重启后,不会丢失队列信息。(宕机时未存盘,还是会丢失数据;持久化会影响性能)
autoDelete: 是否自动删除,true表示是。至少有一个消费者连接时才会触发,所有消费者断开连接时,队列会自动删除。
exclusive: 是否设置排他, true 表示是。设置排他。
noWait: 是否非阻塞,true 表示是。阻塞:创建交换器的请求发送后,等待服务器返回消息。非阻塞:不等待服务器返回消息。
args: 填写nil即可。
队列绑定
func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table) error

参数说明

name: 队列名称
key: 要绑定的键,BindingKey
exchange: 交换器名字。
noWait: 是否非阻塞,true 表示是。阻塞:创建交换器的请求发送后,等待服务器返回消息。非阻塞:不等待服务器返回消息。
args: 填写nil即可。
交换器绑定
func (ch *Channel) ExchangeBind(destination, key, source string, noWait bool, args Table) error 

参数说明

源交换器根据路由键和绑定键将消息转发到目的交换器。

destination: 目的交换器,通常是内部交换器。
key: 绑定的键
source: 源交换器。
noWait: 是否非阻塞,true 表示是。阻塞:创建交换器的请求发送后,等待服务器返回消息。非阻塞:不等待服务器返回消息。
args: 填写nil即可。
发送消息
func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error

参数说明

exchange: 消息要发送到的交换器名称
key: 路由键(RoutingKey)
mandatory: true当消息无法通过交换器匹配到队列时,会通过basic.return 告知生产者;false 无法匹配时,舍弃消息。(不建议使用,较复杂)
immediate: true 当消息到达queue后,队列上无消费者时会通过basic.return 告知生产者;false 消息一直缓存在队列中。
msg: 要发送的消息。
接收消息–推
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)

参数说明

rabbitmq服务器将消息推送给消费者

queue: 队列名字
consumer: 消费者标签,用于区分不同的消费者
autoAck: 是否自动回复ACK,true表示是。建议实用false手动回复(可控性强)。
exclusive: 设置排他,当前队列只给一个消费者使用
noLocal: true 生产者和消费者不能是同一个连接(connect)
noWait: 是否非阻塞,true 表示是。阻塞:创建交换器的请求发送后,等待服务器返回消息。非阻塞:不等待服务器返回消息。
args: 填写nil即可。
接收消息–拉
func (ch *Channel) Get(queue string, autoAck bool) (msg Delivery, ok bool, err error)

参数说明

queue: 队列名称
autoAck: 是否设置自动回复
手动回复消息
func (ch *Channel) Ack(tag uint64, multiple bool) error 

func (ch *Channel) Reject(tag uint64, requeue bool) error 

参数说明

multiple: true 回复当前信道内所有未回复的ack,false 回复当前条目的消息
requeue: true 将消息重新加回队列; false丢掉消息
关闭连接
//关闭连接
func (c *Connection) Close() error
//关闭管道
func (ch *Channel) Close() error
amqp基本使用案例

rabbitmq.go

package RabbitMQ

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

//初始化参数
const(
	User string = "admin"
	Pass string = "admin"
	Host string = "127.0.0.1"
	Port string = "5672"
)

func Test(){
	fmt.Println("Test RabbitMQ")

}
//连接rabbitmq
func Connect() (*amqp.Connection,*amqp.Channel,error){
	//构造链接URL
	rabUrl := fmt.Sprintf("amqp://%s:%s@%s:%s%s",User, Pass, Host, Port, "/")
	fmt.Printf("url:%s\n",rabUrl)
	//创建链接
	mqConn, err := amqp.Dial(rabUrl)
	failOnError(err,"创建链接失败!")
	//defer mqConn.Close()
	if err != nil{
		fmt.Printf("RabbitMQ 打开链接失败:%s\n",err)
		return nil, nil, err
	}
	//打开一个通道
	mqChan, _err := mqConn.Channel()
	failOnError(_err,"打开管道失败!")
	//defer mqChan.Close()
	if _err != nil{
		fmt.Printf("RabbitMQ 打开管道失败:%s\n",_err)
		return nil, nil,  _err
	}
	//mqChan.Ack()
	return mqConn,mqChan,nil
}
//关闭RabbitMQ 连接
func Close(mqConn *amqp.Connection,mqChan *amqp.Channel) error {
	if mqConn != nil{
		err := mqConn.Close()
		return err
	}
	if mqChan != nil{
		err := mqChan.Close()
		return err
	}
	return nil
}
//推送消息
//qName 队列名称
//mqChan 信道指针
//body 要发送的消息
func Publish(mqChan *amqp.Channel,qName string, body string) error{
	//创建交换器 fanout
	err := mqChan.ExchangeDeclare("logs", "fanout", true, false, false, false, nil)
	failOnError(err,"failed declare a Exchange")
	//创建队列
	q, err := mqChan.QueueDeclare(qName,false,false,false,false,nil)
	failOnError(err,"failed declare a queue")
	//生产者推送消息
	err = mqChan.Publish("logs",q.Name,false,false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body: []byte(body),
		})
	failOnError(err,"failed Publish")
	fmt.Printf("send:%s\n",body)
	return err
}
//创建消费者并打印接收到的消息
//qName 队列名称
//mqChan 信道指针
func Consume(mqChan *amqp.Channel,qName string) error{
	//创建交换器 fanout
	err := mqChan.ExchangeDeclare("logs","fanout",true, false,false, false, nil)
	failOnError(err,"failed declare a Exchange")
	//创建队列
	q, err := mqChan.QueueDeclare(qName,false,false,false,false,nil)
	failOnError(err,"failed declare a queue")
	//绑定队列
	err = mqChan.QueueBind(q.Name,"","logs",false,nil)
	failOnError(err,"Failed to bind a queue")
	//创建消费者
	msgs, err := mqChan.Consume(q.Name,"",true,false,false,false,nil)
	failOnError(err,"create Consume failed")
	res := make(chan  bool)
	go func() {
		for m := range msgs{
			fmt.Printf("[x]recv:%s\n",m.Body)
		}
	}()
	<-res
	return err
}

//错误日志打印
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

send.go

package main

import (
	"fmt"
	rab "RabbitMQ"
)

func main()  {
	fmt.Println("测试rabbitmq")
	//创建连接并创建一个信道
	mqConn,mqChan,err := rab.Connect()
	if err != nil{
		fmt.Println("connect rabbitmq fail")
		return
	}
	fmt.Println("connect success")
	defer mqConn.Close()
	defer mqChan.Close()
	rab.Publish(mqChan,"test","1234567890")//推送消息
}

recv.go

package main

import (
	"fmt"
	rab "RabbitMQ"
)

func main()  {
	fmt.Println("测试rabbitmq")
	//创建连接并创建一个信道
	mqConn,mqChan,err := rab.Connect()
	if err != nil{
		fmt.Println("connect rabbitmq fail")
		return
	}
	fmt.Println("connect success")
	defer mqConn.Close()
	defer mqChan.Close()
	rab.Consume(mqChan,"test") //消费者接收推送的消息
}

测试结果

publish 推送消息
go run send.go
url:amqp://admin:admin@127.0.0.1:5672/
connect success
send:hello world
推送完成,启动消费者接收推送信息
go run recv.go
url:amqp://admin:admin@127.0.0.1:5672/
connect success
[x]recv:hello world

在发送消息时能够在管理页面中看到相关的信息,截图如下

点击获取源代码

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/994839.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存