1. 引入依赖
org.springframework.boot spring-boot-starter-amqp
2. 配置ymal
spring: rabbitmq: host: 192.168.56.10 port: 5672 virtual-host: / publisher-/confirm/is: true # 开启发送端确认 publisher-returns: true # 开启发送端消息抵达队列确认 template: mandatory: true # 只要抵达队列,以异步发送优先回调我们这个returnconnfirms listener: direct: acknowledge-mode: manual # 手动ack消息
3. 使用java *** 作mq
package com.systop.gulimall.order; import com.rabbitmq.client.ConnectionFactory; import com.systop.common.utils.Query; import com.systop.gulimall.order.entity.OrderEntity; import com.systop.gulimall.order.entity.RefundInfoEntity; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @Slf4j @SpringBootTest class GulimallOrderApplicationTests { @Autowired AmqpAdmin amqpAdmin; @Autowired RabbitTemplate rabbitTemplate; // 发布消息 @Test void sendMessage(){ String msg = "java.hello"; OrderEntity orderEntity = new OrderEntity(); orderEntity.setReceiverName("杜宜洲"); orderEntity.setOrderSn("213"); rabbitTemplate.convertAndSend("java.exchange.direct","java.hello",orderEntity); log.info("消息发布成功!!"); } // 创建交换机 @Test void createExchanges() { DirectExchange directExchange = new DirectExchange("java.exchange.direct", true, false); amqpAdmin.declareExchange(directExchange); log.info("交换机[{}]创建成功", "java.exchange.direct"); } // 创建队列 @Test void createQueue() { // public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Maparguments) Queue queue = new Queue("hello-java-queue", true, false, false); amqpAdmin.declareQueue(queue); log.info("队列[{}]创建成功", "hello-java-queue"); } // 交换机绑定队列 @Test void binding(){ Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE, "java.exchange.direct", "java.hello", null); amqpAdmin.declareBinding(binding); log.info("交换机[{}]绑定成功", "java.exchange.direct"); } }
4. 使用请求发布消息
Autowired private RabbitTemplate rabbitTemplate; @ResponseBody @GetMapping("/send") public String sendMsg (){ for (int i = 0; i < 10; i++) { if (i % 2 == 0){ OrderEntity orderEntity = new OrderEntity(); orderEntity.setReceiverName("杜宜洲"+i); orderEntity.setOrderSn("213"+i); rabbitTemplate.convertAndSend("java.exchange.direct","java.hello",orderEntity,new CorrelationData(UUID.randomUUID().toString())); log.info("消息发布成功!!"); }else { OrderItemEntity orderItemEntity = new OrderItemEntity(); orderItemEntity.setSkuAttrsVals("dasdf"); orderItemEntity.setOrderId(1L); rabbitTemplate.convertAndSend("java.exchange.direct","java.hello22",orderItemEntity,new CorrelationData(UUID.randomUUID().toString())); } } return "ok"; }
5. 接收消息
@RabbitListener(queues = {"hello-java-queue"}) @Service("orderItemService") public class OrderItemServiceImpl extends ServiceImplimplements OrderItemService { // @RabbitListener(queues = {"hello-java-queue"}) @RabbitHandler public void recieveMessage(Message message, OrderEntity orderEntity, Channel channel) throws IOException { System.out.println("接收到消息" + orderEntity); byte[] body = message.getBody(); // channel 内按顺序自增的 long deliveryTag = message.getMessageProperties().getDeliveryTag(); System.out.println("deliveryTag==>" + deliveryTag); // 手动签收,非批量模式 try { if (deliveryTag % 2 == 0){ // 收货 channel.basicAck(deliveryTag,false); System.out.println("签收了货物..." + deliveryTag); }else { // 退货 // requeue(b1) = false 丢弃 requeue=true 发回服务器,服务器重新入队 // b: 是否批量拒收 b1: 拒收后是否重新入队 channel.basicNack(deliveryTag,false,false); System.out.println("没有签收了货物"); } }catch (Exception e){ // 网络中断了 } } }
5. 因为默认的序列化机制是jdk默认的所以需要自己配置序列化,创建一个config类
package com.systop.gulimall.order.config; import com.mysql.cj.protocol.MessageListener; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; import java.text.SimpleDateFormat; @Configuration public class MyRabbitConfig { @Autowired RabbitTemplate rabbitTemplate; @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } @PostConstruct public void initRabbitTemplate(){ System.out.println(123); System.out.println(rabbitTemplate); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean b, String s) { System.out.println("/confirm/i...correlationData["+correlationData+"] ==> ack" +"["+b+"]"); } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { System.out.println("Fail Message [" + message +"]" + "==>" + "回复的文本内容[" + s + "]"); } }); } // 这个是接收数据的时候需要配置 @Bean public RabbitListenerContainerFactory> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)