springcloud教程(三)

springcloud教程(三),第1张

springcloud教程(三)

目录指引:

springcloud教程(一)_Eureka,Nacos,Feign,Gateway 前往查看springcloud教程(二)_Docker 前往查看springcloud教程(三)_RabbitMQ 前往查看springcloud教程(四)_Elasticsearch 前往查看 RabbitMQ R.1 单机部署

在centos7中使用docker来安装

(1)下载镜像 方式一:从hub.docker.com在线拉取
docker pull rabbitmq:3-management
方式二:本地加载

将本地镜像包 mq.tar 上传到服务器中 /tmp 目录下,进入/tmp目录使用命令加载镜像

docker load -i mq.tar

使用 docker images 查看镜像列表

(2)运行mq容器

第一个-p是rabbitmq管理平台端口
第二个-p是消息通信端口

docker run 
 -e RABBITMQ_DEFAULT_USER=peppacatt 
 -e RABBITMQ_DEFAULT_PASS=123456 
 --name rabbitmq_wsw 
 --hostname mq1 
 -p 15672:15672 
 -p 5672:5672 
 -d 
 rabbitmq:3-management

使用 docker ps 查看正在运行中的容器
访问 服务器ip:15672 进入管理平台,使用上面配置的账号和密码进入

R.2 概述 R.2.1 结构和概念

    发送者publisher把消息发送到exchange交换机交换机把消息路由到队列queue消费者再从队列当中获取消息,处理消息

rabbitmq中的几个概念:

channels: *** 作mq的工具exchange:路由消息到队列中queue:缓存消息virtual host(虚拟主机): 是对queue,exchange等资源的逻辑分组 R.2.2消息模型

参见 rabbitmq.com 官网中的文档
蓝色:交换机
红色:消息队列
p:发送者
c:消费者

前两种都是基于队列完整消息的发送和接收,没有涉及到交换机,不是完整的消息推送模式

(1)BasicQueue基本消息队列

发送者代码示例:

package cn.itcast.mq.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.5.132");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("peppacatt");
        factory.setPassword("123456");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, peppacatt!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }
}

消费者代码示例:

package cn.itcast.mq.helloworld;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.5.132");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("peppacatt");
        factory.setPassword("123456");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列(此处还要创建队列的原因:生产者和消费者启动的顺序是不确定的,为了避免消费者使用队列的时候队列还不存在的情况)
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}

消息在被消费者接收之后,rabbitmq消息队列中的消息会被立即删除

总结:

(2)WorkQueue工作消息队列

R.3 SpringAMQP

什么是SpringAMQP

R.3.1 基本消息队列


步骤:

(1)在父工程中引入依赖
        
        
            org.springframework.boot
            spring-boot-starter-amqp
        
(2)publisher发送者服务

1.在publisher发送者服务中编辑yml,添加mq连接信息

spring:
  rabbitmq:
    host: 192.168.217.136 #RabbitMQ的ip地址
    port: 5672 #端口
    username: peppacatt #用户名
    password: 123456 #密码
    virtual-host: / #虚拟主机

2.在发送者服务中编写代码发送消息
注意:消息队列中的消息阅后即焚,如果此时consumer服务是开启的,将会把此时发送到队列中的消息接收,RabbitMQ管理平台中的队列将无消息显示,可先停掉consumer服务

package cn.itcast.mq.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;


    @Test
    public void testSimpleQueue() {
        //队列名称
        String queueName = "simple.queue";
        //要发送的消息
        String msg = "hello SpringAMQP!!!";
        rabbitTemplate.convertAndSend(queueName, msg);
    }

}

(3)consumer消费者服务

1.在consumer消费者服务中编辑yml,添加mq连接信息

spring:
  rabbitmq:
    host: 192.168.217.136 #rabbitmq的ip地址
    port: 5672 #端口
    username: peppacatt #用户名
    password: 123456 #密码
    virtual-host: / #虚拟主机

2.在consumer消费者服务中新建一个类,编写消费逻辑

package cn.itcast.mq.config;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQListener {

    //queues 指定队列名称(可指定多个也可指定一个)
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg){
        System.out.println("消费者接收到simple.queue的消息:["+msg+"]");
    }

}

