RabbitMQ高级特性解析

RabbitMQ高级特性解析,第1张

一、RabbitMQ消息可靠性解决方案 1、可靠性分析

在使用任何消息中间件的过程中,难免会出现消息丢失等异常情况,这个时候就需要有一个良好的机制来跟踪记录消息的过程(轨迹溯源),帮助我们排查问题。

在RabbitMQ 中可以使用Firehose 功能来实现消息追踪,Firehose 可以记录每一次发送或者消费消息的记录,方便RabbitMQ 的使用者进行调试、排错等。

Firehose 的原理是将生产者投递给RabbitMQ 的消息,或者RabbitMQ 投递给消费者的消息按照指定的格式发送到默认的交换器上。这个默认的交换器的名称为 amq.rabbitmq.trace ,它是一个topic 类型的交换器。发送到这个交换器上的消息的路由键为 publish.{exchangename} 和 deliver.{queuename} 。其中 exchangename 和 queuename 为交换器和队列的名称,分别对应生产者投递到交换器的消息和消费者从队列中获取的消息。

开启Firehose命令:

rabbitmqctl trace_on [-p vhost]

其中[-p vhost]是可选参数,用来指定虚拟主机vhost。

对应的关闭命令为:

rabbitmqctl trace_off [-p vhost]

Firehose 默认情况下处于关闭状态,并且Firehose 的状态是非持久化的,会在RabbitMQ服务重启的时候还原成默认的状态。Firehose 开启之后多少会影响RabbitMQ 整体服务性能,因为它会引起额外的消息生成、路由和存储。

rabbitmq_tracing 插件相当于Firehose 的GUI 版本,它同样能跟RabbitMQ 中消息的流入流出情况。rabbitmq_tracing 插件同样会对流入流出的消息进行封装,然后将封装后的消息日志存入相应的trace 文件中。

可以使用命令来启动rabbitmq_ tracing 插件

rabbitmq-plugins enable rabbitmq_tracing

使用命令关闭该插件。

rabbitmq-plugins disable rabbitmq_tracing

Name表示rabbitmq_tracing的一个条目的名称,Format可以选择Text或JSON,连接的用户名写root,密码写123456。

Pattern:发布的消息:publish.
Pattern:消费的消息:deliver.

2、 TTL机制


在京东下单,订单创建成功,等待支付,一般会给30分钟的时间,开始倒计时。如果在这段时间内用户没有支付,则默认订单取消。

该如何实现?

  • 定期轮询(数据库等)
    用户下单成功,将订单信息放入数据库,同时将支付状态放入数据库,用户付款更改数据库状态。定期轮询数据库支付状态,如果超过30分钟就将该订单取消。
    优点:设计实现简单
    缺点:需要对数据库进行大量的IO *** 作,效率低下

  • Timer

SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss");
Timer timer = new Timer();
TimerTask timerTask = new TimerTask() {
	@Override
	public void run() {
		System.out.println("用户没有付款,交易取消:" + simpleDateFormat.format(new Date(System.currentTimeMillis())));
		timer.cancel();
	}
};
System.out.println("等待用户付款:" + simpleDateFormat.format(new Date(System.currentTimeMillis())));
// 10秒后执行timerTask
timer.schedule(timerTask, 10 * 1000);

缺点:

Timers没有持久化机制.
Timers不灵活 (只可以设置开始时间和重复间隔,对等待支付貌似够用)
Timers 不能利用线程池,一个timer一个线程
Timers没有真正的管理计划

  • ScheduledExecutorService
SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss");
// 线程工厂
ThreadFactory factory = Executors.defaultThreadFactory();
// 使用线程池
ScheduledExecutorService service = new ScheduledThreadPoolExecutor(10, factory);
System.out.println("开始等待用户付款10秒:" + format.format(new Date()));
service.schedule(new Runnable() {
	@Override
	public void run() {
		System.out.println("用户未付款,交易取消:" +
		format.format(new Date()));
	}// 等待10s 单位秒
}, 10, TimeUnit.SECONDS);

