Go语言使用RabbitMQ

Go语言使用RabbitMQ,第1张

基本概念

什么是消息队列

消息队列是一种应用(进程)间的通信方式。

生产者只需把消息发布到MQ,消费者只需重MQ中取出,可靠传递由消息队列中的消息系统来确保。

消息队列有什么用

消息队列是一种异步协作机制,最根本的用处在于将一些不需要即时生效的 *** 作拆分出来异步执行,从而达到可靠传递、流量削峰等目的。

比如如果有一个业务需要发送短信,可以在主流程完成之后发送消息到MQ后,让主流程完结。而由另外的线程拉取MQ的消息,完成发送短信的 *** 作。

常用的消息队列

常用的MQ大概有ActiveMQ、RabbitMQ、RocketMQ、Kafka

ActiveMQ,基于Java

优点:对Java的JMS支持最好;多线程并发;

缺点:历史悠久,版本更新慢。现在慢慢用的少了;

RabbitMQ,基于Erlang

优点:生态丰富,是现在主流的MQ;支持多种客户端、支持AJAX;

缺点:对想深入源码的Java选手不太友好;

RocketMQ,基于Java

优点:为海量数据打造;主张拉模式;天然集群、HA、负载均衡;

缺点:生态较小

Kafka,基于Scala

优点:分布式高可拓展;高性能;容错强

缺点:消息重复;乱序;维护成本高

什么是RabbitMQ

消息中间件

erlang:一种并发函数式语言

AMQP:Advanced Message Queuing Protocol,高级消息队列协议。由Exchange、Queue和Bind组成

RabbitMQ是一个erlang开发的AMQP实现

生产者将消息发送到Exchange上,通过Exchange从而Binding到Queues上。

Exchange有三种具体类型:

direct:如果消息中的RoutingKey和Binding中的BindingKey一致就转发fanout:消息被分发到所有队列中topic:将RoutingKey和队列的模式进行匹配 应用场景

异步

可以理解为将遇到非必须的业务时,立即响应客户端,不关系业务何时完成

比如在用户注册时,有将信息写入数据库和发送注册成功邮件两项业务。

数据库写入完成即标志着用户注册成功,此时如果继续处理发送邮件的业务,会给客户端带来不必要的等待时间。引入消息队列后,在队列中写入完成注册的消息后,即可完成整个注册流程。至于邮件,可以等到邮件业务从消息队列中取出消息再发送。

把不紧急的业务从主线中剥离出来,主线不必考虑不紧急的业务何时完成的时候,可以考虑使用消息队列实现异步。

解耦

考虑两个系统间存在消息传递,一个系统的故障会影响到整个业务的正常运转。可以用消息队列来保证消息可靠传递

比如一个订单系统和一个库存系统,完成订单之后,需要进行库存调度。考虑到如果库存系统故障,会引起已完成的订单消息的丢失,而做很多异常处理会使业务变得臃肿。这个时候可考虑引入消息队列,使用消息队列保证可靠传输,从而减少业务逻辑。

削峰

考虑短时间的大量请求,可能会带来内存溢出、大面积连接超时等情况,使得服务器崩溃。引入消息队列后,可以控制请求到业务处理系统的流量,从而防止崩溃现象的出现。

比如秒杀场景。大量请求同时涌入,服务器不能分配足够的资源响应,或者带宽不足,导致宕机。可以引入消息队列来限流,MQ通过限制同一时间的出口消息,使得流量在服务器能够承受的范围之内。等待一部分请求处理完成之后,再向业务处理系统导入新的消息。

Go语言使用RabbitMQ

docker安装RabbitMQ

sudo docker pull rabbitmq
sudo docker images
sudo docker run --name rabbitmq -d -p 5672:5672
sudo docker ps

go安装rabbitmq客户端

go get github.com/streadway/amqp

HelloRabbitMQ

send.go

func main() {
	conn, _ := amqp.Dial("amqp://guest:guest@localhost:5672/")
	defer conn.Close()
	ch, _ := conn.Channel()
	defer ch.Close()
	q, _ := ch.QueueDeclare(
		"hello", // name
		false,   // durable
		false,   // delete when unused
		false,   // exclusive
		false,   // no-wait
		nil,     // arguments
	)
	body := "Hello World!"
	_ = ch.Publish(
		"",     // exchange
		q.Name, // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
}

recv.go

func main() {
	conn, _ := amqp.Dial("amqp://guest:guest@localhost:5672/")
	defer conn.Close()
	ch, _ := conn.Channel()
	defer ch.Close()
	q, _ := ch.QueueDeclare(
		"hello", // name
		false,   // durable
		false,   // delete when unused
		false,   // exclusive
		false,   // no-wait
		nil,     // arguments
	)
	msgs, _ := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	forever := make(chan bool)
	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
		}
	}()
	<-forever
}
Reference

几种常见的MQ总结对比

消息队列之RabbitMQ

服务为什么会崩溃

17 | 消息队列:秒杀时如何处理每秒上万次的下单请求?

docker安装RabbitMq

RabbitMQ Go语言客户端教程1——HelloWorld

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/990226.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存