本次的学习是基于 Gradle 来进行项目管理的,话不多说,直接上干货。
引入依赖:这里只列出了需要引入的包,版本请自行选择。
// NOTE(review): these declarations presumably live inside the build script's
// `dependencies { ... }` block, which is not shown here.

// Spring Boot web starter.
implementation('org.springframework.boot:spring-boot-starter-web')
// Spring Boot AMQP starter (RabbitMQ support).
implementation('org.springframework.boot:spring-boot-starter-amqp')
// `libraries` is a map defined elsewhere in the build; it resolves the test artifact coordinates.
testImplementation(libraries.'spring-rabbit-test')

// Next article section: shared configuration
package com.any.common; public enum RabbitMQEnum { FANOUT_EXCHANGE_NAME("fanout_order_exchange"), QUEUE_FANOUT_WECHAT("fanout.wechat.queue"), QUEUE_FANOUT_SMS("fanout.sms.queue"), QUEUE_FANOUT_EMAIL("fanout.email.queue"), DIRECT_EXCHANGE_NAME("direct_order_exchange"), QUEUE_DIRECT_WECHAT("direct.wechat.queue"), QUEUE_DIRECT_SMS("direct.sms.queue"), QUEUE_DIRECT_EMAIL("direct.email.queue"), ROUTINGKEY_SMS("SMS"), ROUTINGKEY_EMAIL("Email"), ROUTINGKEY_WECHAT("WeChat"), TOPICS_EXCHANGE_NAME("topics_order_exchange"), QUEUE_TOPICS_WECHAT("topics.wechat.queue"), QUEUE_TOPICS_SMS("topics.sms.queue"), QUEUE_TOPICS_EMAIL("topics.email.queue"), TOPICS_ROUTINGKEY_SMS("#.sms.#"), TOPICS_ROUTINGKEY_EMAIL("*.email.#"), TOPICS_ROUTINGKEY_WECHAT("#.wechat.*"), DIRECT_TTL_EXCHANGE_NAME("direct_ttl_order_exchange"), QUEUE_TTL_DIRECT_SMS("direct.ttl.sms.queue"), QUEUE_TTL_MESSAGE_DIRECT_SMS("direct.ttl.message.sms.queue"), ROUTINGKEY_TTL_SMS("TTL_SMS"), ROUTINGKEY_TTL_MESSAGE_SMS("TTL_MESSAGE_SMS"), DIRECT_TTL_DEAD_EXCHANGE_NAME("direct_ttl_dead_order_exchange"), QUEUE_TTL_DEAD_DIRECT_SMS("direct.ttl.dead.sms.queue"), ROUTINGKEY_TTL_DEAD_SMS("ttl_dead_sms"); private String desc; RabbitMQEnum(String desc) { this.desc = desc; } public String getDesc() { return desc; } }生产者 RabbitMQ的连接配置
package com.any.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; @Configuration @Slf4j public class RabbitMQConfig { @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private int port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); return connectionFactory; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) // 此注解必须加 public RabbitTemplate rabbitTemplate(){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory()); return rabbitTemplate; } }发布/订阅(Publish/Subscribe)模式
package com.any.config; import com.any.common.RabbitMQEnum; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutRabbitMQConfiguration { @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange(RabbitMQEnum.FANOUT_EXCHANGE_NAME.getDesc(),true,false); } @Bean public Queue fanoutWechatQueue(){ return new Queue(RabbitMQEnum.QUEUE_FANOUT_WECHAT.getDesc(),true); } @Bean public Binding fanoutWechatBinding(){ return BindingBuilder.bind(fanoutWechatQueue()).to(fanoutExchange()); } }路由(Routing)模式
package com.any.config; import com.any.common.RabbitMQEnum; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DirectRabbitMQConfiguration { @Bean public DirectExchange directExchange(){ return new DirectExchange(RabbitMQEnum.DIRECT_EXCHANGE_NAME.getDesc(),true,false); } @Bean public Queue directWechatQueue(){ return new Queue(RabbitMQEnum.QUEUE_DIRECT_WECHAT.getDesc(),true); } @Bean public Binding directWechatBinding(){ return BindingBuilder.bind(directWechatQueue()).to(directExchange()).with(RabbitMQEnum.ROUTINGKEY_WECHAT.getDesc()); } }主题(Topics)模式
package com.any.config; import com.any.common.RabbitMQEnum; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class TopicsRabbitMQConfiguration { @Bean public TopicExchange topicsExchange(){ return new TopicExchange(RabbitMQEnum.TOPICS_EXCHANGE_NAME.getDesc(),true,false); } @Bean public Queue topicsWechatQueue(){ return new Queue(RabbitMQEnum.QUEUE_TOPICS_WECHAT.getDesc(),true); } @Bean public Queue topicsSmsQueue(){ return new Queue(RabbitMQEnum.QUEUE_TOPICS_SMS.getDesc(),true); } @Bean public Queue topicsEmailQueue(){ return new Queue(RabbitMQEnum.QUEUE_TOPICS_EMAIL.getDesc(),true); } @Bean public Binding topicsWechatBinding(){ return BindingBuilder.bind(topicsWechatQueue()).to(topicsExchange()).with(RabbitMQEnum.TOPICS_ROUTINGKEY_WECHAT.getDesc()); } @Bean public Binding topicsSmsBinding(){ return BindingBuilder.bind(topicsSmsQueue()).to(topicsExchange()).with(RabbitMQEnum.TOPICS_ROUTINGKEY_SMS.getDesc()); } @Bean public Binding topicsEmailBinding(){ return BindingBuilder.bind(topicsEmailQueue()).to(topicsExchange()).with(RabbitMQEnum.TOPICS_ROUTINGKEY_EMAIL.getDesc()); } }测试Service
package com.any.service.Impl;

