SpringBoot简单整合

SpringBoot简单整合,第1张

SpringBoot简单整合

1. 引入依赖

        
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        

2. 配置yaml

spring:
  rabbitmq:
    host: 192.168.56.10
    port: 5672
    virtual-host: /
    # Enable publisher confirms: the broker reports whether it accepted each message.
    # Spring Boot 2.2+ deprecates this key in favour of `publisher-confirm-type: correlated`.
    publisher-confirms: true
    # Enable publisher returns: a callback fires when a message cannot be routed to a queue.
    publisher-returns: true
    template:
      # Hand unroutable messages back to the return callback instead of silently dropping them.
      mandatory: true
    listener:
      # The default listener container type is `simple`, so the ack mode belongs here;
      # values under `listener.direct` are only read when `listener.type: direct` is set.
      simple:
        acknowledge-mode: manual # consumers ack/nack every message explicitly

3. 使用java操作mq

package com.systop.gulimall.order;

import com.rabbitmq.client.ConnectionFactory;
import com.systop.common.utils.Query;
import com.systop.gulimall.order.entity.OrderEntity;
import com.systop.gulimall.order.entity.RefundInfoEntity;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * Spring Boot tests that declare a small RabbitMQ topology (one direct exchange, one queue
 * and the binding between them) and publish a sample message to it.
 *
 * <p>Every test talks to the broker through the injected {@link AmqpAdmin} or
 * {@link RabbitTemplate}; none of them asserts on the result, they only log on completion.
 */
@Slf4j
@SpringBootTest
class GulimallOrderApplicationTests {

    /** Direct exchange that the tests declare, bind and publish to. */
    private static final String EXCHANGE_NAME = "java.exchange.direct";

    /** Queue that the tests declare and bind to {@link #EXCHANGE_NAME}. */
    private static final String QUEUE_NAME = "hello-java-queue";

    /** Routing key used both for the binding and for publishing. */
    private static final String ROUTING_KEY = "java.hello";

    @Autowired
    AmqpAdmin amqpAdmin;

    @Autowired
    RabbitTemplate rabbitTemplate;

    /** Publishes one {@code OrderEntity} to the exchange using the bound routing key. */
    @Test
    void sendMessage() {
        OrderEntity orderEntity = new OrderEntity();
        orderEntity.setReceiverName("杜宜洲");
        orderEntity.setOrderSn("213");
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, orderEntity);
        log.info("消息发布成功!!");
    }

    /** Declares the direct exchange as durable and not auto-deleted. */
    @Test
    void createExchanges() {
        // DirectExchange(String name, boolean durable, boolean autoDelete)
        DirectExchange directExchange = new DirectExchange(EXCHANGE_NAME, true, false);
        amqpAdmin.declareExchange(directExchange);
        log.info("交换机[{}]创建成功", EXCHANGE_NAME);
    }

    /** Declares the queue as durable, non-exclusive and not auto-deleted. */
    @Test
    void createQueue() {
        // Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)
        Queue queue = new Queue(QUEUE_NAME, true, false, false);
        amqpAdmin.declareQueue(queue);
        log.info("队列[{}]创建成功", QUEUE_NAME);
    }

    /** Binds the queue to the exchange under the routing key, with no extra binding arguments. */
    @Test
    void binding() {
        // Binding(destination, destinationType, exchange, routingKey, arguments)
        Binding binding = new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, ROUTING_KEY, null);
        amqpAdmin.declareBinding(binding);
        log.info("交换机[{}]绑定成功", EXCHANGE_NAME);
    }

}

4. 使用请求发布消息

Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Handles {@code GET /send} by publishing ten demo messages to the
     * {@code java.exchange.direct} exchange.
     *
     * <p>Even iterations send an {@code OrderEntity} with routing key {@code java.hello};
     * odd iterations send an {@code OrderItemEntity} with routing key {@code java.hello22}.
     * Every message carries a fresh random UUID as its correlation id.
     *
     * @return the literal string {@code "ok"} once all messages have been handed to the template
     */
    @ResponseBody
    @GetMapping("/send")
    public String sendMsg() {
        for (int n = 0; n < 10; n++) {
            boolean oddIteration = n % 2 != 0;
            if (oddIteration) {
                OrderItemEntity item = new OrderItemEntity();
                item.setSkuAttrsVals("dasdf");
                item.setOrderId(1L);
                String correlationId = UUID.randomUUID().toString();
                rabbitTemplate.convertAndSend(
                        "java.exchange.direct", "java.hello22", item, new CorrelationData(correlationId));
                continue;
            }

            OrderEntity order = new OrderEntity();
            order.setReceiverName("杜宜洲" + n);
            order.setOrderSn("213" + n);
            String correlationId = UUID.randomUUID().toString();
            rabbitTemplate.convertAndSend(
                    "java.exchange.direct", "java.hello", order, new CorrelationData(correlationId));
            log.info("消息发布成功!!");
        }
        return "ok";
    }

