RabbitMQ与SpringBoot整合(JavaBean方式)

RabbitMQ与SpringBoot整合(JavaBean方式),第1张

RabbitMQ与SpringBoot整合(JavaBean方式)

        本次的学习是基于Gradle 来进行项目管理的,话不多少,直接上干货

引入依赖

        只选择显示了所需要引入的包,版本自行选择

// 引入SpringBoot依赖
compile('org.springframework.boot:spring-boot-starter-web')
// 引入rabbitMQ的依赖
compile('org.springframework.boot:spring-boot-starter-amqp')
testImplementation(libraries.'spring-rabbit-test')    
公共配置
package com.any.common;


public enum RabbitMQEnum {
    
    FANOUT_EXCHANGE_NAME("fanout_order_exchange"),

    QUEUE_FANOUT_WECHAT("fanout.wechat.queue"),

    QUEUE_FANOUT_SMS("fanout.sms.queue"),

    QUEUE_FANOUT_EMAIL("fanout.email.queue"),

    
    DIRECT_EXCHANGE_NAME("direct_order_exchange"),

    QUEUE_DIRECT_WECHAT("direct.wechat.queue"),

    QUEUE_DIRECT_SMS("direct.sms.queue"),

    QUEUE_DIRECT_EMAIL("direct.email.queue"),

    ROUTINGKEY_SMS("SMS"),

    ROUTINGKEY_EMAIL("Email"),

    ROUTINGKEY_WECHAT("WeChat"),

    
    TOPICS_EXCHANGE_NAME("topics_order_exchange"),

    QUEUE_TOPICS_WECHAT("topics.wechat.queue"),

    QUEUE_TOPICS_SMS("topics.sms.queue"),

    QUEUE_TOPICS_EMAIL("topics.email.queue"),

    TOPICS_ROUTINGKEY_SMS("#.sms.#"),

    TOPICS_ROUTINGKEY_EMAIL("*.email.#"),

    TOPICS_ROUTINGKEY_WECHAT("#.wechat.*"),

    
    DIRECT_TTL_EXCHANGE_NAME("direct_ttl_order_exchange"),
    QUEUE_TTL_DIRECT_SMS("direct.ttl.sms.queue"),
    QUEUE_TTL_MESSAGE_DIRECT_SMS("direct.ttl.message.sms.queue"),
    ROUTINGKEY_TTL_SMS("TTL_SMS"),
    ROUTINGKEY_TTL_MESSAGE_SMS("TTL_MESSAGE_SMS"),

    
    DIRECT_TTL_DEAD_EXCHANGE_NAME("direct_ttl_dead_order_exchange"),
    QUEUE_TTL_DEAD_DIRECT_SMS("direct.ttl.dead.sms.queue"),
    ROUTINGKEY_TTL_DEAD_SMS("ttl_dead_sms");

    private String desc;

    RabbitMQEnum(String desc) {
        this.desc = desc;
    }

    public String getDesc() {
        return desc;
    }
}
生产者         RabbitMQ的连接配置
package com.any.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;


@Configuration
@Slf4j
public class RabbitMQConfig {

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) // 此注解必须加
    public RabbitTemplate rabbitTemplate(){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        return rabbitTemplate;
    }

}
        发布/订阅(Publish/Subscribe)模式
package com.any.config;

import com.any.common.RabbitMQEnum;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class FanoutRabbitMQConfiguration {

    
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(RabbitMQEnum.FANOUT_EXCHANGE_NAME.getDesc(),true,false);
    }

    
    @Bean
    public Queue fanoutWechatQueue(){
        return new Queue(RabbitMQEnum.QUEUE_FANOUT_WECHAT.getDesc(),true);
    }

    
    @Bean
    public Binding fanoutWechatBinding(){
        return BindingBuilder.bind(fanoutWechatQueue()).to(fanoutExchange());
    }

}
        路由(Routing)模式
package com.any.config;

import com.any.common.RabbitMQEnum;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class DirectRabbitMQConfiguration {

    
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(RabbitMQEnum.DIRECT_EXCHANGE_NAME.getDesc(),true,false);
    }

    
    @Bean
    public Queue directWechatQueue(){
        return new Queue(RabbitMQEnum.QUEUE_DIRECT_WECHAT.getDesc(),true);
    }

    
    @Bean
    public Binding directWechatBinding(){
        return BindingBuilder.bind(directWechatQueue()).to(directExchange()).with(RabbitMQEnum.ROUTINGKEY_WECHAT.getDesc());
    }

}
        主题(Topics)模式
package com.any.config;

import com.any.common.RabbitMQEnum;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class TopicsRabbitMQConfiguration {

    
    @Bean
    public TopicExchange topicsExchange(){
        return new TopicExchange(RabbitMQEnum.TOPICS_EXCHANGE_NAME.getDesc(),true,false);
    }

    
    @Bean
    public Queue topicsWechatQueue(){
        return new Queue(RabbitMQEnum.QUEUE_TOPICS_WECHAT.getDesc(),true);
    }
    
    @Bean
    public Queue topicsSmsQueue(){
        return new Queue(RabbitMQEnum.QUEUE_TOPICS_SMS.getDesc(),true);
    }

    
    @Bean
    public Queue topicsEmailQueue(){
        return new Queue(RabbitMQEnum.QUEUE_TOPICS_EMAIL.getDesc(),true);
    }

    
    @Bean
    public Binding topicsWechatBinding(){
        return BindingBuilder.bind(topicsWechatQueue()).to(topicsExchange()).with(RabbitMQEnum.TOPICS_ROUTINGKEY_WECHAT.getDesc());
    }

    
    @Bean
    public Binding topicsSmsBinding(){
        return BindingBuilder.bind(topicsSmsQueue()).to(topicsExchange()).with(RabbitMQEnum.TOPICS_ROUTINGKEY_SMS.getDesc());
    }
    
    @Bean
    public Binding topicsEmailBinding(){
        return BindingBuilder.bind(topicsEmailQueue()).to(topicsExchange()).with(RabbitMQEnum.TOPICS_ROUTINGKEY_EMAIL.getDesc());
    }
}
        测试Service
