spring 和springboot 整合rabbitmq

spring 和springboot 整合rabbitmq,第1张

spring 和springboot 整合rabbitmq

文章目录

spring && springboot整合 rabbitmq

4.1 spring 整合rabbitmq4.2 springboot 整合rabbitmq

spring && springboot整合 rabbitmq

rabbitmq 的安装相关 *** 作可以参考我的博客rabbitmq安装与 *** 作。

4.1 spring 整合rabbitmq

新建一个maven 工程rabbitmq,新建两个module为maven 的工程, 一个module 为spring-rabbitmq-consumer, 一个为spring-rabbitmq-producer。

项目结构如下所示:

1在rabbitmq的pom.xml 添加下面 添加依赖

 
        
        
            org.springframework
            spring-context
            5.1.8.RELEASE
        
        
        
            com.rabbitmq
            amqp-client
            5.6.0
        
        
        
            junit
            junit
            4.12
            test
        

        
            org.springframework
            spring-test
            5.1.8.RELEASE
            test
        


        
        
            org.springframework.amqp
            spring-rabbit
            2.1.8.RELEASE
        


    

    
        
            
                org.apache.maven.plugins
                maven-compiler-plugin
                3.8.0
                
                    1.8
                    1.8
                
            

        
    

注意: 使用@RunWith(SpringJUnit4ClassRunner.class)进行单元测试时,需要junit高版本和spring-test的高版本才支持,junit需要4.1.2以上的,spring-test也需要高版本,最开始我用的4.3.13的,翻开jar包发现并没有编写SpringJUnit4ClassRunner,后来直接换了5.1.8.RELEASE的就可以了。并且spring-test 的版本要与spring-context的版本要一致。

2 在两个module 下创建resourcesrabbitmq.properties连接参数等配置文件(注意修改成自己的相关配置)

rabbitmq.host=localhost
rabbitmq.port=5672
rabbitmq.username=guest 
rabbitmq.password=guest
rabbitmq.virtual-host=/boshrong

3 在producer中 创建 resourcesspringspring-rabbitmq.xml 整合配置文件。 相当于一些配置。



    
    

    
    
    
    

    
    

    
    
    

    
    

    
    
        
            
            
        
    

    
    
    
    
    
    
    

    
        
            
            
            
        
    

    
    
    

xmlns:rabbit="http://www.springframework.org/schema/rabbit"

为spring 整合rabbitmq 的名称空间。对于命名空间的标识符, URI 的作用仅仅是保证唯一性, 它并不需要对应一个可以访问的资源或文件 ! 但是, 有很多公司都会让 namespace 的 URI 指向一个包含该命名空间信息的网页

xsi:schemaLocation限制对应的XSD(Xml Schema Definition)文档的位置的关系。

4 在测试类中编写代码

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
// 加载配置文件的路径
@ContextConfiguration(locations = "classpath:spring/spring-rabbitmq.xml")
public class TestRabbitmq {

    // 注入RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;


    
    @Test
    public void queueTest(){
        // 发送消息
        rabbitTemplate.convertAndSend("spring_queue", "只发队列spring_queue的消息。");
    }
    
    @Test
    public void fanoutTest(){
        
        rabbitTemplate.convertAndSend("spring_fanout_exchange", "", "发送到spring_fanout_exchange交换机的广播消息");
    }

    
    @Test
    public void topicTest(){
        
        rabbitTemplate.convertAndSend("spring_topic_exchange", "heima.bj", "发送到spring_topic_exchange交换机heima.bj的消息");
        rabbitTemplate.convertAndSend("spring_topic_exchange", "heima.bj.1", "发送到spring_topic_exchange交换机heima.bj.1的消息");
        rabbitTemplate.convertAndSend("spring_topic_exchange", "heima.bj.2", "发送到spring_topic_exchange交换机heima.bj.2的消息");
        rabbitTemplate.convertAndSend("spring_topic_exchange", "itcast.cn", "发送到spring_topic_exchange交换机itcast.cn的消息");
    }

}

运行结果:

虽然只运行了一个测试用例,可以看到在监控页面将所有的队列全部加载了进来。

5 编写消费者

消费者只需要定义一个类作为消息监听器。当消息一来,就会调用消息的执行方法。

5.1 在消费者中的resource 中创建springrabbitmq.xml



    
    

    
    

    
    
    
    
    
    

    
        
        
        
        
        
        
    

