RabbitMQ使用案例

RabbitMQ使用案例,第1张

RabbitMQ使用案例 一.发布与订阅模式(队列-->交换机)

yml配置:

server:
  port: 8088
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    publisher-confirm-type: correlated    #消息确认方式,通过 correlated 来确认(将来的消息中才会带 correlation_id,只有通过 correlation_id 我们才能将发送的消息和返回值之间关联起来)
    publisher-returns: true     #开启发送失败退回
    #1.开启 confirm 确认机制

maven依赖:

 
 
 <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>
 
1.配置类:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * Spring configuration for the request/reply example: declares two queues, one topic
 * exchange, the bindings between them, a {@link RabbitTemplate} that expects replies on
 * {@link #RPC_QUEUE2}, and a listener container that feeds that queue back into the template.
 */
@Configuration
public class RabbitConfig {

    /** Name of the queue bound by {@link #msgBinding()}; also used as its routing key. */
    public static final String RPC_QUEUE1 = "queue_1";
    /** Name of the queue the template uses as its reply address; also used as its routing key. */
    public static final String RPC_QUEUE2 = "queue_2";
    /** Name of the topic exchange both queues are bound to. */
    public static final String RPC_EXCHANGE = "rpc_exchange";

    /** Declares the queue named {@link #RPC_QUEUE1}. */
    @Bean
    Queue msgQueue() {
        final Queue requestQueue = new Queue(RPC_QUEUE1);
        return requestQueue;
    }

    /** Declares the queue named {@link #RPC_QUEUE2}. */
    @Bean
    Queue replyQueue() {
        final Queue responseQueue = new Queue(RPC_QUEUE2);
        return responseQueue;
    }

    /** Declares the topic exchange named {@link #RPC_EXCHANGE}. */
    @Bean
    TopicExchange exchange() {
        final TopicExchange rpcExchange = new TopicExchange(RPC_EXCHANGE);
        return rpcExchange;
    }

    /** Binds {@link #msgQueue()} to {@link #exchange()} with routing key {@link #RPC_QUEUE1}. */
    @Bean
    Binding msgBinding() {
        final Queue requestQueue = msgQueue();
        final TopicExchange rpcExchange = exchange();
        return BindingBuilder.bind(requestQueue).to(rpcExchange).with(RPC_QUEUE1);
    }

    /** Binds {@link #replyQueue()} to {@link #exchange()} with routing key {@link #RPC_QUEUE2}. */
    @Bean
    Binding replyBinding() {
        final Queue responseQueue = replyQueue();
        final TopicExchange rpcExchange = exchange();
        return BindingBuilder.bind(responseQueue).to(rpcExchange).with(RPC_QUEUE2);
    }

    /**
     * Builds the template used to send messages and wait for replies.
     *
     * @param connectionFactory connection factory the template is created with
     * @return a template whose reply address is {@link #RPC_QUEUE2} and whose reply timeout is 6000
     */
    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rpcTemplate = new RabbitTemplate(connectionFactory);
        rpcTemplate.setReplyTimeout(6000);
        rpcTemplate.setReplyAddress(RPC_QUEUE2);
        return rpcTemplate;
    }

    /**
     * Builds the container that listens on {@link #RPC_QUEUE2} and hands every message
     * to the {@link RabbitTemplate} bean, which is registered as the message listener.
     *
     * @param connectionFactory connection factory used by the container
     * @return the configured listener container
     */
    @Bean
    SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) {
        final SimpleMessageListenerContainer replyListener = new SimpleMessageListenerContainer();
        replyListener.setConnectionFactory(connectionFactory);
        replyListener.setQueueNames(RPC_QUEUE2);
        // The template itself consumes the replies.
        final RabbitTemplate rpcTemplate = rabbitTemplate(connectionFactory);
        replyListener.setMessageListener(rpcTemplate);
        return replyListener;
    }
}

2.生产者

import com.yl.config.RabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;


/**
 * REST endpoint acting as the requesting side of the request/reply example: it publishes
 * the request text through {@link RabbitTemplate#sendAndReceive} and returns the reply body.
 */
