- 优点:时效性强,可以立即得到结果
- 缺点:
- 耦合度高,每次加入新需求都要该原来的代码
- 性能和吞吐能力下降,调用者需要等待提供者响应后才能继续下一步 *** 作
- 有额外资源消耗,调用者在等待服务响应过程中,不能释放请求占用的资源
- 有级联失效问题,如果服务提供者出现问题,所有调用方都会跟着出现问题
优点:
- 耦合度低
- 吞吐量提升
- 故障隔离
- 流量削峰:当有大量请求时,可以先放在broker里,服务器根据自己的处理速度再去broker里处理请求,以此来缓解服务器压力
缺点:
- 依赖于Broker的可靠性、安全性、吞吐能力,所以必须保证broker足够可靠
- 架构复杂,业务没有明显的业务流程线,不好追踪管理
MQ(MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是上述中事件驱动架构中的Broker,下面通过黑马的截图来看看几种常见的MQ区别
RabbitMQ学习RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网:https://www.rabbitmq.com/,结构如下
基本概念
- channel: *** 作MQ的工具
- exchange:路由消息到队列中
- queue:缓存消息的队列
- virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组
安装
这次安装是在以前安装好的docker容器上进行的安装,步骤如下
- 下载RabbitMQ
docker pull rabbitmq:3-management #冒号后面是版本号
阿萨
-
下载好后运行容器
docker run -e RABBITMQ_DEFAULT_USER=itcast #用户名 -e RABBITMQ_DEFAULT_PASS=123321 #密码 --name mq #容器名字 --hostname mq1 #主机名 -p 15672:15672 #rabbitmq的管理平台端口,可通过该端口登录管理平台 -p 5672:5672 #收发消息的端口 -d #后台运行 rabbitmq:3-management #镜像名称
完成后就安装好了
可以通过输入IP地址和端口在浏览器访问
常见的消息模型
基本消息队列,下图是官网解释
工作消息队列概念
以上两种都是通过队列来发送,没有exchange交换机的概念,并且消息被消费后立即销毁
以下三种都属于发布订阅模式(允许将同一个消息发送给多个消费者),根据交换机类型不同又分为三类,分别是
- 广播模式(Fanout),一次向多个消费者发送消息,x指的是exchange交换机
-
路由模式(Direct),选择性接收消息
- 主题模式(Topic),基于模式(主题)接收消息
基本消息队列消息发送流程
- 创建connection
- 创建channel
- 利用channel声明队列
- 利用channel向队列发送消息
基本消息队列消息接收流程
- 创建connection
- 创建channel
- 利用channel声明队列
- 定义consumer的消费行为handleDelivery()
- 利用channel将消费者与队列绑定
AMQP:是用于在应用程序或之间传递业务消息的开放标准,该协议与语言和平台无关,更符合微服务中独立性的要求
SpringAMQP:是基于AMQP协议的一套api规范,提供了模板来收发消息。包括了两部分,其中spring-amqp是基础抽象,spring-rabbit是底层默认实现的,官网地址Spring AMQP
通俗来讲就是通过官方原生的方法来创建消息队列太麻烦了,spring现在通过封装底层实现来帮我们简化了创建流程和代码难度
下面通过SpringAMQP创建做一个最简单的基本消息队列
- 创建一个父工程并引入相关依赖
org.springframework.boot spring-boot-starter-amqporg.springframework.boot spring-boot-starter-test -
创建推送者Publisher子项目,并且配置yml文件
logging: pattern: dateformat: MM-dd HH:mm:ss:SSS spring: rabbitmq: host: 192.168.65.129 # rabbitMQ的ip地址 port: 5672 # rabbitmq收发消息的端口 username: itcast # rabbitmq用户名 password: 123321 # rabbitmq密码 virtual-host: / #虚拟主机名字
-
写一个单元测试
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; //引入rabbitmq的协议标准对象 @Test public void testSendMessage2SimpleQueue() { String queueName = "simple.queue"; //队列名字 String message = "hello, spring amqp!"; //队列消息 rabbitTemplate.convertAndSend(queueName, message); } }
这里注意,我指定的队列名字叫做"simple.queue",但是在rabbitmq里面没有这个队列,需要我手动去创建一个,如下
创建成功后,启动单元测试,点击队列名字,看一下效果
以上就是发送消息到队列的过程,但并没有接收消息,现在写如何接收队列里的消息
- 创建Consumer消费者springboot子工程
- 引入依赖(父工程引过了所以可以不用引)
- 配置yml文件
logging: pattern: dateformat: MM-dd HH:mm:ss:SSS spring: rabbitmq: host: 192.168.65.129 # rabbitMQ的ip地址 port: 5672 # 端口 username: itcast password: 123321 virtual-host: /
-
创建一个监听类,这里取名listener,
@Component //注入为spring的一个bean,好让spring监控到 public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") //这里是指定我们监听的是哪一个消息队列 public void listenSimpleQueue(String msg){ System.out.println("消费者接收到的消息是:"+msg); } }
完成后启动application启动类,然后就可以接收消息了,如下
注意:消费完后队列里的消息会被销毁,所以再去rabbitmq的队列里就看不见已经消费过的消息了
下面通过SpringAMQP创建做一个工作队列(work模型),就是将多个消费者绑定到同一队列,同一条消息只会被一个消费者处理
任务:在一秒内由p端发送50条数据,C1处理速度是40条每秒,C2是10条每秒,让两个消费者协同处理
- 在发送端写好发送代码
@Test public void WorkQueue() throws InterruptedException { String queueName="simple.queue"; String message="工作队列"; for (int i = 0; i <50 ; i++) { rabbitTemplate.convertAndSend(queueName,message+i); Thread.sleep(20);//这里表示50条数据在一秒钟发完 }
-
在消费者端写好两个消费者代码
@RabbitListener(queues = "simple.queue") public void listenWorkQueue1(String msg) throws InterruptedException { System.out.println("消费者1接收到的消息是:"+msg+LocalTime.now()); Thread.sleep(25);//表示消费者1每秒处理40个消息 } @RabbitListener(queues = "simple.queue") public void listenWorkQueue2(String msg) throws InterruptedException { System.err.println("消费者2接收到的消息是:"+msg+LocalTime.now()); Thread.sleep(100); //表示消费者2每秒处理10个消息 }
这里要注意的是,rabbitmq默认才是消息预取的方式,就是两个消费者都先把所有消息从队列里拿过来再处理,如果有50个消息,每个消费者就先拿25个,到手后再一起处理,如果两个消费者的消费能力不一样,这显然是有弊端的,所以需要进行如下配置
-
在yml文件做配置
logging: pattern: dateformat: MM-dd HH:mm:ss:SSS spring: rabbitmq: host: 192.168.65.129 # rabbitMQ的ip地址 port: 5672 # 端口 username: itcast password: 123321 virtual-host: / listener: simple: prefetch: 1 #这是新加的配置,表示每个消费者最多先预取1个消息,消费完后再取下一个,保证了每个消费者都根据自己的消费速度来处理消息
-
然后分别启动消息发送者和消费者,查看情况,如果电脑处理器正常一般就是消费者一接收了40条消息,消费者2接受了10条消息,耗时在一秒内
首先是广播模式案列实现,Fanout exchange会将收到的消息路由到每一个跟其绑定的queue
步骤如下:
- 在消息发送端写发送代码,以前都是直接发给队列,现在是发送给交换机,所以代码如下
@Test public void testSendFanoutExchange() { // 交换机名称 String exchangeName = "exchange01"; // 消息 String message = "hello, every one!"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "", message); }
-
在消费者端新建一个配置类,用于建立交换机和队列名称,并将交换机和队列绑定,代码如下
@Configuration//表示该类为配置类 public class FanoutConfig { // 交换机有多种类型,这里是广播模式,所以声明FanoutExchange交换机名字叫做"exchange01" @Bean //声明为一个bean交给spring管理 public FanoutExchange fanoutExchange(){ return new FanoutExchange("exchange01"); } // 声明队列名字叫做fanout.queue1 @Bean public Queue fanoutQueue1(){ return new Queue("fanout.queue1"); } // 绑定队列1到交换机 @Bean public Binding fanoutBinding(Queue fanoutQueue1,FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);//表示将fanoutQueue1队列绑定到fanoutExchange交换机 } // fanout.queue2 @Bean public Queue fanoutQueue2(){ return new Queue("fanout.queue2"); } // 绑定队列2到交换机 @Bean public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){ return BindingBuilder .bind(fanoutQueue2) .to(fanoutExchange); } }
-
完成消费者代码的编写
@RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg) { System.out.println("消费者1接收到fanout.queue1的消息是:"+"{"+msg+"}"); } @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg) { System.out.println("消费者2接收到fanout.queue2的消息是:"+"{"+msg+"}"); }
完成后重启服务,效果如下
如此便实现了一个消息被多个消费者消费的目的
路由模式(routes)
前面介绍的广播模式,可以将一个消息发送给多个与路由器绑定的队列;而现在介绍的路由模式,是把消息发送给与路由器绑定了相同key值的队列,实现了消息的指定发送,即选择性接收消息
发布者发送消息时,会指定一个RoutingKey,而队列则会绑定一个bindingKey,当这两个参数的值相同时,路由器才会将消息路由到队列中去,现在做一个案列,来实现消息的指定发送
- 这次先写发送端代码
@Test public void testSendDirectExchange() { // 交换机名称 String exchangeName = "direct exchange"; // 消息 String message = "发给所有队列"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "common" , message); }
-
写接收端代码,这次不通过bean的方式来写,直接在注解上配置队列名字和路由器名字及类型
//路由模式 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), //声明队列的名字 exchange = @Exchange(name = "direct exchange", type = ExchangeTypes.DIRECT), //声明路由器的名字和路由器的类型,这里是direct类型 key = {"common", "队列A的key"} //声明路由器的key值,这里声明一个共有的common和一个特有的”队列A的key“ )) public void listenDirectQueue1(String msg){ System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value= @Queue(name = "direct.queue2"), exchange = @Exchange(name = "direct exchange",type = ExchangeTypes.DIRECT), key = {"common","队列B的key"} )) public void listenDirectQueue2(String msg){ System.out.println("消费者接收到direct.queue2的消息"+"【" + msg + "】"); }
注意看接收端的代码,有一个key值是common,即是共有的key值,我们在发送端也写有common这个值,启动服务看效果
可以发现通过共有的key值,路由器把消息分别发送给了两个队列,由两个消费者端接收到了一样的消息,这和广播模式很像
现在修改routingKey值,改为某个队列特有的key,将key值改为"队列A的key",看效果
现在就只有direct.queue1队列才能收到消息了,另外一个队列因为key值不同所有没能收到消息,由此实现了路由模式的选择性接收消息的效果
话题模式(Topic)
TopicExchange和DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以.分割
并且在绑定时可以以#或*作为通配符
#:代指一个或多个单词
*:代指一个单词
话题模式就是要同一个话题的消息才能被接收,比如有中国天气和外国天气,中国军事和外国军事,这时如果设置routingKey为中国.#,则发送中国天气和中国军事这两个板块。如果设置为#.军事,则发送中国军事和外国军事这两个板块,下面举例
- 先写发送端
@Test public void testSendTopicExchange() { // 交换机名称 String exchangeName = "Topic exchange"; // 消息 String message = "这是中国军事频道"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "china.junshi" , message); } @Test public void testSendTopicExchange2() { // 交换机名称 String exchangeName = "Topic exchange"; // 消息 String message = "这是外国军事频道"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "waiguo.junshi" , message); } }
-
写接收端代码,和路由模式差不多,就是bandingKey要改一下
@RabbitListener(bindings = @QueueBinding( value = @Queue(name= "topce.queue1"), exchange = @Exchange(name= "Topic exchange" ,type = ExchangeTypes.TOPIC), key = "china.#" ) ) public void listenTopicQueue1(String msg){ System.out.println("只接收到与china相关的消息,不管是什么内容"+"【" + msg + "】"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name= "topce.queue2"), exchange = @Exchange(name= "Topic exchange" ,type = ExchangeTypes.TOPIC), key = "#.junshi" ) ) public void listenTopicQueue2(String msg){ System.out.println("只接收到junshi相关的消息,不管是哪个国家的"+"【" + msg + "】"); }
先写启动中国军事频道的测试类,会发现接收端所有中国的话题会报道中国军事,军事话题会报道军事
再把发送端代码的中国军事改为天气
@Test public void testSendTopicExchange() { // 交换机名称 String exchangeName = "Topic exchange"; // 消息 String message = "今天中国气温适中"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "china.tianqi" , message); }
效果如下
在springAMQP的发送方法中,接收消息的类型是Object,也就是所我们可以发送任意对象的消息,SpringAMQP会自动序列化字节后发送
下面通过创建对象的方式,将对象从发送端传递到消费端,看看如何实现
- 在夫父工程引入相关依赖
com.fasterxml.jackson.core jackson-databind -
在消费者端配置类声明一个队列
@Bean public Queue objectQueue(){ return new Queue("object.queue"); }
-
在消费者端写好接收代码
@RabbitListener(queues = "object.queue") public void listenObjectQueue2(List msg){ System.out.println("接收到object.queue的消息:" + msg);}
-
在发送端写发送代码,以对象形势发送,这里用list对象
@Test public void testSendObject() { List list =new ArrayList(); list.add("小鱼"); list.add("大鱼"); // 发送消息 rabbitTemplate.convertAndSend( "object.queue",list); }
-
分别在发送端和接收端的启动类写转换工具的方法
@Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); }
然后启动服务,效果如下
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)