RabbitMQ基础

RabbitMQ基础,第1张

RabbitMQ基础 RabbitMQ基础

目录

RabbitMQ基础

一、MQ基础

1.MQ的基本概念2.分布式系统通信的方式3.MQ的优劣

3.1 MQ的优势3.2 MQ的劣势 二、RabbitMQ基础

1.JMS2.RabbitMQ安装

2.1 安装说明2.2 踩坑事件2.3 安装路径 3.代码连接RabbitMQ

3.1 导入依赖3.2 生产者代码(简单模式)3.3 消费者代码(简单模式) 4.工作模式

4.1 工作队列模式4.2 订阅模式

4.2.1 fanout(广播模式)4.2.2 routing(路由模式)4.2.3 topics(通配符模式) 5.SpringBoot整合RabbitMQ

5.1 整合生产者5.2 整合消费者

一、MQ基础 1.MQ的基本概念

​ MQ全称Message Queue(消息队列),是在消息的传输过程中保存消息的容器,多用于分布式系统之间进行通信。

2.分布式系统通信的方式

​ 分布式系统中发送信息的一方称为生产者,接受信息的一方称为消费者,分布式系统通信有以下两种:

​ 系统之间直接调用​ 借助第三方间接完成通信,就是下面这幅图所图的通信方式

3.MQ的优劣 3.1 MQ的优势

应用解耦

​ 有一个订单系统,若其库存系统暂时出问题了,那么可能会导致整个订单系统出问题,如下图:

​ 若现在给其中加一个MQ中间件,订单系统中的消息全部发到MQ中,然后由MQ给各库存系统、支付系统等发布信息。如果库存系统出问题了,那么也不影响订单系统,因为数据在MQ中保存,待库存系统恢复之后再从MQ中拿取信息即可,如下图:

异步提速

​ 当只有订单系统、库存系统、支付系统和物流系统时,这些系统之间的交互和与数据库之间的交互会花费很长时间,如下图:

​ 但如果加上中间件MQ,订单系统只需要把消息成功给MQ,就直接可以给用户返回成功的信息,接下来就是MQ与各系统之间进行交互,大大减小了时间花费,如下图:

削峰填谷

​ 现有A系统,每秒处理请求最大为1000,而现在每秒有5000的请求,若不进行系统架构的优化处理,那么A系统会直接宕机。

​ 若现在给A系统前加一个MQ,让所有的请求先发到MQ中,然后A系统再慢慢的从MQ中拿请求,这时A系统的稳定性就会提高很多,如下图:

​ 所以削峰填谷的意思就是,当请求突然增高时系统需要处理大量的请求(峰),其他时间系统的请求会大大降低(谷),在加入MQ之后,在大访问量时A系统的依然处理的是1000请求(削峰),当大访问量过去之后,A系统依然会处理刚才没有处理完的请求,访问量依然是1000(填谷)。

​ 可以使用以下图来解释(注:下图是黑马课程课件中的图)

3.2 MQ的劣势

系统可用性降低

系统引入的外部依赖越多,系统稳定性越差,一旦MQ宕机,就会对业务造成影响

系统的复杂度提高

MQ的加入大大增加了系统的复杂度,以前系统之间是直接进行调用,现在是通过MQ异步调用,那么如何保证消息没有被重复消费、消息丢失情况和消息传递的有序性等问题,这也是需要考虑的。

一致性问题

A系统处理完业务,通过MQ给B、C、D系统发送消息数据,如果B系统、C系统处理成功,D系统失败,如何保证消息数据的一致性也是需要解决的问题。

二、RabbitMQ基础 1.JMS

JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件的APIJMS是JavaEE规范中的一种,类比JDBC 2.RabbitMQ安装 2.1 安装说明

​ RabbitMQ的安装说明可以访问RabbitMQ安装教程

2.2 踩坑事件

​ 【踩坑日记】:在linux上把RabbitMQ都安装好了,但就是打不开RabbitMQ的Web管理界面,最后才发现是阿里云的安全规则没有设置15672端口,设置之后就可以了。

2.3 安装路径

​ RabbitMQ在linux上的默认安装路径为:

#输入下面路径即可进入rabbitMQ安装路径
cd /usr/share/doc/rabbitmq-server-3.6.5/

#rabbitMQ的配置文件
rabbit.conf.example

3.代码连接RabbitMQ

​ 建两个maven项目,分别作为生产者和消费者

3.1 导入依赖

    
    
        com.rabbitmq
        amqp-client
        5.6.0
    



    
        
        
            org.apache.maven.plugins
            maven-compiler-plugin
            3.8.0
            
                1.8
                1.8
            
        
    

