目录
1.TTL机制
1.1 实现方案
1.2 原生API实现
1.3 SpringBoot实现
2.死信队列
2.1 原生API实现
2.2 SpringBoot实现
3.延迟队列
3.1 延时队列的使用
1.TTL机制 1.1 实现方案
目前的电商业务中订单创建成功,等待支付一般都会给一定的时间,开始倒计时。如果在这段时间内用户没有支付,则默认订单取消。
如何实现这个功能?
定时轮询(数据库等)
用户下单成功,将订单数据放入数据库,同时将支付状态放入数据库,用户付款更
改数据库状态。定期轮询数据库支付状态,如果超过30分钟就将该订单取消。
优点:设计实现简单。
缺点: 需要对数据库进行大量的IO *** 作,效率低下。
Timer
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss"); Timer timer = new Timer(); TimerTask timerTask = new TimerTask() { @Override public void run() { System.out.println("用户没有付款,交易取消:" + simpleDateFormat.format(new Date(System.currentTimeMillis()))); timer.cancel(); } }; System.out.println("等待用户付款:" + simpleDateFormat.format(new Date(System.currentTimeMillis()))); // 10秒后执行timerTask timer.schedule(timerTask, 10 * 1000);
优点: 没有想到
缺点: 没有持久化机制。
不灵活 (只可以设置开始时间和重复间隔, 对于其它业务不太合适)。
不能利用线程池,一个timer一个线程。
没有真正的管理计划。
Scheduler及其它定时器
// 线程工厂 ThreadFactory factory = Executors.defaultThreadFactory(); // 使用线程池 ScheduledExecutorService service = new ScheduledThreadPoolExecutor(10, factory); System.out.println("开始等待用户付款10秒:" + format.format(new Date())); service.schedule(new Runnable() { @Override public void run() { System.out.println("用户未付款,交易取消:" + format.format(new Date())); }// 等待10s 单位秒 }, 10, TimeUnit.SECONDS);
优点:可多线程执行,一定程度上避免任务互相影响,单个任务异常不影响其它任务
缺点:高并发下不建议使用定时任务,较浪费服务器性能
RabbitMQ 的 TTL
通过对 消息 和 队列 两个维度来设置 TTL
优点:无需消费过多的服务器性能即可实现定时的功能
缺点:任何中间件的容量和堆积能力都是有限的,且消息中间件的引入会增加许多问题
由于消息中间件的容量和堆积能力都是有限的,如果有些消息总是无法消费掉,就需要有一种东西进行兜底。
目前有两种方法设置消息的TTL,两种方法同时使用,则消息的TTL已两者之间 较小数值为准。
1. 通过 Queue 属性设置,队列中所有消息都有相同的过期时间。
2. 对消息自身进行单独设置,每条消息的TTL可以不同。
1.2 原生API实现try(Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 创建队列(实际上使用的是AMQP default这个direct类型的交换器) // 设置队列属性 Maparguments = new HashMap<>(); // 设置队列的TTL arguments.put("x-message-ttl", 30000); // 设置队列的空闲存活时间(如该队列根本没有消费者,一直没有使用,队列可以存活多久) arguments.put("x-expires", 10000); channel.queueDeclare(QUEUE_NAME, false, false, false, arguments); channel.basicPublish("", QUEUE_NAME, //设置消息本身的过期时间 new AMQP.BasicProperties().builder().expiration("30000").build(), message.getBytes()) } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }
此外,还可以通过命令行方式设置全局TTL,执行如下命令:
rabbitmqctl set_policy TTL ".*" '{"message-ttl":30000}' --apply-to queues
默认规则:
1.如果不设置 x-message-ttl, 则表示消息不会过期。
2.如果 x-message-ttl 设置为0, 则表示除非消息可以直接将消息投递到消费者,否则消息会被立即丢弃 。
1.3 SpringBoot实现@Configuration public class RabbitConfig { @Bean public Queue queueTTLWaiting() { Mapprops = new HashMap<>(); // 对于该队列中的消息,设置都等待10s props.put("x-message-ttl", 10000); // 设置队列的空闲存活时间(如该队列根本没有消费者,一直没有使用,队列可以存活多久) props.put("x-expires", 1000); Queue queue = new Queue("q.pay.ttl-waiting", false, false, false, props); return queue; } }
@RestController public class controller{ @RequestMapping("/pay/msgttl") public String sendTTLMessage() throws UnsupportedEncodingException { MessageProperties properties = new MessageProperties(); //设置消息本身过期时间 properties.setExpiration("5000"); Message message = new Message("发送了WAITINGMESSAGE".getBytes("utf-8"), properties); rabbitTemplate.convertAndSend("ex.pay.waiting", "pay.waiting", message); return "msg-ttl-ok"; } }2.死信队列
DLX,全称为 Dead-Letter-Excahnge, 死信交换机。
在各个外卖系统中,用户下单调用订单服务, 然后订单服务调用派单系统通知外卖人员送单,这时候订单系统与派单系统 采用 MQ异步通讯。
用户在下单之后,外卖员接单之后再取消接单应该如何处理,用户下单长时间未被接单如何处理?
这种场景下我们可以考虑定义一个死信交换机,并绑定一个死信队列。当消息变成死信时,该消息就会被发送到该死信队列上,这样方便我们查看消息失败的原因,从而j进行下一步的处理,是取消订单还是安排其他的外卖员。
消息在到达队列之后,被重新发送到一个特殊的交换机(DLX)中, 同时,绑定DLX的队列就称为“死信队列”。
一下几种情况导致消息变成死信:
1. 消息被拒绝 (Basic.Reject / Basic.Nack), 并且设置 requeue 参数为 false。
2. 消息过期。
3. 队列达到最大长度。
同时我们也可以在处理异常的时候,如果消息不能被消费者正常消费,可以放置到死信队列,后续进行分析程序中的异常情况,进而改善和优化系统。
2.1 原生API实现try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 定义一个死信交换器(也是一个普通的交换器) channel.exchangeDeclare("exchange.dlx", "direct", true); // 定义一个正常业务的交换器 channel.exchangeDeclare("exchange.biz", "fanout", true); Map2.2 SpringBoot实现arguments = new HashMap<>(); // 设置队列TTL arguments.put("x-message-ttl", 10000); // 设置该队列所关联的死信交换器(当队列消息TTL到期后依然没有消费,则加入死信队列) arguments.put("x-dead-letter-exchange", "exchange.dlx"); // 设置该队列所关联的死信交换器的routingKey,如果没有特殊指定,使用原队列的 routingKey arguments.put("x-dead-letter-routing-key", "routing.key.dlx.test"); //定义交换机队列及绑定关系 channel.queueDeclare("queue.biz", true, false, false, arguments); channel.queueBind("queue.biz", "exchange.biz", ""); channel.queueDeclare("queue.dlx", true, false, false, null); // 死信队列和死信交换器 channel.queueBind("queue.dlx", "exchange.dlx", "routing.key.dlx.test"); channel.basicPublish("exchange.biz", "", MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx.test".getBytes()); } catch (Exception e) { e.printStackTrace(); }
@Configuration public class RabbitConfig { @Bean public Queue queue() { Mapprops = new HashMap<>(); // 消息在队列中的生存时间 10s props.put("x-message-ttl", 10000); // 设置该队列所关联的死信交换器(当队列消息TTL到期后依然没有消费,则加入死信队列) props.put("x-dead-letter-exchange", "ex.go.dlx"); // 设置该队列所关联的死信交换器的routingKey,如果没有特殊指定,使用原队列的routingKey props.put("x-dead-letter-routing-key", "go.dlx"); Queue queue = new Queue("q.go", true, false, false, props); return queue; } }
3.延迟队列一般来说,死信队列都会配合RabbitMQ的TTL来使用,在消息超时之后会被自动路由到死信队列中。
在一定的业务场景中,我们发送到消息队列中的消息不一定想立即被消费,就比如:
在12306中购买火车票,选中一个座位中订单未支付的时间段中系统会将这个座位锁定,如果超过时间还没有付款的话系统会自动把座位释放掉,怎么实现类似的功能呢?
1.可以用定时任务每分钟扫描一次,发现有超过15分钟的就释放掉,但这样非常浪费系统资源。
2. 可以用分布式锁、分布式缓存的被动过期时间,15分钟之后锁会自动释放,但是这样会长期占用系统的资源。
3. 可以通过TTL配合死信队列来实现。但是TTL扫描是从队列的头依此往后扫描,加入第一个消息没有超时,后续的消息是不会被扫描的,如果队列中的消息过期时间不是固定的,就会导致后面消息过期了,但是仍然没有被处理。
因此可以使用延迟队列,锁座成功之后会发送一条延迟消息到延时交换机,延时交换机轮询所有的消息,消息到指定的时间后会被消费,消费的过程就是检查这个座位是否是“已付款”状态;
RabbitMQ本身并没有提供延时队列的功能,可以用 rabbitmq_delayed_message_exchange 插件来实现,它与TTL最大的不同就是 TTL 存放消息在死信队列中,而它则是存放消息在延时交换机中。
1. 生产者将消息(msg)和路由键(routekey)发送指定的延时交换机(exchange)上
2. 延时交换机(exchange)存储消息持续扫描所有的消息,等待消息到期根据路由键(routekey)找到 绑定自己的队列(queue)并把消息给它
3. 队列(queue)再把消息发送给监听它的消费者(customer)
下载插件
下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
安装插件
、 将插件拷贝到rabbitmq-server的安装路径
# 启用插件 rabbitmq-plugins list rabbitmq-plugins enable rabbitmq_delayed_message_exchange #重启rabbitmq-server systemctl restart rabbitmq-server
@Configuration public class RabbitConfig { @Bean public Exchange exchange() { Mapprops = new HashMap<>(); props.put("x-delayed-type", ExchangeTypes.FANOUT); Exchange exchange = new CustomExchange("ex.delayed", "xdelayed-message", true, false, props); return exchange; }
public class PublishController { @Autowired private AmqpTemplate rabbitTemplate; @RequestMapping("/prepare/{seconds}") public String toMeeting(@PathVariable Integer seconds) throws UnsupportedEncodingException { // RabbitMQ只会检查队列头部的消息是否过期,如果过期就放到死信队列 // 假如第一个过期时间很长,10s,第二个消息3s,则系统先看第一个消息,等到第一个消息过期,放到DLX // 此时才会检查第二个消息,但实际上此时第二个消息早已经过期了,但是并没有先于第一个消息放到DLX。 // 插件rabbitmq_delayed_message_exchange帮我们搞定这个。 MessageProperties properties = new MessageProperties(); properties.setHeader("x-delay", (seconds - 10) * 1000); Message message = new Message((seconds + "秒后召开销售部门会议。").getBytes("utf-8"),properties); rabbitTemplate.convertAndSend("ex.delayed", "key.delayed",message); return "已经定好闹钟了,到时提前告诉大家"; } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)