目录指引:
springcloud教程(一)_Eureka,Nacos,Feign,Gateway 前往查看springcloud教程(二)_Docker 前往查看springcloud教程(三)_RabbitMQ 前往查看springcloud教程(四)_Elasticsearch 前往查看 RabbitMQ R.1 单机部署
在centos7中使用docker来安装
(1)下载镜像 方式一:从hub.docker.com在线拉取docker pull rabbitmq:3-management方式二:本地加载
将本地镜像包 mq.tar 上传到服务器中 /tmp 目录下,进入/tmp目录使用命令加载镜像
docker load -i mq.tar
使用 docker images 查看镜像列表
(2)运行mq容器第一个-p是rabbitmq管理平台端口
第二个-p是消息通信端口
docker run -e RABBITMQ_DEFAULT_USER=peppacatt -e RABBITMQ_DEFAULT_PASS=123456 --name rabbitmq_wsw --hostname mq1 -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management
使用 docker ps 查看正在运行中的容器
访问 服务器ip:15672 进入管理平台,使用上面配置的账号和密码进入
- 发送者publisher把消息发送到exchange交换机交换机把消息路由到队列queue消费者再从队列当中获取消息,处理消息
rabbitmq中的几个概念:
channels: *** 作mq的工具exchange:路由消息到队列中queue:缓存消息virtual host(虚拟主机): 是对queue,exchange等资源的逻辑分组 R.2.2消息模型
参见 rabbitmq.com 官网中的文档
蓝色:交换机
红色:消息队列
p:发送者
c:消费者
前两种都是基于队列完整消息的发送和接收,没有涉及到交换机,不是完整的消息推送模式
发送者代码示例:
package cn.itcast.mq.helloworld; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.junit.Test; import java.io.IOException; import java.util.concurrent.TimeoutException; public class PublisherTest { @Test public void testSendMessage() throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost("192.168.5.132"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("peppacatt"); factory.setPassword("123456"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列 String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.发送消息 String message = "hello, peppacatt!"; channel.basicPublish("", queueName, null, message.getBytes()); System.out.println("发送消息成功:【" + message + "】"); // 5.关闭通道和连接 channel.close(); connection.close(); } }
消费者代码示例:
package cn.itcast.mq.helloworld; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConsumerTest { public static void main(String[] args) throws IOException, TimeoutException { // 1.建立连接 ConnectionFactory factory = new ConnectionFactory(); // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码 factory.setHost("192.168.5.132"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("peppacatt"); factory.setPassword("123456"); // 1.2.建立连接 Connection connection = factory.newConnection(); // 2.创建通道Channel Channel channel = connection.createChannel(); // 3.创建队列(此处还要创建队列的原因:生产者和消费者启动的顺序是不确定的,为了避免消费者使用队列的时候队列还不存在的情况) String queueName = "simple.queue"; channel.queueDeclare(queueName, false, false, false, null); // 4.订阅消息 channel.basicConsume(queueName, true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 5.处理消息 String message = new String(body); System.out.println("接收到消息:【" + message + "】"); } }); System.out.println("等待接收消息。。。。"); } }
消息在被消费者接收之后,rabbitmq消息队列中的消息会被立即删除
总结:
什么是SpringAMQP
步骤:
(2)publisher发送者服务org.springframework.boot spring-boot-starter-amqp
1.在publisher发送者服务中编辑yml,添加mq连接信息
spring: rabbitmq: host: 192.168.217.136 #RabbitMQ的ip地址 port: 5672 #端口 username: peppacatt #用户名 password: 123456 #密码 virtual-host: / #虚拟主机
2.在发送者服务中编写代码发送消息
注意:消息队列中的消息阅后即焚,如果此时consumer服务是开启的,将会把此时发送到队列中的消息接收,RabbitMQ管理平台中的队列将无消息显示,可先停掉consumer服务
package cn.itcast.mq.spring; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class SpringAMQPTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue() { //队列名称 String queueName = "simple.queue"; //要发送的消息 String msg = "hello SpringAMQP!!!"; rabbitTemplate.convertAndSend(queueName, msg); } }(3)consumer消费者服务
1.在consumer消费者服务中编辑yml,添加mq连接信息
spring: rabbitmq: host: 192.168.217.136 #rabbitmq的ip地址 port: 5672 #端口 username: peppacatt #用户名 password: 123456 #密码 virtual-host: / #虚拟主机
2.在consumer消费者服务中新建一个类,编写消费逻辑
package cn.itcast.mq.config; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class RabbitMQListener { //queues 指定队列名称(可指定多个也可指定一个) @RabbitListener(queues = "simple.queue") public void listenSimpleQueue(String msg){ System.out.println("消费者接收到simple.queue的消息:["+msg+"]"); } }
3.运行consumer启动类main函数,服务启动后将会持续监听上面指定的队列 simple.queue 中的消息,只要有发送者向该队列发送消息,consumer服务就会接受该消息
R.3.2 工作消息队列 (1) 引入依赖…
(2)publisher服务1.编辑yml
…
2.编写发送代码
package cn.itcast.mq.spring; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class SpringAMQPTest { @Autowired private RabbitTemplate rabbitTemplate; // @Test // public void testSimpleQueue() { // //队列名称 // String queueName = "simple.queue"; // //要发送的消息 // String msg = "hello SpringAMQP!!!"; // rabbitTemplate.convertAndSend(queueName, msg); // } @Test public void testSendMsgWorkQueue() throws InterruptedException { //队列名称 String queueName = "simple.queue"; //要发送的消息 String msg = "hello SpringAMQP!!!"; for (int i = 0; i < 50; i++) { rabbitTemplate.convertAndSend(queueName, msg+"第"+i+"条"); //不要一下发完 1秒发送50条消息 Thread.sleep(20); } } }(3)consumer服务
package cn.itcast.mq.config; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.time.LocalTime; @Component public class RabbitMQListener { //queues 指定队列名称(可指定多个也可指定一个) // @RabbitListener(queues = "simple.queue") // public void listenSimpleQueue(String msg){ // System.out.println("消费者接收到simple.queue的消息:["+msg+"]"); // } @RabbitListener(queues = "simple.queue") public void listenWorkQueue(String msg) throws InterruptedException { System.out.println("消费者0号wwwwwwwwwwwwwwwwww接收到simple.queue的消息:[" + msg + "]" + LocalTime.now()); //一秒接受50条消息 Thread.sleep(20); } @RabbitListener(queues = "simple.queue") public void listenWorkQueue1(String msg) throws InterruptedException { System.out.println("消费者1号sssssssssssssssssss接收到simple.queue的消息:[" + msg + "]" + LocalTime.now()); //一秒接受5条消息 Thread.sleep(200); } }结果和总结
- 先启动consumer服务运行publisher发送代码查看consumer输出端的信息,
发现消费者0号接受的消息序号都是偶数,消费者1号接受的消息序号都是奇数,而且接受消息的时间大于了一秒
上面我们定义的两个消费者接收消息的能力不同,照理来说应该是消费者0号接受的消息多一些才合理,结果是两个消费者接收的消息数量是一样的,这是由于RabbitMQ内部的消息预取机制造成的
消息预取:当大量消息消息到达队列时,两个消费者会提前将消息拿过去(不管自己处理消息的能力强弱),平均分配队列中的消息,导致消息的接受大于了一秒
设置了消息预取限制之后,谁的能力强谁处理的消息就越多,可在案例中打印的时间看出处理消息的时间差不多为1秒左右
案例:
1.在consumer服务中声明Exchange,Queue,Binding
package cn.itcast.mq.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutExchangeConfig { //声明FanoutExchange交换机 @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("peppacatt.fanout"); } //声明第一个队列 @Bean public Queue fanoutQueue1(){ return new Queue("fanout.queue1"); } //将第一个队列绑定到交换机 @Bean public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } //声明第二个队列 @Bean public Queue fanoutQueue2(){ return new Queue("fanout.queue2"); } //将第二个队列绑定到交换机 @Bean public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } }
运行consumer服务启动类,查看RabbitMQ管理端:
2.编写RabbitListener
package cn.itcast.mq.config; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class RabbitMQListener { //queues 指定队列名称(可指定多个也可指定一个) // @RabbitListener(queues = "simple.queue") // public void listenSimpleQueue(String msg){ // System.out.println("消费者接收到simple.queue的消息:["+msg+"]"); // } // @RabbitListener(queues = "simple.queue") // public void listenWorkQueue(String msg) throws InterruptedException { // System.out.println("消费者0号wwwwwwwwwwwwwwwwww接收到simple.queue的消息:[" + msg + "]" + LocalTime.now()); // //一秒接受50条消息 // Thread.sleep(20); // } // // @RabbitListener(queues = "simple.queue") // public void listenWorkQueue1(String msg) throws InterruptedException { // System.out.println("消费者1号sssssssssssssssssss接收到simple.queue的消息:[" + msg + "]" + LocalTime.now()); // //一秒接受5条消息 // Thread.sleep(200); // } @RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg) throws InterruptedException { System.out.println("消费者接收到fanout.queue1的消息:[" + msg + "]"); } @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg) throws InterruptedException { System.out.println("消费者接收到fanout.queue2的消息:[" + msg + "]"); } }(3)publisher服务
package cn.itcast.mq.spring; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class SpringAMQPTest { @Autowired private RabbitTemplate rabbitTemplate; // @Test // public void testSimpleQueue() { // //队列名称 // String queueName = "simple.queue"; // //要发送的消息 // String msg = "hello SpringAMQP!!!"; // rabbitTemplate.convertAndSend(queueName, msg); // } // @Test // public void testSendMsgWorkQueue() throws InterruptedException { // //队列名称 // String queueName = "simple.queue"; // //要发送的消息 // String msg = "hello SpringAMQP!!!"; // for (int i = 0; i < 50; i++) { // rabbitTemplate.convertAndSend(queueName, msg+"第"+i+"条"); // //不要一下发完 1秒发送50条消息 // Thread.sleep(20); // } // } @Test public void testSendFanoutExchange(){ //交换机名称 String exchangeName = "peppacatt.fanout"; //消息 String msg = "hello Fanout Exchange!!!"; //发送消息 rabbitTemplate.convertAndSend(exchangeName, "", msg); } }
执行测试方法
案例:
步骤:
编写RabbitListener
package cn.itcast.mq.config; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class RabbitMQListener { //queues 指定队列名称(可指定多个也可指定一个) // @RabbitListener(queues = "simple.queue") // public void listenSimpleQueue(String msg){ // System.out.println("消费者接收到simple.queue的消息:["+msg+"]"); // } // @RabbitListener(queues = "simple.queue") // public void listenWorkQueue(String msg) throws InterruptedException { // System.out.println("消费者0号wwwwwwwwwwwwwwwwww接收到simple.queue的消息:[" + msg + "]" + LocalTime.now()); // //一秒接受50条消息 // Thread.sleep(20); // } // // @RabbitListener(queues = "simple.queue") // public void listenWorkQueue1(String msg) throws InterruptedException { // System.out.println("消费者1号sssssssssssssssssss接收到simple.queue的消息:[" + msg + "]" + LocalTime.now()); // //一秒接受5条消息 // Thread.sleep(200); // } // @RabbitListener(queues = "fanout.queue1") // public void listenFanoutQueue1(String msg) throws InterruptedException { // System.out.println("消费者接收到fanout.queue1的消息:[" + msg + "]"); // } // // @RabbitListener(queues = "fanout.queue2") // public void listenFanoutQueue2(String msg) throws InterruptedException { // System.out.println("消费者接收到fanout.queue2的消息:[" + msg + "]"); // } //在@RabbitListener注解上直接声明Queue和Exchange省去了声明Bean @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1") , exchange = @Exchange(name = "peppacatt.direct", type = ExchangeTypes.DIRECT) , key = {"red", "blue"} )) public void listenerDirectQueue1(String msg) { System.out.println("消费者接收到direct.queue1的消息:[" + msg + "]"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2") , exchange = @Exchange(name = "peppacatt.direct", type = ExchangeTypes.DIRECT) , key = {"red", "yellow"} )) public void listenerDirectQueue2(String msg) { System.out.println("消费者接收到direct.queue2的消息:[" + msg + "]"); } }
启动consumer服务,查看RabbitMQ管理端
package cn.itcast.mq.spring; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class SpringAMQPTest { @Autowired private RabbitTemplate rabbitTemplate; // @Test // public void testSimpleQueue() { // //队列名称 // String queueName = "simple.queue"; // //要发送的消息 // String msg = "hello SpringAMQP!!!"; // rabbitTemplate.convertAndSend(queueName, msg); // } // @Test // public void testSendMsgWorkQueue() throws InterruptedException { // //队列名称 // String queueName = "simple.queue"; // //要发送的消息 // String msg = "hello SpringAMQP!!!"; // for (int i = 0; i < 50; i++) { // rabbitTemplate.convertAndSend(queueName, msg+"第"+i+"条"); // //不要一下发完 1秒发送50条消息 // Thread.sleep(20); // } // } // @Test // public void testSendFanoutExchange(){ // //交换机名称 // String exchangeName = "peppacatt.fanout"; // //消息 // String msg = "hello Fanout Exchange!!!"; // //发送消息 // rabbitTemplate.convertAndSend(exchangeName, "", msg); // } @Test public void testSendDirectExchange() { //交换机名称 String exchangeName = "peppacatt.direct"; //消息 String msg = "hello Direct Exchange!!!"; //发送消息 rabbitTemplate.convertAndSend(exchangeName, "blue", msg); } }
运行test
只有绑定了指定routingKey的队列才能接收到消息
案例:
package cn.itcast.mq.config; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class RabbitMQListener { //queues 指定队列名称(可指定多个也可指定一个) // @RabbitListener(queues = "simple.queue") // public void listenSimpleQueue(String msg){ // System.out.println("消费者接收到simple.queue的消息:["+msg+"]"); // } // @RabbitListener(queues = "simple.queue") // public void listenWorkQueue(String msg) throws InterruptedException { // System.out.println("消费者0号wwwwwwwwwwwwwwwwww接收到simple.queue的消息:[" + msg + "]" + LocalTime.now()); // //一秒接受50条消息 // Thread.sleep(20); // } // // @RabbitListener(queues = "simple.queue") // public void listenWorkQueue1(String msg) throws InterruptedException { // System.out.println("消费者1号sssssssssssssssssss接收到simple.queue的消息:[" + msg + "]" + LocalTime.now()); // //一秒接受5条消息 // Thread.sleep(200); // } // @RabbitListener(queues = "fanout.queue1") // public void listenFanoutQueue1(String msg) throws InterruptedException { // System.out.println("消费者接收到fanout.queue1的消息:[" + msg + "]"); // } // // @RabbitListener(queues = "fanout.queue2") // public void listenFanoutQueue2(String msg) throws InterruptedException { // System.out.println("消费者接收到fanout.queue2的消息:[" + msg + "]"); // } // //在@RabbitListener注解上直接声明Queue和Exchange省去了声明Bean // @RabbitListener(bindings = @QueueBinding( // value = @Queue(name = "direct.queue1") // , exchange = @Exchange(name = "peppacatt.direct", type = ExchangeTypes.DIRECT) // , key = {"red", "blue"} // )) // public void listenerDirectQueue1(String msg) { // System.out.println("消费者接收到direct.queue1的消息:[" + msg + "]"); // } // // @RabbitListener(bindings = @QueueBinding( // value = @Queue(name = "direct.queue2") // , exchange = @Exchange(name = "peppacatt.direct", type = ExchangeTypes.DIRECT) // , key = {"red", "yellow"} // )) // public void listenerDirectQueue2(String msg) { // System.out.println("消费者接收到direct.queue2的消息:[" + msg + "]"); // } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1") , exchange = @Exchange(name = "peppacatt.topic", type = ExchangeTypes.TOPIC) , key = "china.#" )) public void listenerTopicQueue1(String msg) { System.out.println("消费者接收到topic.queue1的消息:[" + msg + "]"); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2") , exchange = @Exchange(name = "peppacatt.topic", type = ExchangeTypes.TOPIC) , key = "#.news" )) public void listenerTopicQueue2(String msg) { System.out.println("消费者接收到topic.queue2的消息:[" + msg + "]"); } }
运行consumer服务
package cn.itcast.mq.spring; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class SpringAMQPTest { @Autowired private RabbitTemplate rabbitTemplate; // @Test // public void testSimpleQueue() { // //队列名称 // String queueName = "simple.queue"; // //要发送的消息 // String msg = "hello SpringAMQP!!!"; // rabbitTemplate.convertAndSend(queueName, msg); // } // @Test // public void testSendMsgWorkQueue() throws InterruptedException { // //队列名称 // String queueName = "simple.queue"; // //要发送的消息 // String msg = "hello SpringAMQP!!!"; // for (int i = 0; i < 50; i++) { // rabbitTemplate.convertAndSend(queueName, msg+"第"+i+"条"); // //不要一下发完 1秒发送50条消息 // Thread.sleep(20); // } // } // @Test // public void testSendFanoutExchange(){ // //交换机名称 // String exchangeName = "peppacatt.fanout"; // //消息 // String msg = "hello Fanout Exchange!!!"; // //发送消息 // rabbitTemplate.convertAndSend(exchangeName, "", msg); // } // @Test // public void testSendDirectExchange() { // //交换机名称 // String exchangeName = "peppacatt.direct"; // //消息 // String msg = "hello Direct Exchange!!!"; // //发送消息 // rabbitTemplate.convertAndSend(exchangeName, "blue", msg); // } @Test public void testSendTopicExchange() { //交换机名称 String exchangeName = "peppacatt.topic"; //消息 String msg = "hello Top Exchange!!!"; //发送消息 rabbitTemplate.convertAndSend(exchangeName, "china.news", msg); } }
执行test方法
在形参列表内按ctrl+p
package cn.itcast.mq.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutExchangeConfig { // //声明FanoutExchange交换机 // @Bean // public FanoutExchange fanoutExchange(){ // return new FanoutExchange("peppacatt.fanout"); // } // // //声明第一个队列 // @Bean // public Queue fanoutQueue1(){ // return new Queue("fanout.queue1"); // } // // //将第一个队列绑定到交换机 // @Bean // public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){ // return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); // } // // //声明第二个队列 // @Bean // public Queue fanoutQueue2(){ // return new Queue("fanout.queue2"); // } // // //将第二个队列绑定到交换机 // @Bean // public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){ // return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); // } @Bean public Queue objectQueue(){ return new Queue("object.queue"); } }
运行consumer服务
package cn.itcast.mq.spring; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.HashMap; import java.util.Map; @RunWith(SpringRunner.class) @SpringBootTest public class SpringAMQPTest { @Autowired private RabbitTemplate rabbitTemplate; // @Test // public void testSimpleQueue() { // //队列名称 // String queueName = "simple.queue"; // //要发送的消息 // String msg = "hello SpringAMQP!!!"; // rabbitTemplate.convertAndSend(queueName, msg); // } // @Test // public void testSendMsgWorkQueue() throws InterruptedException { // //队列名称 // String queueName = "simple.queue"; // //要发送的消息 // String msg = "hello SpringAMQP!!!"; // for (int i = 0; i < 50; i++) { // rabbitTemplate.convertAndSend(queueName, msg+"第"+i+"条"); // //不要一下发完 1秒发送50条消息 // Thread.sleep(20); // } // } // @Test // public void testSendFanoutExchange(){ // //交换机名称 // String exchangeName = "peppacatt.fanout"; // //消息 // String msg = "hello Fanout Exchange!!!"; // //发送消息 // rabbitTemplate.convertAndSend(exchangeName, "", msg); // } // @Test // public void testSendDirectExchange() { // //交换机名称 // String exchangeName = "peppacatt.direct"; // //消息 // String msg = "hello Direct Exchange!!!"; // //发送消息 // rabbitTemplate.convertAndSend(exchangeName, "blue", msg); // } // @Test // public void testSendTopicExchange() { // //交换机名称 // String exchangeName = "peppacatt.topic"; // //消息 // String msg = "hello Top Exchange!!!"; // //发送消息 // rabbitTemplate.convertAndSend(exchangeName, "china.news", msg); // } @Test public void testSendObjectQueue() { Mapmsg = new HashMap(); msg.put("name", "小王"); msg.put("age", 21); //发送消息 rabbitTemplate.convertAndSend("object.queue", msg); } }
执行test方法
1.在publisher服务或者父工程引入依赖
com.fasterxml.jackson.core jackson-databind
2.在publisher服务中声明Bean MessageConverter
package cn.itcast.mq.config; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MessageConverterConfig { @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
清理掉RabbitMQ中队列object.queue中之前发送的消息
重新执行test
1.在consumer服务或父类引入依赖
com.fasterxml.jackson.core jackson-databind
2.在consumer服务中配置Bean MessageConverter
…
3.编写RabbitListener
package cn.itcast.mq.config; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Map; @Component public class RabbitMQListener { //queues 指定队列名称(可指定多个也可指定一个) // @RabbitListener(queues = "simple.queue") // public void listenSimpleQueue(String msg){ // System.out.println("消费者接收到simple.queue的消息:["+msg+"]"); // } // @RabbitListener(queues = "simple.queue") // public void listenWorkQueue(String msg) throws InterruptedException { // System.out.println("消费者0号wwwwwwwwwwwwwwwwww接收到simple.queue的消息:[" + msg + "]" + LocalTime.now()); // //一秒接受50条消息 // Thread.sleep(20); // } // // @RabbitListener(queues = "simple.queue") // public void listenWorkQueue1(String msg) throws InterruptedException { // System.out.println("消费者1号sssssssssssssssssss接收到simple.queue的消息:[" + msg + "]" + LocalTime.now()); // //一秒接受5条消息 // Thread.sleep(200); // } // @RabbitListener(queues = "fanout.queue1") // public void listenFanoutQueue1(String msg) throws InterruptedException { // System.out.println("消费者接收到fanout.queue1的消息:[" + msg + "]"); // } // // @RabbitListener(queues = "fanout.queue2") // public void listenFanoutQueue2(String msg) throws InterruptedException { // System.out.println("消费者接收到fanout.queue2的消息:[" + msg + "]"); // } // //在@RabbitListener注解上直接声明Queue和Exchange省去了声明Bean // @RabbitListener(bindings = @QueueBinding( // value = @Queue(name = "direct.queue1") // , exchange = @Exchange(name = "peppacatt.direct", type = ExchangeTypes.DIRECT) // , key = {"red", "blue"} // )) // public void listenerDirectQueue1(String msg) { // System.out.println("消费者接收到direct.queue1的消息:[" + msg + "]"); // } // // @RabbitListener(bindings = @QueueBinding( // value = @Queue(name = "direct.queue2") // , exchange = @Exchange(name = "peppacatt.direct", type = ExchangeTypes.DIRECT) // , key = {"red", "yellow"} // )) // public void listenerDirectQueue2(String msg) { // System.out.println("消费者接收到direct.queue2的消息:[" + msg + "]"); // } // @RabbitListener(bindings = @QueueBinding( // value = @Queue(name = "topic.queue1") // , exchange = @Exchange(name = "peppacatt.topic", type = ExchangeTypes.TOPIC) // , key = "china.#" // )) // public void listenerTopicQueue1(String msg) { // System.out.println("消费者接收到topic.queue1的消息:[" + msg + "]"); // } // // @RabbitListener(bindings = @QueueBinding( // value = @Queue(name = "topic.queue2") // , exchange = @Exchange(name = "peppacatt.topic", type = ExchangeTypes.TOPIC) // , key = "#.news" // )) // public void listenerTopicQueue2(String msg) { // System.out.println("消费者接收到topic.queue2的消息:[" + msg + "]"); // } @RabbitListener(queues = "object.queue") public void listenObjectQueue(Mapmsg){ System.out.println("接收到object.queue的消息:["+msg+"]"); } }
运行consumer服务
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)