3.运行consumer启动类main函数,服务启动后将会持续监听上面指定的队列 simple.queue 中的消息,只要有发送者向该队列发送消息,consumer服务就会接受该消息

R.3.2 工作消息队列

(1) 引入依赖

(2)publisher服务

1.编辑yml

2.编写发送代码

package cn.itcast.mq.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;


//    @Test
//    public void testSimpleQueue() {
//        //队列名称
//        String queueName = "simple.queue";
//        //要发送的消息
//        String msg = "hello SpringAMQP!!!";
//        rabbitTemplate.convertAndSend(queueName, msg);
//    }

    @Test
    public void testSendMsgWorkQueue() throws InterruptedException {
        //队列名称
        String queueName = "simple.queue";
        //要发送的消息
        String msg = "hello SpringAMQP!!!";
        for (int i = 0; i < 50; i++) {
            rabbitTemplate.convertAndSend(queueName, msg+"第"+i+"条");
            //不要一下发完 1秒发送50条消息
            Thread.sleep(20);
        }
    }

}

(3)consumer服务
package cn.itcast.mq.config;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.LocalTime;

@Component
public class RabbitMQListener {

    //queues 指定队列名称(可指定多个也可指定一个)
//    @RabbitListener(queues = "simple.queue")
//    public void listenSimpleQueue(String msg){
//        System.out.println("消费者接收到simple.queue的消息:["+msg+"]");
//    }

    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue(String msg) throws InterruptedException {
        System.out.println("消费者0号wwwwwwwwwwwwwwwwww接收到simple.queue的消息:[" + msg + "]" + LocalTime.now());
        //一秒接受50条消息
        Thread.sleep(20);
    }

    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1号sssssssssssssssssss接收到simple.queue的消息:[" + msg + "]" + LocalTime.now());
        //一秒接受5条消息
        Thread.sleep(200);
    }

}

结果和总结
    先启动consumer服务运行publisher发送代码查看consumer输出端的信息,

发现消费者0号接受的消息序号都是偶数,消费者1号接受的消息序号都是奇数,而且接受消息的时间大于了一秒
上面我们定义的两个消费者接收消息的能力不同,照理来说应该是消费者0号接受的消息多一些才合理,结果是两个消费者接收的消息数量是一样的,这是由于RabbitMQ内部的消息预取机制造成的
消息预取:当大量消息消息到达队列时,两个消费者会提前将消息拿过去(不管自己处理消息的能力强弱),平均分配队列中的消息,导致消息的接受大于了一秒

消息预取限制

设置了消息预取限制之后,谁的能力强谁处理的消息就越多,可在案例中打印的时间看出处理消息的时间差不多为1秒左右

R.3.3 发布订阅模型

R.3.3.1 发布订阅-FanoutExchange



案例:

(1)consumer服务

1.在consumer服务中声明Exchange,Queue,Binding

package cn.itcast.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutExchangeConfig {

    //声明FanoutExchange交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("peppacatt.fanout");
    }

    //声明第一个队列
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    //将第一个队列绑定到交换机
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    //声明第二个队列
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    //将第二个队列绑定到交换机
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }

}

运行consumer服务启动类,查看RabbitMQ管理端:


2.编写RabbitListener

package cn.itcast.mq.config;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQListener {

    //queues 指定队列名称(可指定多个也可指定一个)
//    @RabbitListener(queues = "simple.queue")
//    public void listenSimpleQueue(String msg){
//        System.out.println("消费者接收到simple.queue的消息:["+msg+"]");
//    }

//    @RabbitListener(queues = "simple.queue")
//    public void listenWorkQueue(String msg) throws InterruptedException {
//        System.out.println("消费者0号wwwwwwwwwwwwwwwwww接收到simple.queue的消息:[" + msg + "]" + LocalTime.now());
//        //一秒接受50条消息
//        Thread.sleep(20);
//    }
//
//    @RabbitListener(queues = "simple.queue")
//    public void listenWorkQueue1(String msg) throws InterruptedException {
//        System.out.println("消费者1号sssssssssssssssssss接收到simple.queue的消息:[" + msg + "]" + LocalTime.now());
//        //一秒接受5条消息
//        Thread.sleep(200);
//    }

    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg) throws InterruptedException {
        System.out.println("消费者接收到fanout.queue1的消息:[" + msg + "]");
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg) throws InterruptedException {
        System.out.println("消费者接收到fanout.queue2的消息:[" + msg + "]");
    }

}

