ThreadPoolTaskScheduler动态添加、移除定时任务

ThreadPoolTaskScheduler动态添加、移除定时任务,第1张

最近遇到一个需求,定时任务的业务逻辑不会改变,但需要动态添加、移除定时任务,而且定时执行时间有可能随时改变,这可怎么实现呢?

首先,配置定时任务线程池;

第二步,建立任务,里面包含了定时任务需要实现的业务逻辑;

第三步,应用定时任务,包括添加、移除;

最后,运行入口程序,打开浏览器进行测试; 通过浏览器分别执行了localhost:8080/index/insert/1000/10、localhost:8080/index/insert/2000/20,也就是添加了两个任务,任务1000每10s执行一次,任务2000每20s执行一次;

执行 http://localhost:8080/index/remove/1000 ,把1000的任务移除掉,再看执行结果,只剩下任务2000,ok,派掘动态添加、移除定时任务编码完成。

当然,这里为了测试,把管理任务的队列直接放到了Controller里,实际应用时应保持全局唯一。

最后总结

通过这个需求,我们又用到了一个类ThreadPoolTaskScheduler,它有别于ThreadPoolTaskExecutor类,有兴趣有时间的可以激蚂查看源码。

动态添加、移除定时任务的 *** 作流程,大致可以分为以下四个步骤:

1.建立一个定时任务线程池;

2.为定时任务线程池建立一个队列,来管理这些任务;

3.根据唯一标尘铅核识,往定时任务线程池和队列里分别添加这个任务;

4.根据唯一标识,从定时任务线程池里取消一个任务,并从队列里移除这个任务。

Java 中线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。合理的梁如使用线程池可以带来多个好处:

(1) 降低资源消耗 。通过重复利用已创建的线程降低线程在创建和销毁时造成的消耗。

(2) 提高响应速度 。当处理执行任务时,任务可以不需要等待线程的创建就能立刻执行。

(3) 提高线程的可管理性 。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

线程池的处理流程如上图所示

线程池中通过 ctl 字段来表示线程池中的当前状态,主池控制状态 ctl 是 AtomicInteger 类型,包装了两个概念字段:workerCount 和 runState,workerCount 表示有效线程数,runState 表示是否正在运行、正在关闭等状态。使用 ctl 字段表示两个概念,ctl 的前 3 位表示线程池状态,线程池中限制 workerCount 为(2^29 )-1(约 5 亿)个线程,而不是 (2^31)-1(20 亿)个线程。workerCount 是允许启动和不允许停止的工作程序的数量。该值可能与实际的活动线程数暂时不同,例如,当 ThreadFactory 在被询问时未能创建线程时,以及退出线程在终止前仍在执行记时。用户可见的池大小报告为工作集的当前大小。 runState 提供主要的生命周期控制,取值如下表所示:

runState 随着时间的推移而改变,在 awaitTermination() 方法中等待的线程将在状态达到 TERMINATED 时返回。状态的转换为:

RUNNING ->SHUTDOWN 在调用 shutdown() 时,可能隐含在 finalize() 中

(RUNNING 或 SHUTDOWN)->STOP 在调用 shutdownNow() 时

SHUTDOWN ->TIDYING 当队列和线程池都为空时

STOP ->TIDYING 当线程池为空时

TIDYING ->TERMINATED 当 terminate() 方法完成时

开发人员如果需要在线程池变为 TIDYING 状态时进行相应的处理,可以通过重载 terminated() 函数来实现。

结合上图说明线程池 ThreadPoolExecutor 执行流程,使用橡腊启 execute() 方法提交任务到线程池中执行时分为4种场景:

(1)线程池中运行的线程数量小于 corePoolSize,创建新线程来执行任务。

(2)线程池中运行线程数量不小于 corePoolSize,将任务加入到阻塞队列 BlockingQueue。

(3)如果无法将任务加入到阻塞队列(队列已满),创建新的线程来处理任务(这里需要获取全局锁)。

(4)当创建新的线程数量使线程池中当前运行线程数量超过 maximumPoolSize,线程池中拒绝任务,调用 RejectedExecutionHandler.rejectedExecution() 方法处理。

源码分析:

线程池创局差建线程时,会将线程封装成工作线程 Worker,Worker 在执行完任务后,还会循环获取工作队列里的任务来执行。

创建线程池之前,首先要知道创建线程池中的核心参数:

corePoolSize (核心线程数大小):当提交任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,直到需要执行的任务数大于核心线程数时就不再创建。

runnableTaskQueue (任务队列):用于保存等待执行任务的阻塞队列。一般选择以下几种:

ArrayBlockingQueue:基于数组的有界阻塞队列,按照 FIFO 原则对元素进行排序。

LinkedBlockingQueue:基于链表的阻塞队列,按照 FIFO 原则对元素进行排序。

SynchronousQueue:同步阻塞队列,也是不存储元素的阻塞队列。每一个插入 *** 作必须要等到另一个 线程调用移除 *** 作,否则插入 *** 作一直处于阻塞状态。

PriorityBlockingQueue:优先阻塞队列,一个具有优先级的无限阻塞队列。

maximumPoolSize (最大线程数大小):线程池允许创建的最大线程数,当队列已满,并且线程池中的线程数小于最大线程数,则线程池会创建新的线程执行任务。当使用无界队列时,此参数无用。

