RabbitMQ 实践

RabbitMQ 实践,第1张

RabbitMQ 实践

yml配置

# Spring Boot RabbitMQ connection and listener settings.
spring:
  rabbitmq:
    host: offline-tech.com
    port: 30061
    # NOTE(review): credentials are stored in plain text here; consider externalizing them.
    username: admin
    password: admin
    connection-timeout: 5000ms
    # Legacy boolean switch for publisher confirms.
    # NOTE(review): newer Spring Boot versions appear to rely on publisher-confirm-type
    # (set below) instead; confirm which key your Boot version honors.
    publisher-confirms: true
    # Ask the broker to return unroutable messages to the publisher.
    publisher-returns: true
    listener:
      simple:
        # Listener methods must ack/reject each delivery themselves.
        acknowledge-mode: manual
        retry:
          enabled: true
    # Correlate each confirm with the CorrelationData supplied when sending.
    publisher-confirm-type: correlated

消息生产者

RabbitMQ 配置类:初始化队列、exchange、死信队列等

package com.kunchi.auditcenter.facade.mq;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the RabbitMQ topology used by the audit center: the business queue,
 * its topic exchange and binding, and the dead-letter queue/exchange/binding
 * that the business queue points at.
 */
@Configuration
public class RabbitQueueConfig {

    /**
     * Value passed to {@code QueueBuilder.ttl(...)} for the business queue:
     * 4 hours, expressed in milliseconds.
     */
    private static final int ORDER_QUEUE_TTL_MS = 4 * 3600 * 1000;

    /**
     * Builds the {@link RabbitTemplate} used for publishing, with message
     * bodies converted by {@link Jackson2JsonMessageConverter}.
     *
     * @param connectionFactory connection factory supplied by the container
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

    /**
     * Business queue: durable, with a TTL and with dead-lettering configured
     * towards {@code DEAD_EXCHANGE} using {@code DEAD_ROUTE_KEY}.
     */
    @Bean
    public Queue approvalCodeQueue() {
        return QueueBuilder.durable(RabbitConstants.APPROVAL_CODE_QUEUE)
                .ttl(ORDER_QUEUE_TTL_MS)
                // Routing key applied when a message is dead-lettered.
                .deadLetterRoutingKey(RabbitConstants.DEAD_ROUTE_KEY)
                // Exchange that receives dead-lettered messages.
                .deadLetterExchange(RabbitConstants.DEAD_EXCHANGE)
                .build();
    }

    /** Topic exchange the producer publishes business messages to. */
    @Bean
    public TopicExchange fanweiTopicExchange() {
        return new TopicExchange(RabbitConstants.FANWEI_EXCHANGE);
    }

    /** Binds the business queue to the business exchange. */
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(approvalCodeQueue())
                .to(fanweiTopicExchange())
                .with(RabbitConstants.APPROVAL_CODE_ROUTE_KEY);
    }

    /** Durable dead-letter queue. */
    @Bean
    public Queue deadQueue() {
        return new Queue(RabbitConstants.DEAD_QUEUE, true);
    }

    /** Topic exchange that receives dead-lettered messages. */
    @Bean
    public TopicExchange deadExchange() {
        return new TopicExchange(RabbitConstants.DEAD_EXCHANGE);
    }

    /** Binds the dead-letter queue to the dead-letter exchange. */
    @Bean
    public Binding deadBinding() {
        return BindingBuilder.bind(deadQueue())
                .to(deadExchange())
                .with(RabbitConstants.DEAD_ROUTE_KEY);
    }

}

消息生产者



//定义一个类,发送消息,接收消息,并进行消息确认回调
@Slf4j
@Service
public class MqProduceServiceImpl implements  RabbitTemplate.ReturnCallback, RabbitTemplate./confirm/iCallback, FanweiMqProduceService {


    @Autowired
    private RabbitTemplate rabbitTemplate;


    @PostConstruct
    public void setCallback() {
        rabbitTemplate.setReturnCallback(this);
        rabbitTemplate.setConfirmCallback(this);
    }

    @Override
    public void sendWorkflowRequest(WorkflowRequestMessage message) {
        CorrelationData correlationData = new CorrelationData(String.valueOf ( message.getId () ));
        rabbitTemplate.convertAndSend( RabbitConstants.FANWEI_EXCHANGE, RabbitConstants.APPROVAL_CODE_ROUTE_KEY, message, correlationData);
    }