(3)publisher服务
package cn.itcast.mq.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;


//    @Test
//    public void testSimpleQueue() {
//        //队列名称
//        String queueName = "simple.queue";
//        //要发送的消息
//        String msg = "hello SpringAMQP!!!";
//        rabbitTemplate.convertAndSend(queueName, msg);
//    }

//    @Test
//    public void testSendMsgWorkQueue() throws InterruptedException {
//        //队列名称
//        String queueName = "simple.queue";
//        //要发送的消息
//        String msg = "hello SpringAMQP!!!";
//        for (int i = 0; i < 50; i++) {
//            rabbitTemplate.convertAndSend(queueName, msg+"第"+i+"条");
//            //不要一下发完 1秒发送50条消息
//            Thread.sleep(20);
//        }
//    }

    @Test
    public void testSendFanoutExchange(){
        //交换机名称
        String exchangeName = "peppacatt.fanout";
        //消息
        String msg = "hello Fanout Exchange!!!";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName, "", msg);
    }

}

执行测试方法

R.3.3.2 发布订阅-DirectExchange


案例:

步骤:

(1)consumer服务

编写RabbitListener

package cn.itcast.mq.config;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQListener {

    //queues 指定队列名称(可指定多个也可指定一个)
//    @RabbitListener(queues = "simple.queue")
//    public void listenSimpleQueue(String msg){
//        System.out.println("消费者接收到simple.queue的消息:["+msg+"]");
//    }

//    @RabbitListener(queues = "simple.queue")
//    public void listenWorkQueue(String msg) throws InterruptedException {
//        System.out.println("消费者0号wwwwwwwwwwwwwwwwww接收到simple.queue的消息:[" + msg + "]" + LocalTime.now());
//        //一秒接受50条消息
//        Thread.sleep(20);
//    }
//
//    @RabbitListener(queues = "simple.queue")
//    public void listenWorkQueue1(String msg) throws InterruptedException {
//        System.out.println("消费者1号sssssssssssssssssss接收到simple.queue的消息:[" + msg + "]" + LocalTime.now());
//        //一秒接受5条消息
//        Thread.sleep(200);
//    }

//    @RabbitListener(queues = "fanout.queue1")
//    public void listenFanoutQueue1(String msg) throws InterruptedException {
//        System.out.println("消费者接收到fanout.queue1的消息:[" + msg + "]");
//    }
//
//    @RabbitListener(queues = "fanout.queue2")
//    public void listenFanoutQueue2(String msg) throws InterruptedException {
//        System.out.println("消费者接收到fanout.queue2的消息:[" + msg + "]");
//    }

    //在@RabbitListener注解上直接声明Queue和Exchange省去了声明Bean
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1")
            , exchange = @Exchange(name = "peppacatt.direct", type = ExchangeTypes.DIRECT)
            , key = {"red", "blue"}
    ))
    public void listenerDirectQueue1(String msg) {
        System.out.println("消费者接收到direct.queue1的消息:[" + msg + "]");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2")
            , exchange = @Exchange(name = "peppacatt.direct", type = ExchangeTypes.DIRECT)
            , key = {"red", "yellow"}
    ))
    public void listenerDirectQueue2(String msg) {
        System.out.println("消费者接收到direct.queue2的消息:[" + msg + "]");
    }

}

启动consumer服务,查看RabbitMQ管理端

(2)publisher服务
package cn.itcast.mq.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;


//    @Test
//    public void testSimpleQueue() {
//        //队列名称
//        String queueName = "simple.queue";
//        //要发送的消息
//        String msg = "hello SpringAMQP!!!";
//        rabbitTemplate.convertAndSend(queueName, msg);
//    }

