基于NetCore的RabbitMQ使用

基于NetCore的RabbitMQ使用,第1张

基于NetCore的RabbitMQ使用

由于最近公司做的项目,需要发短信/邮件/第三方接口异步回调信息等的及时处理,自己就简单的研究了以下RabbitMQ在NetCore中的实现。
RabbitMQ是什么具体就不再这里详细介绍了,自己去百度。本文主要把自己封装的RabbitMQ的使用介绍给大家,并提供相应的Nuget包供大家下载使用,谢谢各位大佬,本人能力有限。

MQ连接配置

在appsettings中加入如下内容:

//消息队列
  "RabbitMQ": {

    "HostName": "127.0.0.1", // 主机名称
    "Port": 5672, // 主机端口
    "UserName": "guest", // 连接账号
    "Password": "guest" // 连接密码

  }

自动读入到RabbitMQOption中

MQ交换机类型

这里通过枚举类ExchangeType我定义了四种交换机类型

生产者

生产者对应的IRabbitMQProducer接口以及其实现RabbitMQProducer已经自动通过单例模式注入,使用的时候你只需要注入即可,如果你想自己实现,可以覆盖我的实现类

生产者异常处理机制

生产者投递消息失败可能有以下几种情况:
1.MQ宕机
2.MQ发送数据到交换机失败
3.MQ发送到交换机成功路由到队列失败

对于第1点:
你可以实现IMQCrashHandle写宕机等问题造成的连接失败的公共处理方式
这里考虑的是采取短信或邮件或其他方式通知人员处理MQ.

对于第2,3点:
第1点的问题就会引发2,3点问题
这里采用了MQ消息投递的/confirm/i异步处理确认模式
这里你可以实现IMQProducerHandle接口对消息投递失败或成功进行相应的处理。

这里你可以在Success方法中对成功的消息做记录等 *** 作,在Failure函数中可以对失败的消息进行自己存入db中等进行处理

消费者

消费者统一一接口IRabbitMQConsumer。

常规的消费者

用于普通的及时发送消息及时处理消息
主要是RabbitMQConsumer类,如果需要使用只需要继承RabbitMQConsumer抽象类即可

消峰消费者

用于对某一有限资源的访问控制等,目前未实现

利用死信队列实现延时处理的消费者

用于对消息进行延迟处理,分为整个管道所有消息同一延时,或每个消息延迟时间不同
延时处理队列可以通过实现RabbitMQDelayDeadLetterConsumer抽象类即可
如果延时消息消费失败,不会有重试机制,需要自己实现IMQConsumerHandle来处理

优先级消费者

目前暂未实现

消费者处理消息结果

在MQAction枚举中定义了消费者处理的枚举。主要有以下四种。

这里注意:对于使用JoinRetryQueue重试策略,次数达到之后消息的处理状态会由RETRY转为FAILURE,如果这时候需要外部接口处理需要实现IMQConsumerHandle中的Failure函数来自己处理消息

根据处理结果可以通过外部接口对消息进行处理

在IMQConsumerHandle接口分别定义了Success Retry Failure Reject分别对应上述的几种消息处理结果
可以针对这几种结果,自己实现该接口对其进行处理,注意一定要处理,因为在消费者内部已经是把该消息消费了

消费者异常处理机制

消费者出现异常,主要是消费失败,消息不能及时从管道中处理。
这里对于失败的消息,主要有以下几种策略进行处理。
在ConsumerRetryPolicy枚举类定义了重试策略

使用例子 初始化MQ

在Net5.0中在Startup.cs ConfigureSevice中加入以下代码

service.InitRabbitMQ(configuration);

在Net6.0直接在Program.cs中加入以下代码

//初始化RabbitMQ
builder.Services.InitRabbitMQ(builder.Configuration);
定义一个消费者

注入消费者发送消息到对应交换机和路由


这里我采用匿名Action来处理失败或成功的消息。

测试结果

RabbitMQ搭建好之后,启动项目即可在MQ界面中看到相应的交换机和队列已经建立成功

消息发送测试

注意消息发送之后都是序列化为字符串了的,需要消费消息需要反序列化才行

发送消息的时候把MQ停止了

可以从下面的截图中看到公共的MQ宕机连接不上的处理接口生效了,以及MQ消息投递失败的Action处理也成功了

正常投递消息并消费成功

正常投递消息模拟消费失败重试

这里我把消息内容从success改为retry表示需要重试

从运行结果可以看到重试了三次之后,状态转为处理失败,这个时候就需要你自己在外部处理消息了,如果外部不处理消息就会丢失

正常投递消息模拟消费直接转失败状态

正常投递消息模拟消费直接转拒绝状态

如果你需要自己实现相应的处理,你可以自己实现以上相应的接口,并配合我之前基于Autofac自动注入类通过Target来动态替换,具体地址为
https://blog.csdn.net/weixin_43872830/article/details/121157725?spm=1001.2014.3001.5502

为消息设置不同的重试时间间隔

这里主要采用延时队列加上死信队列,利用XMessageTTL实现不同时间间隔的重试,过期时间作用于消息上而不是队列上整体设置过期时间

消息的重试次数将由RetryIntervals的集合的长度决定,如果同时设置了RetryCount以及RetryInterval,那么起决定作用的也是RetryIntervals.
运行结果

从结果中我们可以看到10s和20s后分别重试了一次

延迟消息处理例子

消息过期时间相同的实现类

如果需要设置消息不同过期时间,这里的XMessageTTL不需要设置,然后在消息发送的时候设置消息的过期时间即可

启动项目后可以看到相应的交换机和队列已经创建

运行结果:

从结果可以看到10秒之前发送的消息被消费了

从结果可以看到消息进入队列后延时时间不一样,并且不会因为前面消息的过期时间比后面的过期时间长,而阻塞后面消息过期时间之后不能及时出队列

需要注意某个交换机或者队列一旦初始化完成之后,想要在改变交换机或者队列的一些属性就比较困难,比如修改某些属性重新启动程序,会提示mq的错误等,这些需要您自行了解,有一个叫做policy的东西。所以原则上交换机或队列一旦想好怎样设置之后,就不再改动它,特别是已经运行于生产环境中

如果需要使用我封装的RabbitMQ,可以在NuGet上搜索OpenDeepSpace.RabbitMQ进行下载使用与了解

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5683219.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存