    //Exchange 到Queue投递成功,不回调ReturnCallback
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("消息丢失, 没有投递成功:"+message.getBody());
        String msg = new String(message.getBody());
        WorkflowRequestMessage requestMessage = JSON.parseObject (msg, WorkflowRequestMessage.class );
        LogMqRequestResumePO log = LogMqRequestResumePO.builder ().type ( FanweiOaConst.LOG_TYPE_SEND ).exchange ( exchange)
                .queue (routingKey).message ( JSON.toJSonString ( requestMessage ))
                .error ( replyText).requstId (requestMessage.getRequestId () ).build();
        logMqRequestResumeService.save ( log );
    }




    // /confirm/i确认消息投递成功了
    //如果没有 Exchange ack=false,
    //     * 如果有 Exchange ack=tru
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("消息发送成功,发送ack确认,id="+correlationData.getId());
        if (ack){
            log.info("发送成功");
        }else {
            String id = correlationData.getId ();
            ApprovalDataRecordOaPO record = approvalDataRecordOaRepositoryService.getById ( Long.valueOf ( id ) );
            WorkflowRequestMessage requestMessage = new WorkflowRequestMessage ( record.getApprovalOaId (), record.getCreateBy (), record.getId () );

            LogMqRequestResumePO log = LogMqRequestResumePO.builder ().type ( FanweiOaConst.LOG_TYPE_SEND ).exchange ( RabbitConstants.FANWEI_EXCHANGE )
                    .queue (RabbitConstants.APPROVAL_CODE_QUEUE).message ( JSON.toJSonString ( requestMessage ))
                    .error ( cause).requstId (requestMessage.getRequestId () ).build();
            logMqRequestResumeService.save ( log );
        }
    }

}

消息消费者

@Slf4j
@Service
public class FanweiConsumer {

    @Autowired
    private FanweiOaService fanweiOaService;

    @Autowired
    private LogMqRequestResumeService logMqRequestResumeService;

    @RabbitListener (queues = RabbitConstants.APPROVAL_CODE_QUEUE )
    @RabbitHandler
    public void approvalCode(Message message, Channel channel) throws IOException {
        String receivedRoutingKey = message.getMessageProperties().getReceivedRoutingKey();
        String msg = new String(message.getBody());
        WorkflowRequestMessage requestMessage = JSON.parseObject (msg, WorkflowRequestMessage.class );
        log.info ("路由key= [ "+receivedRoutingKey+" ]接收到的消息= [ "+msg +" ]");
        try {
            log.info("收到消息:{}", msg);
            //TODO 具体业务
           fanweiOaService.getApprovalCodeAndUpdate ( requestMessage );
           channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        }  catch (Exception e) {
            log.info ( e.getMessage (),e );
            requestMessage.setError ( e.getMessage () );
            dealFailureMessage ( message, channel, requestMessage);
        }

    }

    
    private void dealFailureMessage(Message message, Channel channel, WorkflowRequestMessage requestMessage) throws IOException {
        log.info("消息即将再次返回队列处理..."+requestMessage.getRetryCount ());
        if (requestMessage.getRetryCount ()>=FanweiOaConst.MAX_RETRY){
            channel.basicReject (message.getMessageProperties().getDeliveryTag(), false);
        } else {
            requestMessage.setRetryCount ( requestMessage.getRetryCount ()+1 );
            log.info("消息重新发送..."+requestMessage.getRetryCount ());
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            // 重新发送消息到队尾
            channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
                    message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
                    JSON.toJSonBytes(requestMessage));
        }
    }


    // 监听死信队列
    @RabbitListener(queues = RabbitConstants.DEAD_QUEUE)
    @RabbitHandler
    public void deadQueueListener(Message message, Channel channel) throws IOException {
        String receivedRoutingKey = message.getMessageProperties().getReceivedRoutingKey();
        String msg = new String(message.getBody());
        System.out.println("路由key= [ "+receivedRoutingKey+" ]接收到的消息= [ "+msg +" ]");
        WorkflowRequestMessage requestMessage = JSON.parseObject (msg, WorkflowRequestMessage.class );
        LogMqRequestResumePO log = LogMqRequestResumePO.builder ().type ( FanweiOaConst.LOG_TYPE_RECEIVE ).exchange ( RabbitConstants.FANWEI_EXCHANGE )
                .queue (RabbitConstants.APPROVAL_CODE_QUEUE).message (JSON.toJSonString ( requestMessage ))
                .error ( requestMessage.getError() ).requstId (requestMessage.getRequestId () ).build();
        logMqRequestResumeService.save ( log );
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);

    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5715853.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存