import com.any.common.RabbitMQEnum;
import com.any.service.OrderService;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

/**
 * Demo order service: every method "creates" an order (a random UUID) and publishes the order
 * id to one of the exchanges named in {@link RabbitMQEnum}.
 *
 * <p>The {@code userId}, {@code productId} and {@code num} parameters are not used yet; the
 * stock check and order persistence steps are placeholders.
 */
@Service
public class OrderServiceImpl implements OrderService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes a new order id to the fanout exchange with an empty routing key.
     */
    @Override
    public void fanoutMakeOrder(String userId, String productId, int num) {
        // Steps 1-2 (stock check, order persistence) are placeholders; only the id is generated.
        String orderId = createOrderId();

        // Step 3: hand the order to MQ. Arguments: exchange, routing key, message payload.
        String exchange = RabbitMQEnum.FANOUT_EXCHANGE_NAME.getDesc();
        rabbitTemplate.convertAndSend(exchange, "", orderId);
    }

    /**
     * Publishes a new order id to the direct exchange twice: first with the SMS routing key,
     * then with the Email routing key.
     */
    @Override
    public void diRectMakeOrder(String userId, String productId, int num) {
        // Steps 1-2 (stock check, order persistence) are placeholders; only the id is generated.
        String orderId = createOrderId();

        // Step 3: hand the order to MQ. Arguments: exchange, routing key, message payload.
        RabbitMQEnum[] routingKeys = {RabbitMQEnum.ROUTINGKEY_SMS, RabbitMQEnum.ROUTINGKEY_EMAIL};
        for (RabbitMQEnum key : routingKeys) {
            rabbitTemplate.convertAndSend(
                    RabbitMQEnum.DIRECT_EXCHANGE_NAME.getDesc(), key.getDesc(), orderId);
        }
    }

    /**
     * Publishes a new order id to the topic exchange with the routing key
     * {@code wechat.email.sms}.
     */
    @Override
    public void topicsMakeOrder(String userId, String productId, int num) {
        // Steps 1-2 (stock check, order persistence) are placeholders; only the id is generated.
        String orderId = createOrderId();

        // The queue patterns this key is matched against are declared in RabbitMQEnum:
        //   TOPICS_ROUTINGKEY_SMS    "#.sms.#"
        //   TOPICS_ROUTINGKEY_EMAIL  "*.email.#"
        //   TOPICS_ROUTINGKEY_WECHAT "#.wechat.*"
        String routingKey = "wechat.email.sms";

        // Step 3: hand the order to MQ. Arguments: exchange, routing key, message payload.
        String exchange = RabbitMQEnum.TOPICS_EXCHANGE_NAME.getDesc();
        rabbitTemplate.convertAndSend(exchange, routingKey, orderId);
    }

    /**
     * Publishes ten numbered messages for one new order to the TTL direct exchange using the
     * TTL SMS routing key.
     */
    @Override
    public void directTTLMakeOrder(String userId, String productId, int num) {
        // Steps 1-2 (stock check, order persistence) are placeholders; only the id is generated.
        String orderId = createOrderId();

        // Step 3: hand the order to MQ. Arguments: exchange, routing key, message payload.
        System.out.println(RabbitMQEnum.ROUTINGKEY_TTL_SMS.getDesc());
        for (int seq = 1; seq <= 10; seq++) {
            String payload = orderId + "-size:" + seq;
            rabbitTemplate.convertAndSend(
                    RabbitMQEnum.DIRECT_TTL_EXCHANGE_NAME.getDesc(),
                    RabbitMQEnum.ROUTINGKEY_TTL_SMS.getDesc(),
                    payload);
        }
    }

    /**
     * Publishes a new order id to the TTL direct exchange with a per-message expiration set by
     * a {@link MessagePostProcessor}.
     */
    @Override
    public void directTTLMessageMakeOrder(String userId, String productId, int num) {
        // Steps 1-2 (stock check, order persistence) are placeholders; only the id is generated.
        String orderId = createOrderId();

        // Attach the expiration to the individual message rather than to the queue.
        MessagePostProcessor expiringMessage = expirationPostProcessor();

        // Step 3: hand the order to MQ. Arguments: exchange, routing key, payload, post-processor.
        System.out.println(RabbitMQEnum.ROUTINGKEY_TTL_MESSAGE_SMS.getDesc());
        rabbitTemplate.convertAndSend(
                RabbitMQEnum.DIRECT_TTL_EXCHANGE_NAME.getDesc(),
                RabbitMQEnum.ROUTINGKEY_TTL_MESSAGE_SMS.getDesc(),
                orderId,
                expiringMessage);
    }

    /** Generates a random order id and prints the "order created" line for it. */
    private String createOrderId() {
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生成成功:" + orderId);
        return orderId;
    }

    /**
     * Builds the post-processor that stamps expiration {@code "5000"} and content encoding
     * {@code "UTF-8"} on an outgoing message.
     */
    private MessagePostProcessor expirationPostProcessor() {
        return new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // NOTE(review): "5000" is presumably milliseconds (5 s) - confirm.
                message.getMessageProperties().setExpiration("5000");
                message.getMessageProperties().setContentEncoding("UTF-8");
                return message;
            }
        };
    }
}

// Next article section: consumer - RabbitMQ connection configuration
package com.any.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; @Configuration @Slf4j public class RabbitMQConfig { @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private int port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); return connectionFactory; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) // 此注解必须加 public RabbitTemplate rabbitTemplate(){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory()); return rabbitTemplate; } }消费者测试接收(其他同理)
package com.any.rabbitmq.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listener for the {@code fanout.email.queue} queue; prints every received message to
 * standard output.
 */
@Component
@RabbitListener(queues = "fanout.email.queue")
public class FanoutEmailConsumer {

    /**
     * Handles one message from the queue.
     *
     * @param message the message payload as a string
     */
    @RabbitHandler
    public void reviceMessage(String message) {
        String line = "Email fanout---接收到的消息是:->" + message;
        System.out.println(line);
    }
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)