优点:可以多线程执行,一定程度上避免任务间互相影响,单个任务异常不影响其它任务。
在高并发的情况下,不建议使用定时任务去做,因为太浪费服务器性能,不建议

  • RabbitMQ
    使用TTL
  • Quartz
  • Redis Zset
  • JCronTab
  • SchedulerX

TTL,Time to Live 的简称,即过期时间。
RabbitMQ 可以对消息和队列两个维度来设置TTL。
任何消息中间件的容量和堆积能力都是有限的,如果有一些消息总是不被消费掉,那么需要有一种过期的机制来做兜底

目前有两种方法可以设置消息的TTL。

  • 通过Queue属性设置,队列中所有消息都有相同的过期时间。
  • 对消息自身进行单独设置,每条消息的TTL 可以不同。

如果两种方法一起使用,则消息的TTL 以两者之间较小数值为准。通常来讲,消息在队列中的生存时间一旦超过设置的TTL 值时,就会变成“死信”(Dead Message),消费者默认就无法再收到该消息。当然,“死信”也是可以被取出来消费的.

可以通过命令行方式设置全局TTL,执行如下命令:

rabbitmqctl set_policy TTL ".*" '{"message-ttl":30000}' --apply-to queues

默认规则:

  • 如果不设置TTL,则表示此消息不会过期;
  • 如果TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃;

注意理解 message-ttl 、 x-expires 这两个参数的区别,有不同的含义。但是这两个参数属性都遵循上面的默认规则。一般TTL相关的参数单位都是毫秒(ms)

springboot案例:

  1. pom.xml添加依赖
<dependencies>
	<dependency>
		<groupId>org.springframework.bootgroupId>
		<artifactId>spring-boot-starter-amqpartifactId>
	dependency>
	<dependency>
		<groupId>org.springframework.bootgroupId>
		<artifactId>spring-boot-starter-webartifactId>
	dependency>
	<dependency>
		<groupId>org.springframework.bootgroupId>
		<artifactId>spring-boot-starter-testartifactId>
		<scope>testscope>
	<exclusions>
		<exclusion>
			<groupId>org.junit.vintagegroupId>
			<artifactId>junit-vintage-engineartifactId>
		exclusion>
	exclusions>
	dependency>
	<dependency>
		<groupId>org.springframework.amqpgroupId>
		<artifactId>spring-rabbit-testartifactId>
		<scope>testscope>
	dependency>
dependencies>
  1. application.properties添加rabbitmq连接信息

     spring.application.name=ttl
     spring.rabbitmq.host=node1
     spring.rabbitmq.virtual-host=/
     spring.rabbitmq.username=root
     spring.rabbitmq.password=123456
     spring.rabbitmq.port=5672
    
  2. 主入口类

@SpringBootApplication
public class RabbitmqDemo {
    public static void main(String[] args) {
        SpringApplication.run(RabbitmqDemo07.class, args);
    }
}
  1. RabbitConfig类
