多节点服务器定时任务重复的一种解决方式

多节点服务器定时任务重复的一种解决方式,第1张

多节点服务器定时任务重复的一种解决方式

由于定时任务和主题代码写在一起,部署在两台服务器上,所以定时任务会执行两次,据说可以用乐观锁解决,所以特地记录一下。

  • 乐观锁和悲观锁
  • 悲观锁:总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞直到它释放锁。
  • 乐观锁:总是假设最好的情况,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据。
  • 锁实现
    新建表:
CREATE TABLE `schedule_execute_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
  `schedule_name` varchar(60) DEFAULT NULL COMMENT '任务名称',
  `version` bigint(20) DEFAULT 0 COMMENT '锁',
  `uu_id` varchar(150) DEFAULT NULL COMMENT '执行标识',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_time` datetime DEFAULT NULL COMMENT '更新时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB  DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='定时任务执行信息';

新增数据

假如执行sql不上锁:

select * from schedule_execute_info where schedule_name = 'async';
update schedule_execute_info set uu_id = '000' where schedule_name = 'async';

多人执行修改 *** 作会产生大量脏数据。

  • 使用乐观锁对修改 *** 作上锁:
    可以通过version字段控制版本,再利用行锁的特性实现乐观锁
    假如两个线程同时执行了**select * from schedule_execute_info where schedule_name = ‘async’;** *** 作并拿到相同的数据。其中一条线程修改数据并加上了乐观锁:
update schedule_execute_info set uu_id = '111', version = version + 1 where schedule_name = 'async' and version = 0


此时另一条线程无法修改数据,避免了脏数据的产生。

  • 使用悲观锁:
    使用悲观锁,我们要禁止事务自动提交,改成手动提交,如果是autocommit模式 ,autocommit的值应该为 1 ,不autocommit 的值是 0。
set autocommit=0;
#0.开始事务
#begin;/begin work;/start transaction; (三者选一就可以)
begin;
#1.查
#select * from schedule_execute_info where schedule_name = 'async';
#2.增
INSERT INTO `schedule_execute_info`(`schedule_name`, `version`, `uu_id`, `create_time`, `update_time`) VALUES ('async2', 0, '548f145d-3e61-4206-b4f2-aca19a452954', '2021-11-09 16:09:31', NULL);
#3.改
update schedule_execute_info set uu_id = '222' where schedule_name = 'async'
#4.提交事务
#commit;/commit work;
commit;

select uu_id from schedule_execute_info where id=1 for update;
使用了select…for update的方式,这样就通过数据库实现了悲观锁。
此时在schedule_execute_info表中,id为1的 那条数据就被我们锁定了,
其它的事务必须等本次事务提交之后才能执行。
这样我们可以保证当前的数据不会被其它事务修改。
注:需要注意的是,在事务中,只有SELECT … FOR UPDATE
或LOCK IN SHARE MODE 同一笔数据时会等待其它事务结束后才执行

  • 采用乐观锁的方式去处理多节点问题:
    建表:
CREATE TABLE `schedule_execute_info`  (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增主键',
  `schedule_name` varchar(60) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '任务名称',
  `version` bigint(20) NULL DEFAULT 0 COMMENT '锁',
  `uu_id` varchar(150) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '执行标识',
  `execute_time` datetime(0) NULL DEFAULT NULL COMMENT '执行时间',
  `create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
  `update_time` datetime(0) NULL DEFAULT NULL COMMENT '更新时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 3 CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '定时任务执行信息' ROW_FORMAT = Dynamic;
INSERT INTO `schedule_execute_info` VALUES (1, 'SMS_SEND', 0, '8d5666ca-eefd-4d7c-b1f7-aa2d655b23b1', '2021-11-10 14:36:00', '2021-11-10 09:26:03', '2021-11-10 14:36:00');
INSERT INTO `schedule_execute_info` VALUES (2, 'OAAS_UPDATE', 0, '2a07319f-91ba-414e-85f3-57a672e8efbe', '2021-11-10 14:02:27', '2021-11-10 09:27:42', '2021-11-10 14:02:27');

实现锁代码:如果返回true变执行定时任务,false则不执行

package com.chinaunicom.service.schedule.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.chinaunicom.api.exception.BusinessException;
import com.chinaunicom.api.model.eo.ScheduleExecuteInfo;
import com.chinaunicom.dao.ScheduleExecuteInfoMapper;
import com.chinaunicom.service.schedule.ScheduleExecuteInfoService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.Date;
import java.util.Optional;
import java.util.UUID;


@Service
public class ScheduleExecuteInfoServiceImpl extends ServiceImpl implements ScheduleExecuteInfoService {

    @Resource
    ScheduleExecuteInfoMapper scheduleExecuteInfoMapper;

    private static Logger logger = LoggerFactory.getLogger(ScheduleExecuteInfoServiceImpl.class);

    private static final Long INTERVAL = 5 * 1000L;

    @Override
    public boolean canExcute(String scheduleName) {
        Date now = new Date();
        ScheduleExecuteInfo info = Optional.ofNullable(
                scheduleExecuteInfoMapper.selectOne(new LambdaQueryWrapper() {{
                    eq(ScheduleExecuteInfo::getScheduleName, scheduleName);
                }})).orElseThrow(() -> new BusinessException("任务不存在!"));
        if (info.getExecuteTime() != null && Math.abs((now.getTime() - info.getExecuteTime().getTime())) <= INTERVAL) {
            logger.info("task :" + scheduleName + " has been executed by other nodes");
            return false;
        }
        info.setExecuteTime(now);
        info.setUuId(UUID.randomUUID().toString());
        info.setUpdateTime(now);
        int result = scheduleExecuteInfoMapper.updateVersion(info);
        if (result == 0) {
            logger.info("task :" + scheduleName + " has been executed by other nodes");
            return false;
        }
        return true;
    }
}

sql加锁

update
        schedule_execute_info
        set uu_id=#{uuId},execute_time=#{executeTime}, version=version+1,update_time=#{updateTime}
        where id=#{id} and version=#{version}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5433855.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-11
下一篇 2022-12-11

发表评论

登录后才能评论

评论列表(0条)

保存