3.2 生产者代码(简单模式)
public static void main(String[] args) throws Exception{

    //1.创建连接工厂
    ConnectionFactory factory=new ConnectionFactory();

    //2.设置参数
    factory.setHost("47.96.150.204");   //设置IP地址,默认值为localhost
    factory.setPort(5672);  //设置端口,默认值也为5672
    factory.setVirtualHost("/it");  //设置虚拟机,"/it"是虚拟机名字,默认值为"/"
    factory.setUsername("admin");   //设置账号,默认guest
    factory.setPassword("admin");   //设置密码,默认guest

    //3.创建连接 Connection
    Connection connection = factory.newConnection();

    //4.创建channel
    Channel channel = connection.createChannel();

    //5.创建一个队列Queue
    
    //如果没有名叫hello_world的队列则会创建,有的话则不会创建
    channel.queueDeclare("hello_world",true,false,false,null);

    //6.发送消息
    

    String body="hello rabbitmq~";
    channel.basicPublish("","hello_world",null,body.getBytes());

    //7.释放资源
    channel.close();
    connection.close();
}
3.3 消费者代码(简单模式)
public static void main(String[] args) throws Exception{
    //1.创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();

    //2.设置连接参数
    factory.setHost("47.96.150.204");//设置IP
    factory.setPort(5672);//设置端口号
    factory.setUsername("admin");
    factory.setPassword("admin");
    factory.setVirtualHost("/it");//设置虚拟机

    //3.创建连接
    Connection connection = factory.newConnection();

    //4.创建channel
    Channel channel = connection.createChannel();

    //5.创建一个队列
    channel.queueDeclare("hello_world",true,false,false,null);

    //6.接收消息
    
    Consumer consumer=new DefaultConsumer(channel){
        

        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("consumerTag="+consumerTag);
            System.out.println("Exchange="+envelope.getExchange());
            System.out.println("routingKey="+envelope.getRoutingKey());
            System.out.println("properties="+properties);
            System.out.println("body="+ new String(body));
            super.handleDelivery(consumerTag, envelope, properties, body);
        }
    };

    channel.basicConsume("hello_world",true,consumer);
}
4.工作模式 4.1 工作队列模式

工作队列模式与简单模式相比,多了一个或一些消费者在一个队列中有多个消费者,那么多个消费者对于同一条消息是竞争关系Work Queue对于任务过重或任务较多情况使用工作队列模式提高处理速度,例如:短信服务部署

测试代码

​ 消费者代码

public static void main(String[] args) throws Exception {

    //1.创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();

    //2.设置参数
    factory.setHost("47.96.150.204");   //设置IP地址,默认值为localhost
    factory.setPort(5672);  //设置端口,默认值也为5672
    factory.setVirtualHost("/it");  //设置虚拟机,"/it"是虚拟机名字,默认值为"/"
    factory.setUsername("admin");   //设置账号,默认guest
    factory.setPassword("admin");   //设置密码,默认guest

    //3.创建连接 Connection
    Connection connection = factory.newConnection();

    //4.创建channel
    Channel channel = connection.createChannel();

    //5.创建一个队列Queue
    
    //如果没有名叫hello_world的队列则会创建,有的话则不会创建
    channel.queueDeclare("work_queue", true, false, false, null);

    //6.发送消息
    
    
	//发送10条消息
    for(int i=0;i<10;i++){
        String body = i+"message two!";
        channel.basicPublish("", "work_queue", null, body.getBytes());
    }

    //7.释放资源
    channel.close();
    connection.close();
}

​ 消费者1

public static void main(String[] args) throws Exception {
    //1.创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();

    //2.设置连接参数
    factory.setHost("47.96.150.204");//设置IP
    factory.setPort(5672);//设置端口号
    factory.setUsername("admin");
    factory.setPassword("admin");
    factory.setVirtualHost("/it");//设置虚拟机

    //3.创建连接
    Connection connection = factory.newConnection();

    //4.创建channel
    Channel channel = connection.createChannel();

    //5.创建一个队列
    channel.queueDeclare("work_queue", true, false, false, null);

    //6.接收消息
    
    Consumer consumer = new DefaultConsumer(channel) {
        

        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("body=" + new String(body));
        }
    };

    channel.basicConsume("work_queue", true, consumer);
}

​ 消费者2(与消费者1代码相同)