//    @Test
//    public void testSendMsgWorkQueue() throws InterruptedException {
//        //队列名称
//        String queueName = "simple.queue";
//        //要发送的消息
//        String msg = "hello SpringAMQP!!!";
//        for (int i = 0; i < 50; i++) {
//            rabbitTemplate.convertAndSend(queueName, msg+"第"+i+"条");
//            //不要一下发完 1秒发送50条消息
//            Thread.sleep(20);
//        }
//    }

//    @Test
//    public void testSendFanoutExchange(){
//        //交换机名称
//        String exchangeName = "peppacatt.fanout";
//        //消息
//        String msg = "hello Fanout Exchange!!!";
//        //发送消息
//        rabbitTemplate.convertAndSend(exchangeName, "", msg);
//    }

    @Test
    public void testSendDirectExchange() {
        //交换机名称
        String exchangeName = "peppacatt.direct";
        //消息
        String msg = "hello Direct Exchange!!!";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName, "blue", msg);
    }

}

运行test
只有绑定了指定routingKey的队列才能接收到消息

R.3.3.3 发布订阅-TopicExchange


案例:

(1)consumer服务
package cn.itcast.mq.config;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQListener {

    //queues 指定队列名称(可指定多个也可指定一个)
//    @RabbitListener(queues = "simple.queue")
//    public void listenSimpleQueue(String msg){
//        System.out.println("消费者接收到simple.queue的消息:["+msg+"]");
//    }

//    @RabbitListener(queues = "simple.queue")
//    public void listenWorkQueue(String msg) throws InterruptedException {
//        System.out.println("消费者0号wwwwwwwwwwwwwwwwww接收到simple.queue的消息:[" + msg + "]" + LocalTime.now());
//        //一秒接受50条消息
//        Thread.sleep(20);
//    }
//
//    @RabbitListener(queues = "simple.queue")
//    public void listenWorkQueue1(String msg) throws InterruptedException {
//        System.out.println("消费者1号sssssssssssssssssss接收到simple.queue的消息:[" + msg + "]" + LocalTime.now());
//        //一秒接受5条消息
//        Thread.sleep(200);
//    }

//    @RabbitListener(queues = "fanout.queue1")
//    public void listenFanoutQueue1(String msg) throws InterruptedException {
//        System.out.println("消费者接收到fanout.queue1的消息:[" + msg + "]");
//    }
//
//    @RabbitListener(queues = "fanout.queue2")
//    public void listenFanoutQueue2(String msg) throws InterruptedException {
//        System.out.println("消费者接收到fanout.queue2的消息:[" + msg + "]");
//    }

//    //在@RabbitListener注解上直接声明Queue和Exchange省去了声明Bean
//    @RabbitListener(bindings = @QueueBinding(
//            value = @Queue(name = "direct.queue1")
//            , exchange = @Exchange(name = "peppacatt.direct", type = ExchangeTypes.DIRECT)
//            , key = {"red", "blue"}
//    ))
//    public void listenerDirectQueue1(String msg) {
//        System.out.println("消费者接收到direct.queue1的消息:[" + msg + "]");
//    }
//
//    @RabbitListener(bindings = @QueueBinding(
//            value = @Queue(name = "direct.queue2")
//            , exchange = @Exchange(name = "peppacatt.direct", type = ExchangeTypes.DIRECT)
//            , key = {"red", "yellow"}
//    ))
//    public void listenerDirectQueue2(String msg) {
//        System.out.println("消费者接收到direct.queue2的消息:[" + msg + "]");
//    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1")
            , exchange = @Exchange(name = "peppacatt.topic", type = ExchangeTypes.TOPIC)
            , key = "china.#"
    ))
    public void listenerTopicQueue1(String msg) {
        System.out.println("消费者接收到topic.queue1的消息:[" + msg + "]");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2")
            , exchange = @Exchange(name = "peppacatt.topic", type = ExchangeTypes.TOPIC)
            , key = "#.news"
    ))
    public void listenerTopicQueue2(String msg) {
        System.out.println("消费者接收到topic.queue2的消息:[" + msg + "]");
    }



}

运行consumer服务

(2)publisher服务
package cn.itcast.mq.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;


//    @Test
//    public void testSimpleQueue() {
//        //队列名称
//        String queueName = "simple.queue";
//        //要发送的消息
//        String msg = "hello SpringAMQP!!!";
//        rabbitTemplate.convertAndSend(queueName, msg);
//    }