@Slf4j
@RestController
public class RpcClientController {

    /** Charset used for both the request body and the reply body. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends {@code message} to {@link RabbitConfig#RPC_EXCHANGE} with routing key
     * {@link RabbitConfig#RPC_QUEUE1} and waits for the reply.
     *
     * @param message request text; when {@code null} nothing is sent
     * @return the reply body decoded as UTF-8, or an empty string when no message was
     *         given, no reply was received, or the reply's
     *         {@code spring_returned_message_correlation} header does not equal the
     *         correlation id of the sent message
     */
    @GetMapping("/send")
    public String send(String message) {
        if (message == null) {
            // A missing request parameter used to fail with a NullPointerException.
            log.warn("生产者未收到 message 参数,不发送消息");
            return "";
        }

        // Build the message with an explicit charset so it does not depend on the platform default.
        Message newMessage = MessageBuilder.withBody(message.getBytes(UTF_8)).build();
        // The placeholder was missing, so the message argument was never logged.
        log.info("生产者发送消息----->>>>>{}", newMessage);

        Message result = rabbitTemplate.sendAndReceive(
                RabbitConfig.RPC_EXCHANGE, RabbitConfig.RPC_QUEUE1, newMessage);
        if (result == null) {
            // NOTE(review): presumably null means the reply timeout elapsed; confirm.
            return "";
        }

        // Correlation id carried by the message that was sent.
        String correlationId = newMessage.getMessageProperties().getCorrelationId();
        log.info("生产者----->>>>>{}", correlationId);

        // Correlation value the replying side attached to its message headers.
        Object msgId = result.getMessageProperties().getHeaders()
                .get("spring_returned_message_correlation");

        // Comparing from the non-null side avoids an NPE when the header is absent.
        if (correlationId == null || !correlationId.equals(msgId)) {
            return "";
        }

        String response = new String(result.getBody(), UTF_8);
        log.info("生产者发送消息----->>>>>:{}", response);
        return response;
    }
}
3.消费者

import com.yl.config.RabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;


/**
 * Replying side of the request/reply example: consumes messages from
 * {@link RabbitConfig#RPC_QUEUE1} and publishes an acknowledgement text to
 * {@link RabbitConfig#RPC_QUEUE2} through {@link RabbitConfig#RPC_EXCHANGE}.
 */
@Slf4j
@Component
public class RpcServerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Handles one request message and sends back {@code "i'm receive:" + body}.
     *
     * @param msg the incoming request; its body is decoded as UTF-8
     * @throws UnsupportedEncodingException if the UTF-8 charset name is not supported
     */
    @RabbitListener(queues = RabbitConfig.RPC_QUEUE1)
    public void process(Message msg) throws UnsupportedEncodingException {
        String message = new String(msg.getBody(), "UTF-8");

        // Pass the body as an argument so the {} placeholder is actually filled.
        log.info("消费者消费消息的消息体:{}----->>>>>", message);

        // Reuse the UTF-8 decoded text and encode the reply as UTF-8 too, so the
        // round trip does not depend on the platform default charset.
        Message response = MessageBuilder.withBody(("i'm receive:" + message).getBytes("UTF-8")).build();

        // Carry the request's correlation id along with the reply.
        CorrelationData correlationData = new CorrelationData(msg.getMessageProperties().getCorrelationId());

        // NOTE(review): sendAndReceive also waits for a reply to this reply; a plain send
        // may be what is intended here. Left unchanged; confirm against the client side.
        rabbitTemplate.sendAndReceive(RabbitConfig.RPC_EXCHANGE, RabbitConfig.RPC_QUEUE2, response, correlationData);
    }
}

postman:http://localhost:8088/send?message=听闻广陵不知寒,大雪龙骑下江南

结果:

  

二.路由模式(路由--->交换机) yml配置:
 
server:
  port: 8084
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    publisher-confirms: true    #1.开启 confirm 确认机制
    publisher-returns: true    #2.开启 return 确认机制
     #3.设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
    template:
      mandatory: true

maven配置:
  
 
 <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>
 