public static void main(String[] args) throws Exception {
    //1.创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();

    //2.设置连接参数
    factory.setHost("47.96.150.204");//设置IP
    factory.setPort(5672);//设置端口号
    factory.setUsername("admin");
    factory.setPassword("admin");
    factory.setVirtualHost("/it");//设置虚拟机

    //3.创建连接
    Connection connection = factory.newConnection();

    //4.创建channel
    Channel channel = connection.createChannel();

    //5.创建一个队列
    channel.queueDeclare("work_queue", true, false, false, null);

    //6.接收消息
    
    Consumer consumer = new DefaultConsumer(channel) {
        

        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("body=" + new String(body));
        }
    };

    channel.basicConsume("work_queue", true, consumer);
}	

运行结果

​ 运行生产者代码,发送10条信息给work_queue队列中,然后两个消费者进行接收消息,对于一条消息,只能有一个消费者进行接收

4.2 订阅模式

​ 在之前的简单模式和工作队列模式中,对于同一条消息只能有一个消费者接收,而订阅模式中,一个消息可以被多个消费者共享。

​ 可以从上图看出,P(生产者)发送消息给交换机,交换机路由分发给队列,然后消费者监听队列获取消息。

P:生产者,也就是要发送消息的程序,但是不再发给队列中,而是发给交换机C:消费者,消息的接收者,会一直等待消息到来Queue:消息队列,用来接收消息、缓存消息Exchange:交换机(X),一方面接收生产者的消息,另一方面,知道如何处理消息,例如将消息递交给某个特别的队列、递交给所有的队列、或是将消息丢弃。如何 *** 作取决于Exchange的类型,常用的有如下几种:

Fanout:广播,将消息交给所有绑定交换机的队列Direct:定向,把消息交给符合指定routing key的队列Topic:通配符,把消息交给符合routing pattern(路由模式)的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何的队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失。

4.2.1 fanout(广播模式)

测试代码

​ 生产者代码

public static void main(String[] args) throws Exception {

    //1.创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    //2.设置参数
    factory.setHost("47.96.150.204");   //设置IP地址,默认值为localhost
    factory.setPort(5672);  //设置端口,默认值也为5672
    factory.setVirtualHost("/it");  //设置虚拟机,"/it"是虚拟机名字,默认值为"/"
    factory.setUsername("admin");   //设置账号,默认guest
    factory.setPassword("admin");   //设置密码,默认guest
    //3.创建连接 Connection
    Connection connection = factory.newConnection();
    //4.创建channel
    Channel channel = connection.createChannel();
    //5.创建交换机
    
    String exchangeName="test_fanout";
    channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
    //6.创建两个队列
    String queue1Name="test_fanout_queue1";
    String queue2Name="test_fanout_queue2";
    channel.queueDeclare(queue1Name,true,false,false,null);
    channel.queueDeclare(queue2Name,true,false,false,null);
    //7.绑定队列和交换机
    
    channel.queueBind(queue1Name,exchangeName,"");
    channel.queueBind(queue2Name,exchangeName,"");
    //8.发送消息
    String body="日志信息:张三去调用了findAll方法...日志级别:info";
    channel.basicPublish(exchangeName,"",null,body.getBytes());
    //9.释放资源
    channel.close();
    connection.close();
}

​ 消费者1代码

public static void main(String[] args) throws Exception {
    //1.创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();

    //2.设置连接参数
    factory.setHost("47.96.150.204");//设置IP
    factory.setPort(5672);//设置端口号
    factory.setUsername("admin");
    factory.setPassword("admin");
    factory.setVirtualHost("/it");//设置虚拟机

    //3.创建连接
    Connection connection = factory.newConnection();

    //4.创建channel
    Channel channel = connection.createChannel();

//        //5.创建一个队列,队列创建一次就可以了
//        channel.queueDeclare("test_fanout_queue1", true, false, false, null);

    //6.接收消息
    
    Consumer consumer = new DefaultConsumer(channel) {
        

        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("body=" + new String(body));
            System.out.println("将日志信息打印到控制台...");
        }
    };
    String queueName="test_fanout_queue1";
    channel.basicConsume(queueName, true, consumer);
}

​ 消费者2代码(基本相同)

public static void main(String[] args) throws Exception {
    //1.创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();

    //2.设置连接参数
    factory.setHost("47.96.150.204");//设置IP
    factory.setPort(5672);//设置端口号
    factory.setUsername("admin");
    factory.setPassword("admin");
    factory.setVirtualHost("/it");//设置虚拟机

    //3.创建连接
    Connection connection = factory.newConnection();

    //4.创建channel
    Channel channel = connection.createChannel();

//        //5.创建一个队列,队列创建一次就可以了
//        channel.queueDeclare("test_fanout_queue1", true, false, false, null);

    //6.接收消息
    
    Consumer consumer = new DefaultConsumer(channel) {
        

        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("body=" + new String(body));
            System.out.println("将日志信息保存到数据库...");
        }
    };
    String queueName="test_fanout_queue2";
    channel.basicConsume(queueName, true, consumer);
}