@Configuration
public class RabbitConfig {
    @Bean
    public Queue queueTTLWaiting() {
        Map<String, Object> props = new HashMap<>();
// 对于该队列中的消息,设置都等待10s
        props.put("x-message-ttl", 10000);
        Queue queue = new Queue("q.pay.ttl-waiting", false, false, false, props);
        return queue;
    } 
    @Bean
    public Queue queueWaiting() {
        Queue queue = new Queue("q.pay.waiting", false, false, false);
        return queue;
    } 
    @Bean
    public Exchange exchangeTTLWaiting() {
        DirectExchange exchange = new DirectExchange("ex.pay.ttlwaiting", false, false);
        return exchange;
    } 
    /**
        * 该交换器使用的时候,需要给每个消息设置有效期
        * @return
        */
    @Bean
    public Exchange exchangeWaiting() {
        DirectExchange exchange = new DirectExchange("ex.pay.waiting", false, false);
        return exchange;
    } 
    @Bean
    public Binding bindingTTLWaiting() {
        return BindingBuilder.bind(queueTTLWaiting()).to(exchangeTTLWaiting()).wit h("pay.ttl-waiting").noargs();
    } 
    @Bean
    public Binding bindingWaiting() {
        return BindingBuilder.bind(queueWaiting()).to(exchangeWaiting()).with("pay
                        .waiting").noargs();
    }

  1. PayController类
@RestController
public class PayController {
    @Autowired
    private AmqpTemplate rabbitTemplate;
    
    @RequestMapping("/pay/queuettl")
    public String sendMessage() {
        rabbitTemplate.convertAndSend("ex.pay.ttl-waiting", "pay.ttl-waiting", "发送了TTL-WAITING-MESSAGE");
        return "queue-ttl-ok";
    } 
    @RequestMapping("/pay/msgttl")
    public String sendTTLMessage() throws UnsupportedEncodingException {
        MessageProperties properties = new MessageProperties();
        properties.setExpiration("5000");
        Message message = new Message("发送了WAITINGMESSAGE".getBytes("utf-8"), properties);
        rabbitTemplate.convertAndSend("ex.pay.waiting", "pay.waiting", message);
        return "msg-ttl-ok";
    }
}
3、死信队列


用户下单,调用订单服务,然后订单服务调用派单系统通知外卖人员送单,这时候订单系统与派单系统 采用 MQ异步通讯。

在定义业务队列时可以考虑指定一个 死信交换机,并绑定一个死信队列。当消息变成死信时,该消息就会被发送到该死信队列上,这样方便我们查看消息失败的原因。

DLX,全称为Dead-Letter-Exchange,死信交换器。消息在一个队列中变成死信(Dead Letter)之后,被重新发送到一个特殊的交换器(DLX)中,同时,绑定DLX的队列就称为“死信队列”。

以下几种情况导致消息变为死信:

  • 消息被拒绝(Basic.Reject/Basic.Nack),并且设置requeue参数为false;
  • 消息过期;
  • 队列达到最大长度。

对于RabbitMQ 来说,DLX 是一个非常有用的特性。它可以处理异常情况下,消息不能够被消费者正确消费(消费者调用了Basic.Nack 或者Basic.Reject)而被置入死信队列中的情况,后续分析程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统

springboot案例

  1. pom.xml添加依赖
<dependencies>
			<dependency>
				<groupId>org.springframework.bootgroupId>
				<artifactId>spring-boot-starter-amqpartifactId>
			dependency>
			<dependency>
				<groupId>org.springframework.bootgroupId>
				<artifactId>spring-boot-starter-webartifactId>
			dependency>
			<dependency>
				<groupId>org.springframework.bootgroupId>
				<artifactId>spring-boot-starter-testartifactId>
				<scope>testscope>
				<exclusions>
					<exclusion>
						<groupId>org.junit.vintagegroupId>
						<artifactId>junit-vintage-engineartifactId>
					exclusion>
				exclusions>
			dependency>
			<dependency>
				<groupId>org.springframework.amqpgroupId>
				<artifactId>spring-rabbit-testartifactId>
				<scope>testscope>
			dependency>
		dependencies>
  1. application.properties添加RabbitMQ连接信息

     spring.application.name=dlx
     spring.rabbitmq.host=node1
     spring.rabbitmq.virtual-host=/
     spring.rabbitmq.username=root
     spring.rabbitmq.password=123456
     spring.rabbitmq.port=5672
    
  2. 主入口类

@SpringBootApplication
public class RabbitmqDemo {
    public static void main(String[] args) {
        SpringApplication.run(RabbitmqDemo08.class, args);
    }
}
  1. RabbitConfig类:
@Configuration
public class RabbitConfig {
    @Bean
    public Queue queue() {
        Map<String, Object> props = new HashMap<>();
// 消息的生存时间 10s
        props.put("x-message-ttl", 10000);
// 设置该队列所关联的死信交换器(当队列消息TTL到期后依然没有消费,则加入死信队列)
        props.put("x-dead-letter-exchange", "ex.go.dlx");
// 设置该队列所关联的死信交换器的routingKey,如果没有特殊指定,使用原队列的routingKey
        props.put("x-dead-letter-routing-key", "go.dlx");
        Queue queue = new Queue("q.go", true, false, false, props);
        return queue;
    } 
    @Bean
    public Queue queueDlx() {
        Queue queue = new Queue("q.go.dlx", true, false, false);
        return queue;
    } 
    @Bean
    public Exchange exchange() {
        DirectExchange exchange = new DirectExchange("ex.go", true, false, null);
        return exchange;
    } 
    /**
            * 死信交换器
* @return
        */
    @Bean
    public Exchange exchangeDlx() {
        DirectExchange exchange = new DirectExchange("ex.go.dlx", true, false, null);
        return exchange;
    } 
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("go").noargs();
    } 
    /**
            * 死信交换器绑定死信队列
* @return
        */
    @Bean
    public Binding bindingDlx() {
        return BindingBuilder.bind(queueDlx()).to(exchangeDlx()).with("go.dlx").no
        args();
    }
}
  1. GoController类:
@RestController
public class GoController {
    @Autowired
    private AmqpTemplate rabbitTemplate;
    @RequestMapping("/go")
    public String distributeGo() {
        rabbitTemplate.convertAndSend("ex.go", "go", "送单到石景山x小区,请在10秒内接受任务");
        return "任务已经下发,等待送单。。。";
    } 
    @RequestMapping("/notgo")
    public String getAccumulatedTask() {
        String notGo = (String) rabbitTemplate.receiveAndConvert("q.go.dlx");
        return notGo;
    }
}
4、 延迟队列

延迟消息是指的消息发送出去后并不想立即就被消费,而是需要等(指定的)一段时间后才触发消费

例如下面的业务场景:在支付宝上面买电影票,锁定了一个座位后系统默认会帮你保留15分钟时间,如果15分钟后还没付款那么不好意思系统会自动把座位释放掉。怎么实现类似的功能呢?

