- exchange:{模块名}.{功能名}queue:{word}.{word}routing key:{word}.{word},例:merge.request,原因:.之间的会被认为是一个单词,便于通过*和#来匹配一个或多个单词
使用MessageConvert自动转换为JSON默认情况下RabbitMQ发送的消息是为字节码,我们采用统一的JSON格式的消息
如果规定了消息的格式为JSON,并使用消息转换器,则会自动将消息转化为JSON格式而不需要每次手动进行转换。RabbitTemplate默认使用SimpleMessageConverter作为自己的消息转化器,而SimpleMessageConverter并不能满足JSON消息的需求。我们可以使用Jackson2JsonMessageConverter作为默认的消息转换器。
特别注意:
生产者和消费者需要使用相同的MessageConvert,消息类型可通过header中的content-type来确认
对于String类型的消息,SimpleMessageConverter和Jackson2JsonMessageConverter的处理策略是不一样的,具体提现为:
content-type:分别是text/plain和application/json
Jackson2JsonMessageConverter会多出两个双引号" "从而导致字符串大变样,示例如下:
生产者代码
String context = "hello "rabbitMQ" "; rabbitTemplate.convertAndSend("hello", context);
当使用SimpleMessageConverter时,rabbitMQ队列中的消息:hello "rabbitMQ"
当使用Jackson2JsonMessageConverter时,rabbitMQ队列中的消息:"hello "rabbitMQ" "
如果想要在发送消息时自定义格式,请使用send而不是conertAndSend方法,接收方也请使用receive而不是receiveAndConvert
为RabbitTemplate配置MessageConverter:
@Configuration public class RabbitConfig { @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(jsonMessageConverter()); return rabbitTemplate; } @Bean public Jackson2JsonMessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); } }配置优化 生产者消息确认机制
spring: rabbitmq: #确认消息已发送到交换机(Exchange) publisher-/confirm/is: true #确认消息已发送到队列(Queue) publisher-returns: true
@Slf4j @Configuration public class RabbitConfig { @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(jsonMessageConverter()); // 开启交换机确认 rabbitTemplate.set/confirm/iCallback(new RabbitTemplate./confirm/iCallback() { @Override public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) { log.info("=======ConfirmCallback========="); log.info("correlationData = " + correlationData); log.info("ack = " + ack); log.info("cause = " + cause); log.info("=======ConfirmCallback========="); } }); // 开启队列失败确认 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("--------------ReturnCallback----------------"); log.info("message = " + message); log.info("replyCode = " + replyCode); log.info("replyText = " + replyText); log.info("exchange = " + exchange); log.info("routingKey = " + routingKey); log.info("--------------ReturnCallback----------------"); } }); return rabbitTemplate; } }本地重试确认机制
spring: rabbitmq: listener: simple: retry: # 开启本地重试机制,默认3次,避免内存泄漏 enabled: true
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)