yml配置
spring:
  rabbitmq:
    host: offline-tech.com
    port: 30061
    username: admin
    password: admin
    connection-timeout: 5000ms
    publisher-confirms: true
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual
        retry:
          enabled: true
    publisher-confirm-type: correlated
RabbitMQ 配置类
RabbitMQ 配置文件:初始化队列、exchange、死信队列等
package com.kunchi.auditcenter.facade.mq; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; //配置队列信息 @Configuration public class RabbitQueueConfig { //队列过期时间:4小时 private int orderQueueTTL = 4 * 3600 * 1000; @Autowired private ConnectionFactory connectionFactory; @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMessageConverter(new Jackson2JsonMessageConverter()); return template; } // 配置普通队列 @Bean public Queue approvalCodeQueue() { return QueueBuilder.durable(RabbitConstants.APPROVAL_CODE_QUEUE) .ttl(orderQueueTTL) .deadLetterRoutingKey(RabbitConstants.DEAD_ROUTE_KEY)//设置死信队列的RouteKey .deadLetterExchange(RabbitConstants.DEAD_EXCHANGE)//设置死信队列的Exchange .build(); } @Bean public TopicExchange fanweiTopicExchange() { return new TopicExchange(RabbitConstants.FANWEI_EXCHANGE); } @Bean public Binding orderBinding() { return BindingBuilder.bind(approvalCodeQueue()) .to(fanweiTopicExchange()) .with(RabbitConstants.APPROVAL_CODE_ROUTE_KEY); } //配置死信队列 @Bean public Queue deadQueue() { return new Queue(RabbitConstants.DEAD_QUEUE, true); } @Bean public TopicExchange deadExchange() { return new TopicExchange(RabbitConstants.DEAD_EXCHANGE); } @Bean public Binding deadBinding() { return BindingBuilder.bind(deadQueue()) .to(deadExchange()) .with(RabbitConstants.DEAD_ROUTE_KEY); } }
消息生产者
//定义一个类,发送消息,接收消息,并进行消息确认回调 @Slf4j @Service public class MqProduceServiceImpl implements RabbitTemplate.ReturnCallback, RabbitTemplate./confirm/iCallback, FanweiMqProduceService { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void setCallback() { rabbitTemplate.setReturnCallback(this); rabbitTemplate.setConfirmCallback(this); } @Override public void sendWorkflowRequest(WorkflowRequestMessage message) { CorrelationData correlationData = new CorrelationData(String.valueOf ( message.getId () )); rabbitTemplate.convertAndSend( RabbitConstants.FANWEI_EXCHANGE, RabbitConstants.APPROVAL_CODE_ROUTE_KEY, message, correlationData); } //Exchange 到Queue投递成功,不回调ReturnCallback @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.error("消息丢失, 没有投递成功:"+message.getBody()); String msg = new String(message.getBody()); WorkflowRequestMessage requestMessage = JSON.parseObject (msg, WorkflowRequestMessage.class ); LogMqRequestResumePO log = LogMqRequestResumePO.builder ().type ( FanweiOaConst.LOG_TYPE_SEND ).exchange ( exchange) .queue (routingKey).message ( JSON.toJSonString ( requestMessage )) .error ( replyText).requstId (requestMessage.getRequestId () ).build(); logMqRequestResumeService.save ( log ); } // /confirm/i确认消息投递成功了 //如果没有 Exchange ack=false, // * 如果有 Exchange ack=tru @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("消息发送成功,发送ack确认,id="+correlationData.getId()); if (ack){ log.info("发送成功"); }else { String id = correlationData.getId (); ApprovalDataRecordOaPO record = approvalDataRecordOaRepositoryService.getById ( Long.valueOf ( id ) ); WorkflowRequestMessage requestMessage = new WorkflowRequestMessage ( record.getApprovalOaId (), record.getCreateBy (), record.getId () ); LogMqRequestResumePO log = LogMqRequestResumePO.builder ().type ( FanweiOaConst.LOG_TYPE_SEND ).exchange ( RabbitConstants.FANWEI_EXCHANGE ) .queue 
(RabbitConstants.APPROVAL_CODE_QUEUE).message ( JSON.toJSonString ( requestMessage )) .error ( cause).requstId (requestMessage.getRequestId () ).build(); logMqRequestResumeService.save ( log ); } } }
消息消费者
@Slf4j @Service public class FanweiConsumer { @Autowired private FanweiOaService fanweiOaService; @Autowired private LogMqRequestResumeService logMqRequestResumeService; @RabbitListener (queues = RabbitConstants.APPROVAL_CODE_QUEUE ) @RabbitHandler public void approvalCode(Message message, Channel channel) throws IOException { String receivedRoutingKey = message.getMessageProperties().getReceivedRoutingKey(); String msg = new String(message.getBody()); WorkflowRequestMessage requestMessage = JSON.parseObject (msg, WorkflowRequestMessage.class ); log.info ("路由key= [ "+receivedRoutingKey+" ]接收到的消息= [ "+msg +" ]"); try { log.info("收到消息:{}", msg); //TODO 具体业务 fanweiOaService.getApprovalCodeAndUpdate ( requestMessage ); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { log.info ( e.getMessage (),e ); requestMessage.setError ( e.getMessage () ); dealFailureMessage ( message, channel, requestMessage); } } private void dealFailureMessage(Message message, Channel channel, WorkflowRequestMessage requestMessage) throws IOException { log.info("消息即将再次返回队列处理..."+requestMessage.getRetryCount ()); if (requestMessage.getRetryCount ()>=FanweiOaConst.MAX_RETRY){ channel.basicReject (message.getMessageProperties().getDeliveryTag(), false); } else { requestMessage.setRetryCount ( requestMessage.getRetryCount ()+1 ); log.info("消息重新发送..."+requestMessage.getRetryCount ()); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 重新发送消息到队尾 channel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN, JSON.toJSonBytes(requestMessage)); } } // 监听死信队列 @RabbitListener(queues = RabbitConstants.DEAD_QUEUE) @RabbitHandler public void deadQueueListener(Message message, Channel channel) throws IOException { String receivedRoutingKey = message.getMessageProperties().getReceivedRoutingKey(); String msg = new 
String(message.getBody()); System.out.println("路由key= [ "+receivedRoutingKey+" ]接收到的消息= [ "+msg +" ]"); WorkflowRequestMessage requestMessage = JSON.parseObject (msg, WorkflowRequestMessage.class ); LogMqRequestResumePO log = LogMqRequestResumePO.builder ().type ( FanweiOaConst.LOG_TYPE_RECEIVE ).exchange ( RabbitConstants.FANWEI_EXCHANGE ) .queue (RabbitConstants.APPROVAL_CODE_QUEUE).message (JSON.toJSonString ( requestMessage )) .error ( requestMessage.getError() ).requstId (requestMessage.getRequestId () ).build(); logMqRequestResumeService.save ( log ); channel.basicAck(message.getMessageProperties().getDeliveryTag(),true); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)