  • 可以用定时任务每分钟扫一次,发现有占座超过15分钟还没付款的就释放掉。但是这样做很低效,很多时候做的都是些无用功;
  • 可以用分布式锁、分布式缓存的被动过期时间,15分钟过期后锁也释放了,缓存key也不存在了;
  • 还可以用延迟队列,锁座成功后会发送1条延迟消息,这条消息15分钟后才会被消费,消费的过程就是检查这个座位是否已经是“已付款”状态;

你在公司的协同办公系统上面预约了一个会议,邀请汪产品和陈序员今晚22点准时参加会有。系统还比较智能,除了默认发会议邀请的邮件告知参会者以外,到了今晚21:45分的时候(提前15分钟)就会通知提醒参会人员做好参会准备,会议马上开始…

同样的,这也可以通过轮询“会议预定表”来实现,比如我每分钟跑一次定时任务看看当前有哪些会议即将开始了。当然也可以通过延迟消息来实现,预定会议以后系统投递一条延迟消息,而这条消息比较特殊不会立马被消费,而是延迟到指定时间后再触发消费动作(发通知提醒参会人准备)。不过遗憾的是,在AMQP协议和RabbitMQ中都没有相关的规定和实现。不过,我们似乎可以借助“死信队列”来变相的实现。

可以使用rabbitmq_delayed_message_exchange插件实现。

这里和TTL方式有个很大的不同就是TTL存放消息在死信队列(delayqueue)里,二基于插件存放消息在延时交换机里(x-delayed-message-exchange)。

  • 生产者将消息(msg)和路由键(routekey)发送指定的延时交换机(exchange)上
  • 延时交换机(exchange)存储消息等待消息到期根据路由键(routekey)找到绑定自己的队列(queue)并把消息给它
  • 队列(queue)再把消息发送给监听它的消费者(customer)
  1. 下载插件
    下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