1.RabbitMQ配置
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * Spring configuration for the routing example: one queue, one direct exchange,
 * and the binding that connects them through a fixed routing key.
 */
@Configuration
public class RabbitQueueConfig {

    /** Name of the queue. */
    public static final String QUEUE_NAME = "rollback-queue";
    /** Name of the direct exchange. */
    public static final String EXCHANGE_NAME = "rollback-exchange";
    /** Routing key used to bind the queue to the exchange. */
    public static final String ROUTINGKEY_NAME = "rollback-routingkey";

    /** Declares the queue named {@link #QUEUE_NAME}, passing {@code true} as the second constructor argument. */
    @Bean
    public Queue queue() {
        final Queue rollbackQueue = new Queue(QUEUE_NAME, true);
        return rollbackQueue;
    }

    /** Declares the direct exchange named {@link #EXCHANGE_NAME}. */
    @Bean
    public DirectExchange directExchange() {
        final DirectExchange rollbackExchange = new DirectExchange(EXCHANGE_NAME);
        return rollbackExchange;
    }

    /** Binds {@link #queue()} to {@link #directExchange()} with {@link #ROUTINGKEY_NAME}. */
    @Bean
    public Binding binding() {
        final Queue rollbackQueue = queue();
        final DirectExchange rollbackExchange = directExchange();
        return BindingBuilder.bind(rollbackQueue).to(rollbackExchange).with(ROUTINGKEY_NAME);
    }
}

2.重写RabbitTemplate,创建RabbitTemplateConfig配置类(手动确认消费)
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;



/**
 * Builds the application's {@link RabbitTemplate} with JSON conversion, the mandatory
 * flag, and logging callbacks for publisher confirms and returned messages.
 */
@Configuration
@Slf4j
public class RabbitTemplateConfig {

    /**
     * Confirm callback that logs the correlation data, the ack flag and the cause.
     * Per the original author's notes it covers the producer --> exchange step and
     * fires on both success and failure.
     */
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            log.info("ConfirmCallback,相关数据:{}", correlationData);
            log.info("ConfirmCallback,确认消息:{}", ack);
            log.info("ConfirmCallback,原因:{}", cause);
        }
    };

    /**
     * Creates the template used for publishing.
     *
     * <p>Overall flow: producer --> exchange --> routing key --> queue.
     *
     * @param connectionFactory connection factory set on the template
     * @return a template with mandatory enabled, a JSON message converter, and the
     *         confirm / return callbacks installed
     */
    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        // Enable mandatory explicitly on the template.
        rabbitTemplate.setMandatory(true);

        // Messages are converted to and from JSON.
        rabbitTemplate.setMessageConverter(jsonMessageConverter());

        // Use the shared callback field rather than a second, identical anonymous class.
        rabbitTemplate.setConfirmCallback(confirmCallback);

        // ReturnCallback (exchange --> queue step). Per the original author's notes it is
        // tied to mandatory=true, is not invoked on success and is invoked on failure.
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText,
                                        String exchange, String routingKey) {
                log.info("ReturnCallback,消息:{}", message);
                log.info("ReturnCallback,回应码:{}", replyCode);
                log.info("ReturnCallback,回应信息:{}", replyText);
                log.info("ReturnCallback,交换机:{}", exchange);
                log.info("ReturnCallback,路由键:{}", routingKey);
            }
        });

        return rabbitTemplate;
    }

    /** Exposes the Jackson-based JSON message converter used by the template. */
    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

3.消息监听指定具体的队列
import com.yl.controller.RabbitQueueReceiver;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;




/**
 * Registers the listener container that delivers messages from the queue declared in
 * {@link RabbitQueueConfig} to {@link RabbitQueueReceiver}, using manual acknowledgement.
 */
@Configuration
public class MessageListenerConfig {

    /**
     * Builds the listener container.
     *
     * @param connectionFactory  connection factory for the container
     * @param rabbitQueueConfig  configuration bean that supplies the queue to listen on
     * @param rabbitQueuReceiver listener that receives the messages
     * @return a container with one consumer (and at most one) in MANUAL acknowledge mode
     */
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(
            ConnectionFactory connectionFactory,
            RabbitQueueConfig rabbitQueueConfig,
            RabbitQueueReceiver rabbitQueuReceiver) {
        final SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);