//    @Test
//    public void testSendMsgWorkQueue() throws InterruptedException {
//        //队列名称
//        String queueName = "simple.queue";
//        //要发送的消息
//        String msg = "hello SpringAMQP!!!";
//        for (int i = 0; i < 50; i++) {
//            rabbitTemplate.convertAndSend(queueName, msg+"第"+i+"条");
//            //不要一下发完 1秒发送50条消息
//            Thread.sleep(20);
//        }
//    }

//    @Test
//    public void testSendFanoutExchange(){
//        //交换机名称
//        String exchangeName = "peppacatt.fanout";
//        //消息
//        String msg = "hello Fanout Exchange!!!";
//        //发送消息
//        rabbitTemplate.convertAndSend(exchangeName, "", msg);
//    }

//    @Test
//    public void testSendDirectExchange() {
//        //交换机名称
//        String exchangeName = "peppacatt.direct";
//        //消息
//        String msg = "hello Direct Exchange!!!";
//        //发送消息
//        rabbitTemplate.convertAndSend(exchangeName, "blue", msg);
//    }

    @Test
    public void testSendTopicExchange() {
        //交换机名称
        String exchangeName = "peppacatt.topic";
        //消息
        String msg = "hello Top Exchange!!!";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName, "china.news", msg);
    }

}

执行test方法

R.3.4 消息转换器 测试发送Object类型消息


在形参列表内按ctrl+p

(1)consumer服务
package cn.itcast.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutExchangeConfig {

//    //声明FanoutExchange交换机
//    @Bean
//    public FanoutExchange fanoutExchange(){
//        return new FanoutExchange("peppacatt.fanout");
//    }
//
//    //声明第一个队列
//    @Bean
//    public Queue fanoutQueue1(){
//        return new Queue("fanout.queue1");
//    }
//
//    //将第一个队列绑定到交换机
//    @Bean
//    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
//        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
//    }
//
//    //声明第二个队列
//    @Bean
//    public Queue fanoutQueue2(){
//        return new Queue("fanout.queue2");
//    }
//
//    //将第二个队列绑定到交换机
//    @Bean
//    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
//        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
//    }
    
    @Bean
    public Queue objectQueue(){
        return new Queue("object.queue");
    }

}

运行consumer服务

(2)publisher服务
package cn.itcast.mq.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;
import java.util.Map;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;


//    @Test
//    public void testSimpleQueue() {
//        //队列名称
//        String queueName = "simple.queue";
//        //要发送的消息
//        String msg = "hello SpringAMQP!!!";
//        rabbitTemplate.convertAndSend(queueName, msg);
//    }

//    @Test
//    public void testSendMsgWorkQueue() throws InterruptedException {
//        //队列名称
//        String queueName = "simple.queue";
//        //要发送的消息
//        String msg = "hello SpringAMQP!!!";
//        for (int i = 0; i < 50; i++) {
//            rabbitTemplate.convertAndSend(queueName, msg+"第"+i+"条");
//            //不要一下发完 1秒发送50条消息
//            Thread.sleep(20);
//        }
//    }

//    @Test
//    public void testSendFanoutExchange(){
//        //交换机名称
//        String exchangeName = "peppacatt.fanout";
//        //消息
//        String msg = "hello Fanout Exchange!!!";
//        //发送消息
//        rabbitTemplate.convertAndSend(exchangeName, "", msg);
//    }

//    @Test
//    public void testSendDirectExchange() {
//        //交换机名称
//        String exchangeName = "peppacatt.direct";
//        //消息
//        String msg = "hello Direct Exchange!!!";
//        //发送消息
//        rabbitTemplate.convertAndSend(exchangeName, "blue", msg);
//    }

//    @Test
//    public void testSendTopicExchange() {
//        //交换机名称
//        String exchangeName = "peppacatt.topic";
//        //消息
//        String msg = "hello Top Exchange!!!";
//        //发送消息
//        rabbitTemplate.convertAndSend(exchangeName, "china.news", msg);
//    }

    @Test
    public void testSendObjectQueue() {
        Map msg = new HashMap();
        msg.put("name", "小王");
        msg.put("age", 21);
        //发送消息
        rabbitTemplate.convertAndSend("object.queue", msg);
    }

}

