部署和运行示例应用程序
将应用程序部署在 Bluemix 中之前,您必须注册。转到 Bluemix 并单击 LOGIN。按照说明注册或管理您的帐户,然后:
安装 Cloud Foundry 客户端。在您的环境中创建一个文件夹,比如 myWS。将 cf 命令 v6 版 下载到您创建的文件夹中。运行下载的文件并按照说明将 Cloud Foundry 客户端安装在同一个文件夹中。
在 Cloud Foundry 文件夹中创建一个文件夹,比如 “samples”。
从 IBM DevOps Services 下载 sessionCacheSamplewar 文件,将它保存在刚创建的文件夹中。备注:此示例应用程序需要一个名为 scsSampleSessionServiceDWDemo001 的 SessionCache 服务。如果您觉得此名称可能导致冲突,请参阅第 2 部分中 构建示例应用程序下的 “添加服务器配置”,将应用程序绑定到一个具有不同名称的服务。
将 WAR 文件部署到 Bluemix。
在命令提示符中,将目录更改为新创建的 samples 文件夹。使用命令 cf api >
之前的博客中我们可以在spring容器中构建 SimpleMessageListenerContainer 来消费消息,我们也可以使用 @RabbitListener 来消费消息。
定义消息处理器, @RabbitListener 注解标记的方法
应用启动类, @EnableRabbit 启用 @RabbitListener
测试:
控制台打印:
如果发送的消息 content_type 的属性是 text ,那么接收的消息处理方法的参数就必须是 String 类型,如果是 byte[] 类型就会报错。
控制台报错
总结
如果消息属性中没有指定 content_type ,则接收消息的处理方法接收类型是 byte[] ,如果消息属性中指定 content_type为text ,则接收消息的处理方法的参数类型是 String 类型。不管有没有指定 content_type ,处理消息方法的参数类型是Message都不会报错。
步骤
具体的消息处理方法的参数是跟 MessageConverter 转换后的java对象有关。
如果想要设置 MessageConverte r,则需要在 RabbitListenerContainerFactory 的实例中去设置,( setMessageConverter 方法)
获取单一个Header的属性,Header还有其他的一些属性,比如 required , defaultvalue 等属性,顾名思义:
配置文件:
启动类:
定义mq中不存在的 Queue , exchange 和 route key
从上面的我们知道声明必须容器中要有 RabbitAdmin 和 RabbitTemplate 实例
应用启动类
测试验证
控制台打印:
说明自动声明的绑定中的队列被自动默认监听。 @RabbitListener 注解中的 bindings 和 queues 参数不能同时指定,否则会报错。
@RabbitListener 可以标注在类上面,当使用在类上面的时候,需要配合 @RabbitHandler 注解一起使用, @RabbitListener 标注在类上面表示当有收到消息的时候,就交给带有 @RabbitHandler 的方法处理,具体找哪个方法处理,需要跟进 MessageConverter 转换后的java对象。
配置:
处理器方法
应用启动类:
发送不包含content_type属性的消息和content_type属性为text的消息,控制台打印:
@RabbitListener 注解的 containerFactory 属性可以指定一个 RabbitListenerContainerFactory 的bean,默认是找名字为 rabbitListenerContainerFactory 的实例。
当我们将 ConsumerConfig 类中的 RabbitListenerContainerFactory 实例的对象名改掉的时候,发现就会报错。
此时控制台上报错,
此时如果配置一下 @RabbitListener 注解的 containerFactory 属性便不会报错。
我们再去改造一下在 RabbitListenerContainerFactory 实例中定义消息类型转换器
User对象:
在处理器中增加参数是User的方法:
下面是RabbitMQ的消息确认机制:“为了确保消息不会丢失,RabbitMQ支持消息确认机制。客户端在接受到消息并处理完后,可以发送一个ack消息给RabbitMQ,告诉它该消息可以安全的删除了。假如客户端在发送ack之前意外死掉了,那么RabbitMQ会将消息投递到下一个consumer客户端。如果有多个consumer客户端,RabbitMQ在投递消息时是轮询的。RabbitMQ如何判断客户端死掉了?唯一根据是客户端连接是否断开。这里没有超时机制,也就是说客户端可以处理一个消息很长时间,只要没断开连接,RabbitMQ就一直等待ack消息。”我现在遇到的问题是这样的:我这边有几条线程去消息队列里取数据,但是会有异常数据导致线程挂掉,就是上边的“客户端在发送ack之前意外死掉了”,RabbitMQ会将消息投递到下一个consumer客户端,这样一条异常数据会把我的所有线程挂掉,我现在想实现这样的功能:如果有异常数据导致进程挂掉,那么我不让RabbitMQ将这条消息投递到下一个consumer客户端,而是放到另一个地方或者另外处理,请问该如何实现呢?
配置类MQConfig:
Handler类 MessageHandler , MessageHandler 类中定义的方法也就是上面翻译的目标监听器的处理方法:
启动应用类:
总结
使用 MessageListenerAdapter 处理器进行消息队列监听处理,如果容器没有设置 setDefaultListenerMethod ,则处理器中默认的处理方法名是handleMessage,如果设置了 setDefaultListenerMethod ,则处理器中处理消息的方法名就是 setDefaultListenerMethod 方法参数设置的值。也可以通过 setQueueOrTagToMethodName 方法为不同的队列设置不同的消息处理方法。
我们知道 MessageListenerAdapter 继承 AbstractAdaptableMessageListener 类,实现 MessageListener 和 ChannelAwareMessageListener 接口,而我们知道 MessageListener 和 ChannelAwareMessageListener 接口的 onMessage 方法就是具体容器监听队列处理队列消息的方法。
MessageListenerAdapter 的 onMessage 方法
获取处理消息的方法名
MessageListenerAdapter
1可以把一个没有实现 MessageListener 和 ChannelAwareMessageListener 接口的类适配成一个可以处理消息的处理器
2默认的方法名称为: handleMessage ,可以通过 setDefaultListenerMethod 设置新的消息处理方法
3 MessageListenerAdapter 支持不同的队列交给不同的方法去执行。使用 setQueueOrTagToMethodName 方法设置,当根据queue名称没有找到匹配的方法的时候,就会交给默认的方法去处理。
以上就是关于消息队列原理及选型全部的内容,包括:消息队列原理及选型、RabbitMq系列之三:web管理端、RabbitMQ消费者性能优化相关配置说明等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)