自己用Java写了个可视化的定时任务工具,取名sundial(日晷)。定时任务的原理就是spring schedule;分布式锁基于zookeeper实现,客户端采用Netflix开源的Curator。JDK用的17,最新的长期支持版本(LTS),数据库是MySQL,同样是最新的MySQL8。
所谓分布式锁,即独立于整个分布式环境之外的全局且唯一的锁的添加、释放的机制。简单的分布式锁可由数据库实现,比如MySQL。但其性能显然不如基于内存的redis、zookeeper快,且不支持锁超时,公平锁功能。用redis也可以做,redis是基于内存且支持持久化的键值对数据库,6.0版本之后更是支持多线程,加上传统技能IO多路复用及底层采用的跳表等高性能的数据结构,性能更上一层楼。redis加锁本质上就是调用其set命令来对同一key设置键值对,value的话可以用当前线程的线程id,在解锁时对value做校验以避免释放其他线程的锁,再给键值对设置一个过期时间以实现锁超时功能。但有可能到了过期时间,持有锁的线程还没执行完成,这时锁已被释放,其他线程获取了锁,再对同一共享资源进行 *** 作,就会出现bug,而且线程安全问题相关的bug难以排查。可以用Redisson搭配Lua脚本来实现锁超时功能,基于其watchdog,每隔一定时间,默认为30秒。如果某客户端持有锁超过30秒,watchdog就会每隔10秒再把key的过期时间再设为30秒。这样,某线程执行的慢,一直持有锁,其他线程也不会获取到当前线程持有的锁。
而zookeeper实现分布式锁,就是利用其临时有序节点特性。在某一指定目录下创建节点,并判断节点序号是否为当前目录下最小,若是,则视为创建锁成功。否则,就对前一个节点添加一个监听事件。如果锁释放,会通知后一个节点,后一个节点再判断自己序号是否最小,最小就获得锁。这里的逻辑类似JUC里的AQS队列同步器的公平锁模式,AQS里有个双向链表,持有锁的线程在头节点,其他的等着抢锁的线程就在连在后面,持有锁的线程释放锁后会通知后一个节点的线程来竞争锁。
囿于篇幅,下面只介绍主要的类和数据库设计。查看详细代码,请移步GitHub,GitHub - riveryue/sundial,这个分支updateCronActivrImmediately。
使用时需要在任务类上加上自定义注解SundialTask,注解的name属性在整个工程是需保持唯一。
下图为示例,线程休眠5秒是为了模拟真实业务系统中的定时任务,测试分布式环境下同一任务只被执行一次。如果不休眠5秒,就不会存在锁竞争,导致每台服务器上都会执行一遍定时任务,造成一种zook分布式锁不生效的假象。
package sundial; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import sundial.annotation.SundialTask; import java.util.Date; @Component @Slf4j public class Task2 implements SundialExecute { @SundialTask(name = "test2") @Override public void execute() { try { Thread.sleep(5000L); } catch (InterruptedException e) { log.error("error in schedule ", e); } log.info("task2 {}", new Date()); log.info("schedule execute successfully"); } }
下图为上述注解的源码:
package sundial; import org.apache.curator.framework.Curatorframework; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import sundial.annotation.SundialTask; import sundial.config.CuratorframeworkConfig; import sundial.constant.TaskStatus; import sundial.dto.TaskConfDTO; import sundial.service.TaskConfService; import sundial.utils.SpringUtils; import org.apache.commons.lang3.StringUtils; import java.lang.reflect.Method; import java.util.concurrent.TimeUnit; public interface SundialExecute extends Runnable { void execute(); @Override default void run() { TaskConfService taskConfService = SpringUtils.getBean(TaskConfService.class); Method[] declaredMethods = this.getClass().getDeclaredMethods(); String annoVal = StringUtils.EMPTY; for (Method declaredMethod : declaredMethods) { boolean annotationPresent = declaredMethod.isAnnotationPresent(SundialTask.class); if (annotationPresent) { SundialTask methodAnno = declaredMethod.getAnnotation(SundialTask.class); annoVal = methodAnno.name(); } } TaskConfDTO taskConfDTO = taskConfService.queryByTaskName(annoVal); if (taskConfDTO != null && taskConfDTO.getStatus().equals(TaskStatus.DISABLE)) { return; } String ZK_LOCK_PATH = "/distributeLock"; Curatorframework client = SpringUtils.getBean(CuratorframeworkConfig.class).curatorframework(); final InterProcessMutex mutex = new InterProcessMutex(client, ZK_LOCK_PATH); try { //获取锁资源 boolean flag = mutex.acquire(1, TimeUnit.SECONDS); if (flag) { execute(); } } catch (Exception e) { } finally { try { mutex.release(); } catch (Exception e) { } } } }
这是数据库表设计:
CREATE database if NOT EXISTS `sundial` default character set utf8mb4 collate utf8mb4_unicode_ci; use `sundial`; SET FOREIGN_KEY_CHECKS=0; -- ---------------------------- -- Table structure for task_conf -- ---------------------------- DROP TABLE IF EXISTS `task_conf`; CREATE TABLE `task_conf` ( `id` int NOT NULL AUTO_INCREMENT, `task_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL, `status` tinyint DEFAULT NULL COMMENT '1 available, 2 unavailable', `cron` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `uni_task_name` (`task_name`) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; -- ---------------------------- -- Records of task_conf -- ---------------------------- INSERT INTO `task_conf` VALUES ('1', 'test1', '1', '*/4 * * * * ?'); INSERT INTO `task_conf` VALUES ('2', 'test2', '1', '*/9 * * * * ?');
下面为前端页面,毕竟不是专业前端,所以做的丑,但菜就是菜,菜就是原罪,以后再改改。
我的MySQL和zookeeper装在本地,然后虚拟机就装个jdk,MySQL客户端,数据库安装在本地物理机,所以虚拟机里没必要装完整的MySQL服务。虚拟机里起三个服务以模拟分布式环境。
启动zookeeper,
虚拟机里启动定时任务服务,只贴了一台机器的截图,
在页面上启用定时任务2,
虚拟机里可以看到只有一台机器打出了任务2的日志,
自此,定时任务只在集群中的一台机器上执行,且执行成功。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)