运行结果

​ 消费者1将信息打印到控制台上

​ 消费者2将信息保存到数据库中

4.2.2 routing(路由模式)

​ 【需求】:在上面的fanout中,将消息(日志信息)发送给交换机,交换机路由分发给队列,消费者从队列中取消息,取的都是同样的消息。如果现在要求将日志级别为info的打印到控制台上,将日志级别为bug的保存到数据库中,需要将这些不同级别的日志消息由交换机分发给不同的队列中,这就需要路由模式。

模式说明

队列与交换机绑定不是随意绑定了,而是需要指定一个RoutingKey(路由key)消息的发送方给Exchange(交换机)发送时也需要指定RoutingKeyExchange不再把信息交给每一个队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致时,才会接收到消息

代码测试

​ 生产者代码

public static void main(String[] args) throws Exception{
    //1.创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    //2.设置工厂连接参数
    factory.setHost("47.96.150.204");
    factory.setPort(5672);
    factory.setVirtualHost("/it");
    factory.setUsername("admin");
    factory.setPassword("admin");
    //3.创建连接
    Connection connection = factory.newConnection();
    //4.创建channel
    Channel channel = connection.createChannel();
    //5.创建交换机
    String exchangeName="test_direct";
    channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
    //6.创建队列
    String queue1Name="test_direct_queue1";
    String queue2Name="test_direct_queue2";
    channel.queueDeclare(queue1Name,true,false,false,null);
    channel.queueDeclare(queue2Name,true,false,false,null);
    //7.建立交换机和队列的绑定
    //队列1的绑定,绑定的是bug
    channel.queueBind(queue1Name,exchangeName,"bug");
    //队列2的绑定,绑定的是info、error、warning
    channel.queueBind(queue2Name,exchangeName,"info");
    channel.queueBind(queue2Name,exchangeName,"error");
    channel.queueBind(queue2Name,exchangeName,"warning");
    //8.发送消息
    String body_bug="日志信息:张三调用了findAll方法...日志级别:bug";
    channel.basicPublish(exchangeName,"bug",null,body_bug.getBytes());
    String body_info="日志信息:张三调用了findAll方法...日志级别:info";
    channel.basicPublish(exchangeName,"info",null,body_info.getBytes());
    String body_error="日志信息:张三调用了findAll方法...日志级别:error";
    channel.basicPublish(exchangeName,"error",null,body_error.getBytes());
    String body_warning="日志信息:张三调用了findAll方法...日志级别:warning";
    channel.basicPublish(exchangeName,"warning",null,body_warning.getBytes());
    //9.关闭资源
    channel.close();
    connection.close();

}

​ 消费者1代码

public static void main(String[] args) throws Exception{
    //1.创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    //2.设置连接工厂配置
    factory.setHost("47.96.150.204");
    factory.setPort(5672);
    factory.setVirtualHost("/it");
    factory.setUsername("admin");
    factory.setPassword("admin");
    //3.创建连接
    Connection connection = factory.newConnection();
    //4.创建channel
    Channel channel = connection.createChannel();
    //5.接收消息
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("body="+new String(body).toString());
            System.out.println("将bug级别日志保存到数据库");
        }
    };
    //接收队列"test_direct_queue1"的消息
    channel.basicConsume("test_direct_queue1",defaultConsumer);
}

​ 消费者2代码

public static void main(String[] args) throws Exception{
    //1.创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    //2.设置连接工厂配置
    factory.setHost("47.96.150.204");
    factory.setPort(5672);
    factory.setVirtualHost("/it");
    factory.setUsername("admin");
    factory.setPassword("admin");
    //3.创建连接
    Connection connection = factory.newConnection();
    //4.创建channel
    Channel channel = connection.createChannel();
    //5.接收消息
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("body="+new String(body).toString());
            System.out.println("将info、error、warning级别日志打印控制台");
        }
    };
    //接收队列"test_direct_queue2"的消息
    channel.basicConsume("test_direct_queue2",defaultConsumer);
}

运行结果

​ 消费者1接收的是队列1的消息(bug级别):

​ 消费者2接收的是队列2的消息(info、error、warning级别)

4.2.3 topics(通配符模式)