执行test方法

消息序列化


(1)publisher服务

1.在publisher服务或者父工程引入依赖

        
        
            com.fasterxml.jackson.core
            jackson-databind
        

2.在publisher服务中声明Bean MessageConverter

package cn.itcast.mq.config;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessageConverterConfig {

    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }

}

清理掉RabbitMQ中队列object.queue中之前发送的消息

重新执行test

(2)consumer服务

1.在consumer服务或父类引入依赖

        
        
            com.fasterxml.jackson.core
            jackson-databind
        

2.在consumer服务中配置Bean MessageConverter

3.编写RabbitListener

package cn.itcast.mq.config;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class RabbitMQListener {

    //queues 指定队列名称(可指定多个也可指定一个)
//    @RabbitListener(queues = "simple.queue")
//    public void listenSimpleQueue(String msg){
//        System.out.println("消费者接收到simple.queue的消息:["+msg+"]");
//    }

//    @RabbitListener(queues = "simple.queue")
//    public void listenWorkQueue(String msg) throws InterruptedException {
//        System.out.println("消费者0号wwwwwwwwwwwwwwwwww接收到simple.queue的消息:[" + msg + "]" + LocalTime.now());
//        //一秒接受50条消息
//        Thread.sleep(20);
//    }
//
//    @RabbitListener(queues = "simple.queue")
//    public void listenWorkQueue1(String msg) throws InterruptedException {
//        System.out.println("消费者1号sssssssssssssssssss接收到simple.queue的消息:[" + msg + "]" + LocalTime.now());
//        //一秒接受5条消息
//        Thread.sleep(200);
//    }

//    @RabbitListener(queues = "fanout.queue1")
//    public void listenFanoutQueue1(String msg) throws InterruptedException {
//        System.out.println("消费者接收到fanout.queue1的消息:[" + msg + "]");
//    }
//
//    @RabbitListener(queues = "fanout.queue2")
//    public void listenFanoutQueue2(String msg) throws InterruptedException {
//        System.out.println("消费者接收到fanout.queue2的消息:[" + msg + "]");
//    }

//    //在@RabbitListener注解上直接声明Queue和Exchange省去了声明Bean
//    @RabbitListener(bindings = @QueueBinding(
//            value = @Queue(name = "direct.queue1")
//            , exchange = @Exchange(name = "peppacatt.direct", type = ExchangeTypes.DIRECT)
//            , key = {"red", "blue"}
//    ))
//    public void listenerDirectQueue1(String msg) {
//        System.out.println("消费者接收到direct.queue1的消息:[" + msg + "]");
//    }
//
//    @RabbitListener(bindings = @QueueBinding(
//            value = @Queue(name = "direct.queue2")
//            , exchange = @Exchange(name = "peppacatt.direct", type = ExchangeTypes.DIRECT)
//            , key = {"red", "yellow"}
//    ))
//    public void listenerDirectQueue2(String msg) {
//        System.out.println("消费者接收到direct.queue2的消息:[" + msg + "]");
//    }

//    @RabbitListener(bindings = @QueueBinding(
//            value = @Queue(name = "topic.queue1")
//            , exchange = @Exchange(name = "peppacatt.topic", type = ExchangeTypes.TOPIC)
//            , key = "china.#"
//    ))
//    public void listenerTopicQueue1(String msg) {
//        System.out.println("消费者接收到topic.queue1的消息:[" + msg + "]");
//    }
//
//    @RabbitListener(bindings = @QueueBinding(
//            value = @Queue(name = "topic.queue2")
//            , exchange = @Exchange(name = "peppacatt.topic", type = ExchangeTypes.TOPIC)
//            , key = "#.news"
//    ))
//    public void listenerTopicQueue2(String msg) {
//        System.out.println("消费者接收到topic.queue2的消息:[" + msg + "]");
//    }


    @RabbitListener(queues = "object.queue")
    public void listenObjectQueue(Map msg){
        System.out.println("接收到object.queue的消息:["+msg+"]");
    }


}

运行consumer服务

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5706046.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存