消息队列原理及选型

消息队列原理及选型,第1张

 部署和运行示例应用程序

将应用程序部署在 Bluemix 中之前,您必须注册。转到 Bluemix 并单击 LOGIN。按照说明注册或管理您的帐户,然后:

安装 Cloud Foundry 客户端。在您的环境中创建一个文件夹,比如 myWS。将 cf 命令 v6 版 下载到您创建的文件夹中。运行下载的文件并按照说明将 Cloud Foundry 客户端安装在同一个文件夹中。

在 Cloud Foundry 文件夹中创建一个文件夹,比如 “samples”。

从 IBM DevOps Services 下载 sessionCacheSamplewar 文件,将它保存在刚创建的文件夹中。备注:此示例应用程序需要一个名为 scsSampleSessionServiceDWDemo001 的 SessionCache 服务。如果您觉得此名称可能导致冲突,请参阅第 2 部分中 构建示例应用程序下的 “添加服务器配置”,将应用程序绑定到一个具有不同名称的服务。

将 WAR 文件部署到 Bluemix。

在命令提示符中,将目录更改为新创建的 samples 文件夹。使用命令 cf api >

之前的博客中我们可以在spring容器中构建 SimpleMessageListenerContainer 来消费消息,我们也可以使用 @RabbitListener 来消费消息。

定义消息处理器, @RabbitListener 注解标记的方法

应用启动类, @EnableRabbit 启用 @RabbitListener

测试:

控制台打印:

如果发送的消息 content_type 的属性是 text ,那么接收的消息处理方法的参数就必须是 String 类型,如果是 byte[] 类型就会报错。

控制台报错

总结

如果消息属性中没有指定 content_type ,则接收消息的处理方法接收类型是 byte[] ,如果消息属性中指定 content_type为text ,则接收消息的处理方法的参数类型是 String 类型。不管有没有指定 content_type ,处理消息方法的参数类型是Message都不会报错。

步骤

具体的消息处理方法的参数是跟 MessageConverter 转换后的java对象有关。

如果想要设置 MessageConverte r,则需要在 RabbitListenerContainerFactory 的实例中去设置,( setMessageConverter 方法)

获取单一个Header的属性,Header还有其他的一些属性,比如 required , defaultvalue 等属性,顾名思义:

配置文件:

启动类:

定义mq中不存在的 Queue , exchange 和 route key

从上面的我们知道声明必须容器中要有 RabbitAdmin 和 RabbitTemplate 实例

应用启动类

测试验证

控制台打印:

说明自动声明的绑定中的队列被自动默认监听。 @RabbitListener 注解中的 bindings 和 queues 参数不能同时指定,否则会报错。

@RabbitListener 可以标注在类上面,当使用在类上面的时候,需要配合 @RabbitHandler 注解一起使用, @RabbitListener 标注在类上面表示当有收到消息的时候,就交给带有 @RabbitHandler 的方法处理,具体找哪个方法处理,需要跟进 MessageConverter 转换后的java对象。

配置:

处理器方法

应用启动类:

发送不包含content_type属性的消息和content_type属性为text的消息,控制台打印:

@RabbitListener 注解的 containerFactory 属性可以指定一个 RabbitListenerContainerFactory 的bean,默认是找名字为 rabbitListenerContainerFactory 的实例。

当我们将 ConsumerConfig 类中的 RabbitListenerContainerFactory 实例的对象名改掉的时候,发现就会报错。

此时控制台上报错,

此时如果配置一下 @RabbitListener 注解的 containerFactory 属性便不会报错。

我们再去改造一下在 RabbitListenerContainerFactory 实例中定义消息类型转换器

User对象:

在处理器中增加参数是User的方法:

下面是RabbitMQ的消息确认机制:“为了确保消息不会丢失,RabbitMQ支持消息确认机制。客户端在接受到消息并处理完后,可以发送一个ack消息给RabbitMQ,告诉它该消息可以安全的删除了。假如客户端在发送ack之前意外死掉了,那么RabbitMQ会将消息投递到下一个consumer客户端。如果有多个consumer客户端,RabbitMQ在投递消息时是轮询的。RabbitMQ如何判断客户端死掉了?唯一根据是客户端连接是否断开。这里没有超时机制,也就是说客户端可以处理一个消息很长时间,只要没断开连接,RabbitMQ就一直等待ack消息。”我现在遇到的问题是这样的:我这边有几条线程去消息队列里取数据,但是会有异常数据导致线程挂掉,就是上边的“客户端在发送ack之前意外死掉了”,RabbitMQ会将消息投递到下一个consumer客户端,这样一条异常数据会把我的所有线程挂掉,我现在想实现这样的功能:如果有异常数据导致进程挂掉,那么我不让RabbitMQ将这条消息投递到下一个consumer客户端,而是放到另一个地方或者另外处理,请问该如何实现呢?

配置类MQConfig:

Handler类 MessageHandler , MessageHandler 类中定义的方法也就是上面翻译的目标监听器的处理方法:

启动应用类:

总结

使用 MessageListenerAdapter 处理器进行消息队列监听处理,如果容器没有设置 setDefaultListenerMethod ,则处理器中默认的处理方法名是handleMessage,如果设置了 setDefaultListenerMethod ,则处理器中处理消息的方法名就是 setDefaultListenerMethod 方法参数设置的值。也可以通过 setQueueOrTagToMethodName 方法为不同的队列设置不同的消息处理方法。

我们知道 MessageListenerAdapter 继承 AbstractAdaptableMessageListener 类,实现 MessageListener 和 ChannelAwareMessageListener 接口,而我们知道 MessageListener 和 ChannelAwareMessageListener 接口的 onMessage 方法就是具体容器监听队列处理队列消息的方法。

MessageListenerAdapter 的 onMessage 方法

获取处理消息的方法名

MessageListenerAdapter

1可以把一个没有实现 MessageListener 和 ChannelAwareMessageListener 接口的类适配成一个可以处理消息的处理器

2默认的方法名称为: handleMessage ,可以通过 setDefaultListenerMethod 设置新的消息处理方法

3 MessageListenerAdapter 支持不同的队列交给不同的方法去执行。使用 setQueueOrTagToMethodName 方法设置,当根据queue名称没有找到匹配的方法的时候,就会交给默认的方法去处理。

以上就是关于消息队列原理及选型全部的内容,包括:消息队列原理及选型、RabbitMq系列之三:web管理端、RabbitMQ消费者性能优化相关配置说明等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/web/9320502.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-27
下一篇 2023-04-27

发表评论

登录后才能评论

评论列表(0条)

保存