        // Concurrency: start with one consumer and never grow beyond one.
        listenerContainer.setConcurrentConsumers(1);
        listenerContainer.setMaxConcurrentConsumers(1);

        // The original author notes that acknowledgement is automatic by default;
        // here the listener has to ack or reject each message itself.
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        // The queue is obtained from the config bean rather than injected by type:
        // a project may declare several Queue beans, and injection by type would then fail.
        listenerContainer.setQueues(rabbitQueueConfig.queue());
        listenerContainer.setMessageListener(rabbitQueuReceiver);

        return listenerContainer;
    }

    /** Exposes a Jackson-based JSON message converter bean. */
    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        final Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        return jsonConverter;
    }
}

4.消息生产者
import com.yl.common.Result;
import com.yl.common.SnowflakeIdWorker;
import com.yl.entity.Journal;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;



/**
 * REST endpoint acting as the producer of the routing example: it builds a
 * {@link Journal} and publishes it to the {@code rollback-exchange} exchange.
 */
@RestController
@Slf4j
public class SendController {

    /** Shared id generator (worker 0, datacenter 0) used for journal ids. */
    static final SnowflakeIdWorker idWorker = new SnowflakeIdWorker(0, 0);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes one sample {@link Journal} message.
     *
     * <p>Flow: producer --> exchange --> routing key --> queue.
     *
     * @return a success {@link Result}
     */
    @RequestMapping("send")
    public Result send() {
        String exchange = "rollback-exchange";
        String routingkey = "rollback-routingkey";

        log.info("生产者开始发送消息");
        Journal journal = new Journal();
        journal.setId(idWorker.nextId());
        journal.setTitle("听闻广陵不知寒,大雪龙骑下江南");
        journal.setTitleDesc("怒发冲冠⑵,凭阑处⑶、潇潇雨歇⑷。抬望眼,仰天长啸⑸,壮怀激烈⑹。三十功名尘与土⑺,八千里路云和月⑻。莫等闲⑼、白了少年头,空悲切⑽。 靖康耻⑾,犹未雪。臣子恨,何时灭。驾长车,踏破贺兰山缺⑿。壮志饥餐胡虏肉⒀,笑谈渴饮匈奴血⒁。待从头、收拾旧山河,朝天阙⒂。");

        // Arguments: exchange name, routing key, payload (converted by the template).
        rabbitTemplate.convertAndSend(exchange, routingkey, journal);
        log.info("生产者发送消息完成");

        // The literal had been garbled to " *** 作成功"; restored to the intended text.
        return Result.succ("操作成功");
    }
}

5.消息的消费者
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.util.JSONPObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener;
import org.springframework.stereotype.Component;


@Slf4j
@Component
@RabbitListener(queues = {"rollback_queue"})
public class RabbitQueueReceiver extends AbstractAdaptableMessageListener {

    
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        //消息的唯一ID,单调递增正整数,从1开始,当multiple=trues,一次性处理<=deliveryTag的所有
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        boolean multiple=false;  //false单条   true 批量

//       channel.basicAck(deliveryTag, multiple);	//正确消费消息
//       channel.basicReject(deliveryTag,true);     //为true会重新放回队列
//       channel.basicNack(deliveryTag, multiple,true)
        try {
            String msg=new String(message.getBody(),"UTF-8");
            JSonObject json = JSONObject.parseObject(msg);
            Long id = json.getLong("id");
            log.info("消费的消息id"+id+"-------->>>>>>>"+"消费者消费消息的消息体:{}----->>>>>"+message);

            //睡眠四秒
            for(int i=0;i<4;i++){
                Thread.sleep(1000);
                System.out.println("...");
            }

//            if(deliveryTag%2==0){
//                throw new RuntimeException("偶数必须为0");
//            }

            log.info("消息已被正确消费--->>>>>>>>"+deliveryTag);
            //当前模式为单条消费
            channel.basicAck(deliveryTag, multiple);

        } catch (Exception e) {
            e.printStackTrace();
            //报异常重新投递
            channel.basicReject(deliveryTag,true);
        }
    }


