代码之下---RabbitMQ项目实践

代码之下---RabbitMQ项目实践,第1张

代码之下---RabbitMQ项目实践

        各位朋友大家好,今年年初有幸负责公司的一个调研系统的项目,主要背景:为了解决集团内部各种调研活动的开展,其中主要包括满意度调研,组织氛围调研,价值观践行调研及其他通用调研等。今天分享一下RabbitMQ在我们项目上的应用。

遇到的问题

        系统有个功能就是给集团参与调研的人员推送问卷信息,包括各种推送渠道(内部聊天软件,邮件,短信,待办等),刚开始设计是多线程并发处理信息,直到出了一次生产事故,同时博主也是承担巨大的压力下进行了系统重新架构调整。先说一下事故,有次好多个类型的问卷几乎同时发布,导致填写人员太多,系统并发太高,扛不住宕机了,当时发送问卷的线程也随即挂掉,导致有些人员没有收到问卷填写地址,一时间不得不紧急停机维护,而我也受到了领导的关爱哈哈,部门领导和我主管领导坐我身后看我处理数据及架构调整,影响太深刻了。

        总结一下,就是当系统宕机之后,跑的进程随即停止导致的问题。

预解决方案

        1.将需要发送的数据先存储到数据库中,然后每发送一条问卷信息,记录其状态,当系统宕机之后,已发送的数据就会有记录存储,然后跑个定时任务去查询没有处理的数据,再次推送用户。

        2.第二种方案就是将需要发送的数据存储到RabbitMQ中,利用消息队列的特性,手动确认问卷信息发送状态。

实现RabbitMQ解决方案

        本项目采用生产端点对点消息模型,消费端消息手动确认的方式实现交互。

1.添加RabbitMQ相关依赖


     org.springframework.boot
     spring-boot-starter-amqp

2.配置文件

spring:
    rabbitmq:
        host: ip地址
        port: 端口
        username: 用户名
        password: 密码
        virtual-host: 虚拟主机
        listener:
          simple:
            # 手动确认
            acknowledge-mode: manual
            retry:
              enabled: true

3.生产者部分代码

// [2.将消息写入消息队列]
        for (int i = 0; i < releaseEmpList.size(); i++) {
            ReleaseEmpDTO releaseEmpDTO = releaseEmpList.get(i);
            // 写入OA消息队列
            CorrelationData correlationDataOA = new CorrelationData(UUID.randomUUID().toString());
            RabbitmqOARelease rabbitmqOARelease = createRabbitmqOARelease(quBatch, link, releaseEmpDTO);
            if (i == releaseEmpList.size() - 1) {
                rabbitmqOARelease.setFlag(true);
            }
            rabbitTemplate.convertAndSend(CommonConstant.RABBITMQ_QUEUES_OA, rabbitmqOARelease, correlationDataOA);

            // 写入HR消息队列
            CorrelationData correlationDataHR = new CorrelationData(UUID.randomUUID().toString());
            RabbitmqHRRelease rabbitmqHRRelease = createRabbitmqHRRelease(quBatch, link, releaseEmpDTO);
            rabbitTemplate.convertAndSend(CommonConstant.RABBITMQ_QUEUES_HR, rabbitmqHRRelease, correlationDataHR);
        }

4.消费者部分代码

    @RabbitListener(queuesToDeclare = @Queue(value = CommonConstant.RABBITMQ_QUEUES_OA))
    public void oaRelease(RabbitmqOARelease rabbitmqOARelease, Message message, Channel channel) throws IOException {
        if (null == rabbitmqOARelease) {
            return;
        }

        // 给OA发送待办或待阅
        notifyAsyncTask.sendTodoOne(rabbitmqOARelease.getSendContext(), rabbitmqOARelease.getQuNotify());

        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        //手工ack
        channel.basicAck(deliveryTag, false);

        if (rabbitmqOARelease.getFlag()) {
            // 期次状态改为发布
            QuBatch result = quBatchMapper.selectOne(new QuBatch() {{
                setId(rabbitmqOARelease.getBatchId());
            }});

            if (null != result && result.getStatus().intValue() == CommonConstant.STATUS_PUBLISHING.intValue()) {
                // 发布状态
                result.setStatus(CommonConstant.STATUS_PUBLISH);
                result.setBatchStartTime(new Date());

                quBatchMapper.updateByPrimaryKeySelective(result);
            }
        }

        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5068241.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-16
下一篇 2022-11-16

发表评论

登录后才能评论

评论列表(0条)

保存