RejectedExecutionHandler (拒绝策略):当任务队列和线程池都满了,说明线程池处于饱和状态,那么必须使用拒绝策略来处理新提交的任务。JDK 内置拒绝策略有以下 4 种:

AbortPolicy:直接抛出异常

CallerRunsPolicy:使用调用者所在的线程来执行任务

DiscardOldestPolicy:丢弃队列中最近的一个任务来执行当前任务

DiscardPolicy:直接丢弃不处理

可以根据应用场景来实现 RejectedExecutionHandler 接口自定义处理策略。

keepAliveTime (线程存活时间):线程池的工作线程空闲后,保持存活的时间。

TimeUnit (存活时间单位):可选单位DAYS(天)、HOURS(小时)、MINUTES(分钟)、MILLISECONDS(毫秒)、MICROSECONDS(微妙)、NANOSECONDS(纳秒)。

ThreadFactory (线程工厂):可以通过线程工厂给创建出来的线程设置有意义的名字。

创建线程池主要分为两大类,第一种是通过 Executors 工厂类创建线程池,第二种是自定义创建线程池。根据《阿里java开发手册》中的规范,线程池不允许使用 Executors 去创建,原因是规避资源耗尽的风险。

创建一个单线程化的线程池

创建固定线程数的线程池

以上两种创建线程池方式使用链表阻塞队列来存放任务,实际场景中可能会堆积大量请求导致 OOM

创建可缓存线程池

允许创建的线程数量最大为 Integer.MAX_VALUE,当创建大量线程时会导致 CPU 处于重负载状态和 OOM 的发生

向线程池提交任务可以使用两个方法,分别为 execute() 和 submit()。

execute() 方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。execute() 方法中传入的是 Runnable 类的实例。

submit() 方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过 future 对象可以判断任务是否执行成功,并且可以通过 future 的 get() 方法来获取返回值。get() 方法会阻塞当前线程直到任务完成,使用 get(long timeout, TimeUnit unit)方法会阻塞当前线程一段时间后立即返回,这时候可能任务没有执行完。

可以通过调用线程池的 shutdown() 或shutdownNow() 方法来关闭线程池。他们的原理是遍历线程池中的工作线程,然后逐个调用 interrupt() 方法来中断线程,所以无法响应中断任务可能永远无法终止。

shutdown() 和 shutdownNow() 方法的区别在于 shutdownNow 方法首先将线程池的状态设置为 STOP,然后尝试停止正在执行或暂停任务的线程,并返回等待执行任务的列表,而 shutdown 只是将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。

线程池使用面临的核心的问题在于: 线程池的参数并不好配置 。一方面线程池的运行机制不是很好理解,配置合理需要强依赖开发人员的个人经验和知识;另一方面,线程池执行的情况和任务类型相关性较大,IO 密集型和 CPU 密集型的任务运行起来的情况差异非常大,这导致业界并没有一些成熟的经验策略帮助开发人员参考。

(1)以任务型为参考的简单评估:

假设线程池大小的设置(N 为 CPU 的个数)

如果纯计算的任务,多线程并不能带来性能提升,因为 CPU 处理能力是稀缺的资源,相反导致较多的线程切换的花销,此时建议线程数为 CPU 数量或+1;----为什么+1?因为可以防止 N 个线程中有一个线程意外中断或者退出,CPU 不会空闲等待。

如果是 IO 密集型应用, 则线程池大小设置为 2N+1. 线程数 = CPU 核数 目标 CPU 利用率 (1 + 平均等待时间 / 平均工作时间)

(2)以任务数为参考的理想状态评估:

1)默认值

2)如何设置 * 需要根据相关值来决定 - tasks :每秒的任务数,假设为500~1000 - taskCost:每个任务花费时间,假设为0.1s - responsetime:系统允许容忍的最大响应时间,假设为1s

以上都为理想值,实际情况下要根据机器性能来决定。如果在未达到最大线程数的情况机器 cpu load 已经满了,则需要通过升级硬件和优化代码,降低 taskCost 来处理。

(仅为简单的理想状态的评估,可作为线程池参数设置的一个参考)

与主业务无直接数据依赖的从业务可以使用异步线程池来处理,在项目初始化时创建线程池并交给将从业务中的任务提交给异步线程池执行能够缩短响应时间。

严禁在业务代码中起线程!!!

当任务需要按照指定顺序(FIFO, LIFO, 优先级)执行时,推荐创建使用单线程化的线程池。

本文章主要说明了线程池的执行原理和创建方式以及推荐线程池参数设置和一般使用场景。在开发中,开发人员需要根据业务来合理的创建和使用线程池达到降低资源消耗,提高响应速度的目的。

原文链接:https://juejin.cn/post/7067324722811240479

Python的线程池可以有效地控制系统中并发线程的数量。

当程序中需要创建许多生存期较短的线程执行运算任务时,首先考虑厅猜数使用线程池。线程池任务启动时会创建出最大线程数参数 max_workers 指定数量扮首的空闲线程,程序只要将执行函数提交给线程池,线程池就会启动一个空闲的线程来兆顷执行它。当该函数执行结束后,该线程并不会死亡,而是再次返回到线程池中变成空闲状态,等待执行下一个函数。配合使用 with 关键字实现任务队列完成后自动关闭线程池释放资源。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/bake/11991676.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-20
下一篇 2023-05-20

发表评论

登录后才能评论

评论列表(0条)

保存