在配置文件中,定义一个rabbit:listener-container 将自己实现的监听器引入进来。

5.2 在配置 spring-rabbitma.xml 中,我们定义了六类监听器。具体如下,创建对应的监听器java 类。其要实现MessageListener接口,并重写onMessage方法。

 





为了方便测试,监听器代码一致如下。

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

import java.io.UnsupportedEncodingException;

public class FanoutListener1 implements MessageListener {
    @Override
    public void onMessage(Message message) {
        String msg = null;
        try {
            msg = new String(message.getBody(), "utf-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }

        System.out.printf("接收路由名称为:%s,路由键为:%s,队列名为:%s的消息:%s n",
                message.getMessageProperties().getReceivedExchange(),
                message.getMessageProperties().getReceivedRoutingKey(),
                message.getMessageProperties().getConsumerQueue(),
                msg);


    }
}

5.3 编写consumer 的测试类进行测试

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:/spring/spring-rabbitmq.xml")
public class TestConsumer {

    @Test
    public void test1(){
        while(true){
            
        }
    }


}

其通过@ContextConfiguration() 注解,将监听加载进来,只要程序不停止就会一直监听,并从其中取消息进行消费。

运行结果如下:

4.2 springboot 整合rabbitmq

使用spring 整合rabbitmq 需要大量的xml 的配置文件springboot 整合rabbitmq 之后,减少了大量xml 文件的编写。

整体项目架构

1 创建springboot 项目

创建springboot-rabbitmq-producer 项目

2 引入依赖


    org.springframework.boot
    spring-boot-starter-amqp

3 编写springboot 启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

4 编写yal 配置文件,基本信息进行配置。

在resources 文件下创建 application.yml 文件

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /boshrong
    username: guest
    password: guest

5 定义交换机,队列以及绑定关系的配置类。springboot 使用配置类来代替spring 中的xml文件中的相关配置。

使用rabbitmq 的topic 模式进行测试。对topic 模式不太了解,可以参考我的博客。

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class rabbitmqConfig {
    //交换机名称
    public static final String ITEM_TOPIC_EXCHANGE = "item_topic_exchange";
    //队列名称
    public static final String ITEM_QUEUE = "item_queue";
    public static final String ORDER_QUEUE = "order_queue";

    //声明交换机
    @Bean("itemTopicExchange")
    public Exchange topicExchange(){
        return ExchangeBuilder.topicExchange(ITEM_TOPIC_EXCHANGE).durable(true).build();
    }

    //声明队列
    @Bean("itemQueue")
    public Queue itemQueue(){
        return QueueBuilder.durable(ITEM_QUEUE).build();
    }
    @Bean("orderQueue")
    public Queue orderQueue(){
        return QueueBuilder.durable(ORDER_QUEUE).build();
    }

    //绑定队列和交换机
    @Bean
    public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue,
                                     @Qualifier("itemTopicExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs();
    }
    @Bean
    public Binding orderQueueExchange(@Qualifier("orderQueue") Queue queue,
                                     @Qualifier("itemTopicExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
    }

}

5 编写测试,发送消息

@SpringBootTest
class DemoApplicationTests {

    // 注入rabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads() {
        
        rabbitTemplate.convertAndSend(rabbitmqConfig.ITEM_TOPIC_EXCHANGE,"order.err","发送一个订单错误消息");
        rabbitTemplate.convertAndSend(rabbitmqConfig.ITEM_TOPIC_EXCHANGE,"item.err","发送一个条目错误信息");

    }

}

可以在监控页面看到下面queue 的界面。

我将test 执行了两边所有每个queue里面有两条信息。

6 创建rabbitmqconsumer springboot 项目

导入依赖


    org.springframework.boot
    spring-boot-starter-amqp

在resources文件下创建application.yml 文件

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /boshrong
    username: guest
    password: guest

编写相应的监听器

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class myListener {

    @RabbitListener(queues = "item_queue")
    public void itemListener(String message){
        System.out.println("item消费者接收到的消息为:" + message);
    }

    @RabbitListener(queues = "order_queue")
    public void orderListener(String message){
        System.out.println("order消费者接收的消息为"+message);
    }

}

这时候只要启动springboot 主类。监听器就会不断监听,消费相应队列中的数据。

运行结果如下:

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5700289.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存