5. 接收消息

/**
 * Order-item service that also consumes messages from {@code hello-java-queue}.
 *
 * <p>The class-level {@code @RabbitListener} subscribes to the queue; the
 * {@code @RabbitHandler} method below receives messages whose payload converts to
 * {@code OrderEntity}.
 *
 * <p>NOTE(review): {@code ServiceImpl} appears as a raw type here; its type arguments
 * were probably lost when this snippet was extracted. Confirm against the project.
 */
@RabbitListener(queues = {"hello-java-queue"})
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl implements OrderItemService {

    /**
     * Receives one message and acknowledges or rejects it manually.
     *
     * <p>Demo policy: deliveries with an even tag are acked; deliveries with an odd tag are
     * nacked without requeue. Both calls act on the single delivery, not in batch.
     *
     * @param message     raw AMQP message; only its delivery tag is read
     * @param orderEntity converted message payload
     * @param channel     channel the message arrived on, used for ack/nack
     * @throws IOException declared for callers; ack/nack failures are caught and reported
     *                     inside the method rather than propagated
     */
    @RabbitHandler
    public void recieveMessage(Message message, OrderEntity orderEntity, Channel channel) throws IOException {
        System.out.println("接收到消息" + orderEntity);

        // The delivery tag increases monotonically per channel.
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("deliveryTag==>" + deliveryTag);

        try {
            if (deliveryTag % 2 == 0) {
                // Accept: multiple=false acks only this delivery.
                channel.basicAck(deliveryTag, false);
                System.out.println("签收了货物..." + deliveryTag);
            } else {
                // Reject: multiple=false, requeue=false, so the broker does not redeliver it.
                channel.basicNack(deliveryTag, false, false);
                System.out.println("没有签收了货物");
            }
        } catch (Exception e) {
            // Typically a broken connection or closed channel. The message then stays
            // unacknowledged, so report the failure instead of dropping it silently.
            System.err.println("Failed to ack/nack deliveryTag " + deliveryTag + ": " + e);
        }
    }

}

6. 因为默认的序列化机制是jdk默认的,所以需要自己配置序列化,创建一个config类

package com.systop.gulimall.order.config;

import com.mysql.cj.protocol.MessageListener;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import java.text.SimpleDateFormat;


/**
 * RabbitMQ configuration: JSON message conversion, publisher confirm/return callbacks,
 * and the listener container factory used by {@code @RabbitListener} endpoints.
 */
@Configuration
public class MyRabbitConfig {

    // NOTE(review): injecting RabbitTemplate into the same configuration class that
    // provides the MessageConverter bean may form a circular reference on Spring Boot
    // versions that reject them - confirm against the version in use.
    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Replaces the default JDK serialization with JSON for message payloads.
     *
     * @return a Jackson-based JSON message converter
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * Registers the publisher-side callbacks on the injected template once this
     * configuration object has been constructed.
     *
     * <p>The confirm callback prints the correlation data, the ack flag and the cause
     * for every confirm; the return callback prints the returned message and the
     * broker's reply text.
     */
    @PostConstruct
    public void initRabbitTemplate() {
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) ->
                System.out.println("confirm...correlationData[" + correlationData + "] ==> ack[" + ack + "]"
                        + " cause[" + cause + "]"));

        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) ->
                System.out.println("Fail Message [" + message + "]" + "==>" + "回复的文本内容[" + replyText + "]"));
    }

    /**
     * Listener container factory for message consumers.
     *
     * <p>A bean with this name takes the place of Spring Boot's auto-configured factory,
     * and a hand-built factory does not pick up {@code spring.rabbitmq.listener.*}
     * settings. The acknowledge mode is therefore set explicitly to manual, so that
     * listeners calling {@code basicAck}/{@code basicNack} themselves do not collide
     * with an automatic ack from the container.
     *
     * @param connectionFactory connection factory the containers will use
     * @return a simple listener container factory using JSON conversion and manual acks
     */
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // Reuse the converter bean so publishing and consuming share one configuration.
        factory.setMessageConverter(messageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5703590.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存