各位朋友大家好,今年年初有幸负责公司的一个调研系统的项目,主要背景:为了解决集团内部各种调研活动的开展,其中主要包括满意度调研,组织氛围调研,价值观践行调研及其他通用调研等。今天分享一下RabbitMQ在我们项目上的应用。
遇到的问题系统有个功能就是给集团参与调研的人员推送问卷信息,包括各种推送渠道(内部聊天软件,邮件,短信,待办等),刚开始设计是多线程并发处理信息,直到出了一次生产事故,同时博主也是承担巨大的压力下进行了系统重新架构调整。先说一下事故,有次好多个类型的问卷几乎同时发布,导致填写人员太多,系统并发太高,扛不住宕机了,当时发送问卷的线程也随即挂掉,导致有些人员没有收到问卷填写地址,一时间不得不紧急停机维护,而我也受到了领导的关爱哈哈,部门领导和我主管领导坐我身后看我处理数据及架构调整,影响太深刻了。
总结一下,就是当系统宕机之后,跑的进程随即停止导致的问题。
预解决方案1.将需要发送的数据先存储到数据库中,然后每发送一条问卷信息,记录其状态,当系统宕机之后,已发送的数据就会有记录存储,然后跑个定时任务去查询没有处理的数据,再次推送用户。
2.第二种方案就是将需要发送的数据存储到RabbitMQ中,利用消息队列的特性,手动确认问卷信息发送状态。
实现RabbitMQ解决方案本项目采用生产端点对点消息模型,消费端消息手动确认的方式实现交互。
1.添加RabbitMQ相关依赖
org.springframework.boot spring-boot-starter-amqp
2.配置文件
spring: rabbitmq: host: ip地址 port: 端口 username: 用户名 password: 密码 virtual-host: 虚拟主机 listener: simple: # 手动确认 acknowledge-mode: manual retry: enabled: true
3.生产者部分代码
// [2.将消息写入消息队列] for (int i = 0; i < releaseEmpList.size(); i++) { ReleaseEmpDTO releaseEmpDTO = releaseEmpList.get(i); // 写入OA消息队列 CorrelationData correlationDataOA = new CorrelationData(UUID.randomUUID().toString()); RabbitmqOARelease rabbitmqOARelease = createRabbitmqOARelease(quBatch, link, releaseEmpDTO); if (i == releaseEmpList.size() - 1) { rabbitmqOARelease.setFlag(true); } rabbitTemplate.convertAndSend(CommonConstant.RABBITMQ_QUEUES_OA, rabbitmqOARelease, correlationDataOA); // 写入HR消息队列 CorrelationData correlationDataHR = new CorrelationData(UUID.randomUUID().toString()); RabbitmqHRRelease rabbitmqHRRelease = createRabbitmqHRRelease(quBatch, link, releaseEmpDTO); rabbitTemplate.convertAndSend(CommonConstant.RABBITMQ_QUEUES_HR, rabbitmqHRRelease, correlationDataHR); }
4.消费者部分代码
@RabbitListener(queuesToDeclare = @Queue(value = CommonConstant.RABBITMQ_QUEUES_OA)) public void oaRelease(RabbitmqOARelease rabbitmqOARelease, Message message, Channel channel) throws IOException { if (null == rabbitmqOARelease) { return; } // 给OA发送待办或待阅 notifyAsyncTask.sendTodoOne(rabbitmqOARelease.getSendContext(), rabbitmqOARelease.getQuNotify()); long deliveryTag = message.getMessageProperties().getDeliveryTag(); //手工ack channel.basicAck(deliveryTag, false); if (rabbitmqOARelease.getFlag()) { // 期次状态改为发布 QuBatch result = quBatchMapper.selectOne(new QuBatch() {{ setId(rabbitmqOARelease.getBatchId()); }}); if (null != result && result.getStatus().intValue() == CommonConstant.STATUS_PUBLISHING.intValue()) { // 发布状态 result.setStatus(CommonConstant.STATUS_PUBLISH); result.setBatchStartTime(new Date()); quBatchMapper.updateByPrimaryKeySelective(result); } } try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)