//    @RabbitHandler
//    public void handlerMessage(Journal orderVo) {
//        log.info("消费者消费消息"+orderVo.toString());
//    }
}

6.实体类对象
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;

import java.time.LocalDateTime;


/**
 * Message payload used by the routing example: {@code SendController} fills it in and
 * publishes it, and the consumer reads the {@code id} property from the received JSON.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class Journal {

    /** Identifier of the journal entry. */
    private Long id;

    /** Title text. */
    private String title;

    /** Longer description text for the title. */
    private String titleDesc;

}
7.工具类


/**
 * Snowflake-style primary-key generator.
 *
 * <p>Each id packs four parts into one {@code long}, from high bits to low bits:
 * milliseconds elapsed since a custom epoch, a 5-bit datacenter id, a 5-bit worker id,
 * and a 12-bit per-millisecond sequence.
 */
public class SnowflakeIdWorker {

    // ============================== Layout constants ===============================

    /** Custom epoch, in milliseconds, subtracted from the current time. */
    private static final long EPOCH_MILLIS = 1420041600000L;

    /** Number of bits reserved for the worker id. */
    private static final long WORKER_BITS = 5L;

    /** Number of bits reserved for the datacenter id. */
    private static final long DATACENTER_BITS = 5L;

    /** Number of bits reserved for the per-millisecond sequence. */
    private static final long SEQUENCE_BITS = 12L;

    /** Largest worker id that fits in {@link #WORKER_BITS} bits (31). */
    private static final long MAX_WORKER_ID = ~(-1L << WORKER_BITS);

    /** Largest datacenter id that fits in {@link #DATACENTER_BITS} bits (31). */
    private static final long MAX_DATACENTER_ID = ~(-1L << DATACENTER_BITS);

    /** Left shift applied to the worker id (12). */
    private static final long WORKER_SHIFT = SEQUENCE_BITS;

    /** Left shift applied to the datacenter id (17). */
    private static final long DATACENTER_SHIFT = SEQUENCE_BITS + WORKER_BITS;

    /** Left shift applied to the timestamp delta (22). */
    private static final long TIMESTAMP_SHIFT = SEQUENCE_BITS + WORKER_BITS + DATACENTER_BITS;

    /** Mask that keeps the sequence within {@link #SEQUENCE_BITS} bits (4095). */
    private static final long SEQUENCE_MASK = ~(-1L << SEQUENCE_BITS);

    // ============================== State ==========================================

    /** Worker id (0..31). */
    private final long workerId;

    /** Datacenter id (0..31). */
    private final long datacenterId;

    /** Sequence within the current millisecond (0..4095). */
    private long sequence = 0L;

    /** Timestamp used for the most recently generated id; -1 before the first call. */
    private long lastTimestamp = -1L;

    // ============================== Constructors ===================================

    /**
     * Creates a generator for the given worker and datacenter.
     *
     * @param workerId     worker id, 0..31
     * @param datacenterId datacenter id, 0..31
     * @throws IllegalArgumentException if either id is outside its range
     */
    public SnowflakeIdWorker(long workerId, long datacenterId) {
        boolean workerOutOfRange = workerId < 0 || workerId > MAX_WORKER_ID;
        if (workerOutOfRange) {
            String reason = String.format(
                    "worker Id can't be greater than %d or less than 0", MAX_WORKER_ID);
            throw new IllegalArgumentException(reason);
        }
        boolean datacenterOutOfRange = datacenterId < 0 || datacenterId > MAX_DATACENTER_ID;
        if (datacenterOutOfRange) {
            String reason = String.format(
                    "datacenter Id can't be greater than %d or less than 0", MAX_DATACENTER_ID);
            throw new IllegalArgumentException(reason);
        }
        this.workerId = workerId;
        this.datacenterId = datacenterId;
    }