  1. 安装插件
    将插件拷贝到rabbitmq-server的安装路径:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.4/plugins

  2. 启用插件

     rabbitmq-plugins list
     rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    
  3. 重启rabbitmq-server

     systemctl restart rabbitmq-server
    
  4. 编写代码,首先是SpringBootApplication主入口类

@SpringBootApplication
public class RabbitmqDemo {
    public static void main(String[] args) {
        SpringApplication.run(RabbitmqDemo.class, args);
    }
}

RabbitMQ的对象配置

@Configuration
public class RabbitConfig {
    @Bean
    public Queue queue() {
        Queue queue = new Queue("q.delayed", false, false, false,
                null);
        return queue;
    }
    @Bean
    public Exchange exchange() {
        Map<String, Object> props = new HashMap<>();
        props.put("x-delayed-type", ExchangeTypes.FANOUT);
        Exchange exchange = new CustomExchange("ex.delayed", "xdelayed-message", true, false, props);
        return exchange;
    } 
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("key.delayed").noargs();
    }
}

使用推消息模式接收延迟队列的广播

@Component
public class MeetingListener {
    @RabbitListener(queues = "q.delayed")
    public void broadcastMeetingAlarm(Message message, Channel channel) throws IOException {
        System.err.println("提醒:5秒后:" + new String(message.getBody(), "utf-8"));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

开发RestController,用于向延迟队列发送消息,并指定延迟的时长

@RestController
public class PublishController {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    @RequestMapping("/prepare/{seconds}")
    public String toMeeting(@PathVariable Integer seconds) throws UnsupportedEncodingException {
// RabbitMQ只会检查队列头部的消息是否过期,如果过期就放到死信队列
// 假如第一个过期时间很长,10s,第二个消息3s,则系统先看第一个消息,等到第一个消息过期,放到DLX
// 此时才会检查第二个消息,但实际上此时第二个消息早已经过期了,但是并没有先于第一个消息放到DLX。
// 插件rabbitmq_delayed_message_exchange帮我们搞定这个。
        MessageProperties properties = new MessageProperties();
        properties.setHeader("x-delay", (seconds - 10) * 1000);
        Message message = new Message((seconds + "秒后召开销售部门会议。").getBytes("utf-8"), properties);
// 如果不设置message的properties,也可以使用下述方法设置x-delay属性的值 
//                rabbitTemplate.convertAndSend("ex.delayed",
//                        "key.delayed", message, msg -> {
// // 使用定制的属性x-delay设置过期时间,也就是提前5s提醒
// // 当消息转换完,设置消息头字段
//                      msg.getMessageProperties().setHeader("x-delay",(seconds - 5) * 1000);
//                      return msg;
//      });
        rabbitTemplate.convertAndSend("ex.delayed", "key.delayed", message);
        return "已经定好闹钟了,到时提前告诉大家";
    }
}

application.properties中添加rabbitmq的配置

spring.application.name=delayedqueue
spring.rabbitmq.host=node1
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.rabbitmq.port=5672

pom.xml添加依赖:

<dependencies>
			<dependency>
				<groupId>org.springframework.bootgroupId>
				<artifactId>spring-boot-starter-amqpartifactId>
			dependency>
			<dependency>
				<groupId>org.springframework.bootgroupId>
				<artifactId>spring-boot-starter-webartifactId>
			dependency>
			<dependency>
				<groupId>org.springframework.bootgroupId>
				<artifactId>spring-boot-starter-testartifactId>
				<scope>testscope>
				<exclusions>
					<exclusion>
						<groupId>org.junit.vintagegroupId>
						<artifactId>junit-vintage-engineartifactId>
					exclusion>
				exclusions>
			dependency>
			<dependency>
				<groupId>org.springframework.amqpgroupId>
				<artifactId>spring-rabbit-testartifactId>
				<scope>testscope>
			dependency>
		dependencies>

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/716436.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-25
下一篇 2022-04-25

发表评论

登录后才能评论

评论列表(0条)

保存