package com.any.service.Impl;

import com.any.common.RabbitMQEnum;
import com.any.service.OrderService;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;


@Service
public class OrderServiceImpl implements OrderService {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    @Override
    public void fanoutMakeOrder(String userId, String productId, int num) {
        // 1、根据商品Id 查询库存是否充足
        // 2、保存订单
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生成成功:" + orderId);
        // 3、通过MQ 来完成消息的分发
        // 参数1:交换机  参数2:路由key/queue队列名称 参数3:消息内容
        String routingKey = "";
        rabbitTemplate.convertAndSend(RabbitMQEnum.FANOUT_EXCHANGE_NAME.getDesc(), routingKey, orderId);
    }

    @Override
    public void diRectMakeOrder(String userId, String productId, int num) {
        // 1、根据商品Id 查询库存是否充足
        // 2、保存订单
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生成成功:" + orderId);
        // 3、通过MQ 来完成消息的分发
        // 参数1:交换机  参数2:路由key/queue队列名称 参数3:消息内容
        // 发送SMS
        rabbitTemplate.convertAndSend(
                RabbitMQEnum.DIRECT_EXCHANGE_NAME.getDesc(), RabbitMQEnum.ROUTINGKEY_SMS.getDesc(), orderId);

        // 发送Email
        rabbitTemplate.convertAndSend(
                RabbitMQEnum.DIRECT_EXCHANGE_NAME.getDesc(), RabbitMQEnum.ROUTINGKEY_EMAIL.getDesc(), orderId);
    }

    @Override
    public void topicsMakeOrder(String userId, String productId, int num) {
        // 1、根据商品Id 查询库存是否充足
        // 2、保存订单
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生成成功:" + orderId);

        String routingKey = "wechat.email.sms";
        // 3、通过MQ 来完成消息的分发
        // 参数1:交换机  参数2:路由key/queue队列名称 参数3:消息内容
        // 发送SMS

        //TOPICS_ROUTINGKEY_SMS("#.sms.#"),
        //
        //        TOPICS_ROUTINGKEY_EMAIL("*.email.#"),
        //
        //        TOPICS_ROUTINGKEY_WECHAT("#.wechat.*");
        rabbitTemplate.convertAndSend(RabbitMQEnum.TOPICS_EXCHANGE_NAME.getDesc(), routingKey, orderId);
    }

    @Override
    public void directTTLMakeOrder(String userId, String productId, int num) {
        // 1、根据商品Id 查询库存是否充足
        // 2、保存订单
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生成成功:" + orderId);
        // 3、通过MQ 来完成消息的分发
        // 参数1:交换机  参数2:路由key/queue队列名称 参数3:消息内容
        // 发送SMS
        System.out.println(RabbitMQEnum.ROUTINGKEY_TTL_SMS.getDesc());

        for (int i = 1; i < 11; i++) {
            rabbitTemplate.convertAndSend(RabbitMQEnum.DIRECT_TTL_EXCHANGE_NAME.getDesc()
                    , RabbitMQEnum.ROUTINGKEY_TTL_SMS.getDesc(), orderId+"-size:"+i);
        }

    }

    @Override
    public void directTTLMessageMakeOrder(String userId, String productId, int num) {
        // 1、根据商品Id 查询库存是否充足
        // 2、保存订单
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生成成功:" + orderId);
        // 3.给消息设置过期时间
        MessagePostProcessor postProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("5000");
                message.getMessageProperties().setContentEncoding("UTF-8");
                return message;
            }
        };
        // 3、通过MQ 来完成消息的分发
        // 参数1:交换机  参数2:路由key/queue队列名称 参数3:消息内容
        // 发送SMS
        System.out.println(RabbitMQEnum.ROUTINGKEY_TTL_MESSAGE_SMS.getDesc());
        rabbitTemplate.convertAndSend(RabbitMQEnum.DIRECT_TTL_EXCHANGE_NAME.getDesc()
                , RabbitMQEnum.ROUTINGKEY_TTL_MESSAGE_SMS.getDesc(), orderId,postProcessor);
    }
}
消费者         RabbitMQ的连接配置
package com.any.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;


@Configuration
@Slf4j
public class RabbitMQConfig {

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) // 此注解必须加
    public RabbitTemplate rabbitTemplate(){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        return rabbitTemplate;
    }

}
        消费者测试接收(其他同理)
package com.any.rabbitmq.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@RabbitListener(queues = {"fanout.email.queue"})
@Component
public class FanoutEmailConsumer {

    @RabbitHandler
    public void reviceMessage(String message){
        System.out.println("Email fanout---接收到的消息是:->"+message);
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5701777.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存