    // ============================== Methods ========================================

    /**
     * Produces the next id. The method is synchronized on this instance.
     *
     * @return the next id
     * @throws RuntimeException if the clock reports a time earlier than the last one used
     */
    public synchronized long nextId() {
        long now = timeGen();

        // A clock that went backwards could repeat ids, so refuse to continue.
        if (now < lastTimestamp) {
            long drift = lastTimestamp - now;
            throw new RuntimeException(String.format(
                    "Clock moved backwards.  Refusing to generate id for %d milliseconds", drift));
        }

        if (now != lastTimestamp) {
            // New millisecond: the sequence starts over.
            sequence = 0L;
        } else {
            // Same millisecond: advance the sequence, wrapping at the mask.
            sequence = (sequence + 1) & SEQUENCE_MASK;
            if (sequence == 0) {
                // Sequence exhausted for this millisecond: wait for the next one.
                now = tilNextMillis(lastTimestamp);
            }
        }

        // Remember the timestamp this id was generated with.
        lastTimestamp = now;

        // Shift each part into place and OR them together.
        long timePart = (now - EPOCH_MILLIS) << TIMESTAMP_SHIFT;
        long datacenterPart = datacenterId << DATACENTER_SHIFT;
        long workerPart = workerId << WORKER_SHIFT;
        return timePart | datacenterPart | workerPart | sequence;
    }

    /**
     * Spins until {@link #timeGen()} returns a value greater than {@code lastTimestamp}.
     *
     * @param lastTimestamp the timestamp that must be exceeded
     * @return the first observed timestamp greater than {@code lastTimestamp}
     */
    protected long tilNextMillis(long lastTimestamp) {
        long candidate;
        do {
            candidate = timeGen();
        } while (candidate <= lastTimestamp);
        return candidate;
    }

    /**
     * Returns the current time in milliseconds.
     *
     * @return {@link System#currentTimeMillis()}
     */
    protected long timeGen() {
        return System.currentTimeMillis();
    }

    // ============================== Demo ===========================================

    /**
     * Prints 100 sample insert statements, one per generated id.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SnowflakeIdWorker demoWorker = new SnowflakeIdWorker(0, 0);
        int remaining = 100;
        while (remaining-- > 0) {
            String insertSQL = String.format("insert into orderNumber value('%d');", demoWorker.nextId());
            System.out.println(insertSQL);
        }
    }
}
import lombok.Data;

import java.io.Serializable;

/**
 * Generic response envelope carrying a status code, a message and an optional payload.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class Result implements Serializable {

	/** Status code; the convenience factories use 200 for success and 400 for failure. */
	private int code;
	/** Human-readable message. */
	private String msg;
	/** Optional payload. */
	private Object data;

	/**
	 * Builds a success result with code 200 and the default success message.
	 *
	 * @param data payload to return
	 * @return the success result
	 */
	public static Result succ(Object data) {
		// The literal had been garbled to " *** 作成功"; restored to the intended text.
		return succ(200, "操作成功", data);
	}

	/**
	 * Builds a success result from explicit values.
	 *
	 * @param code status code
	 * @param msg  message
	 * @param data payload
	 * @return the populated result
	 */
	public static Result succ(int code, String msg, Object data) {
		return of(code, msg, data);
	}

	/**
	 * Builds a failure result with code 400 and no payload.
	 *
	 * @param msg failure message
	 * @return the failure result
	 */
	public static Result fail(String msg) {
		return fail(400, msg, null);
	}

	/**
	 * Builds a failure result from explicit values.
	 *
	 * @param code status code
	 * @param msg  message
	 * @param data payload
	 * @return the populated result
	 */
	public static Result fail(int code, String msg, Object data) {
		return of(code, msg, data);
	}

	/** Creates a result and fills in all three fields; shared by the public factories. */
	private static Result of(int code, String msg, Object data) {
		Result r = new Result();
		r.setCode(code);
		r.setMsg(msg);
		r.setData(data);
		return r;
	}

}

 postman调用接口生产消息:http://localhost:8084/send

结果: 项目地址:

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5688820.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存