​ 通配符模式是更强大的交换机与队列之间匹配的方式,将使用前缀省略或后缀省略的方式进行匹配

【需求】:将以error级别的日志存入数据库,将所有order系统的日志存入数据库。不管什么样的控制信息全都打印到控制台上。

测试代码

​ 生产者代码

public static void main(String[] args) throws Exception{
    //1.创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    //2.设置连接工厂参数
    factory.setHost("47.96.150.204");
    factory.setPort(5672);
    factory.setVirtualHost("/it");
    factory.setUsername("admin");
    factory.setPassword("admin");
    //3.创建连接
    Connection connection = factory.newConnection();
    //4.创建channel
    Channel channel = connection.createChannel();
    //5.创建交换机
    String exchangeName="test_topic";
    channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
    //6.创建队列
    String queue1Name="test_topic_queue1";
    String queue2Name="test_topic_queue2";
    channel.queueDeclare(queue1Name,true,false,false,null);
    channel.queueDeclare(queue2Name,true,false,false,null);
    //7.设置交换机与队列绑定
    //队列1保存".error"和"order.*"消息,
    channel.queueBind(queue1Name,exchangeName,"#.error",null);
    channel.queueBind(queue1Name,exchangeName,"order.#",null);
    //队列2保存所有的消息
    channel.queueBind(queue2Name,exchangeName,"*.*",null);
    //8.发送消息
    String body_order="order系统的日志";
    String body_error="error级别的日志";
    String body_info="info级别的日志";
    channel.basicPublish(exchangeName,"order.info",null,body_order.getBytes());
    channel.basicPublish(exchangeName,"aof.error",null,body_error.getBytes());
    channel.basicPublish(exchangeName,"menu.info",null,body_info.getBytes());
    //9.关闭资源
    channel.close();
    connection.close();
}

​ 消费者1代码

public static void main(String[] args) throws Exception{
    //1.创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    //2.设置连接工厂配置
    factory.setHost("47.96.150.204");
    factory.setPort(5672);
    factory.setVirtualHost("/it");
    factory.setUsername("admin");
    factory.setPassword("admin");
    //3.创建连接
    Connection connection = factory.newConnection();
    //4.创建channel
    Channel channel = connection.createChannel();
    //5.接收信息
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("body="+new String(body));
            System.out.println("已保存到数据库");
        }
    };
    //接收队列1的消息
    channel.basicConsume("test_topic_queue1",defaultConsumer);
}

​ 消费者2代码

public static void main(String[] args) throws Exception{
    //1.创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    //2.设置连接工厂配置
    factory.setHost("47.96.150.204");
    factory.setPort(5672);
    factory.setVirtualHost("/it");
    factory.setUsername("admin");
    factory.setPassword("admin");
    //3.创建连接
    Connection connection = factory.newConnection();
    //4.创建channel
    Channel channel = connection.createChannel();
    //5.接收信息
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("body="+new String(body));
            System.out.println("输出到控制台");
        }
    };
    //接收队列1的消息
    channel.basicConsume("test_topic_queue2",defaultConsumer);
}

运行结果

​ 将以error级别和order系统的日志消息存入队列1,并将其保存到数据库

​ 将所有的日志信息存入队列2,并将其输出到控制台

5.SpringBoot整合RabbitMQ 5.1 整合生产者

​ 依赖


    org.springframework.boot
    spring-boot-starter-amqp

​ application.yml

#配置rabbitmq的基本信息,Ip、端口号、账号、密码、虚拟机等
spring:
  rabbitmq:
    host: **.**.**.***
    port: 5672
    username: guest
    password: guest
    virtual-host: /
	

​ 配置类

@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE_NAME="boot_topic_exchange";
    public static final String QUEUE_NAME="boot_queue";

    //1.交换机
    @Bean("newExchange")
    public Exchange newExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    //2.队列
    @Bean("newQueue")
    public Queue newQueue(){
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    //3.队列和交换机绑定关系
    @Bean
    public Binding bindQueueExchange(@Qualifier("newQueue") Queue queue,@Qualifier("newExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
    }
}

​ 测试类

@SpringBootTest(classes = {ProducerSpringbootApplication.class})
public class Test1 {

    //注入RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void send(){
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.haha","boot mq hello~");
    }
}
5.2 整合消费者

​ 消费者中不需要写生产者中的配置类,然后需要写一个Listener类,当springboot项目启动时,里面的方法会自动执行,如下:

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQListener {

    @RabbitListener(queues = "boot_queue")
    public void listenerQueue(Message message){
        System.out.println(message);
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5718178.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存