python轻量级定时任务库cron-lite

python轻量级定时任务库cron-lite,第1张

我在用bottle搭建轻量测试服务的时候,除了api,也需要一个定时任务服务。我希望定时服务和bottle的装饰器风格保持类似,并且支持cron表达式,支持一定的安全机制(例如任务超时时队列不要溢出),然后依赖应该尽可能轻量。
调研一圈发现:

sched是系统自带库,无额外依赖。虽然支持定时任务,但对定时的支持很弱,不支持cron表达式,也不支持循环重入和自动调度

apscheduler支持比较完善,可以做到任务的并发调度和超时规避,也支持cron表达式化的任务。但是这个库比较大,依赖比较重,用起来也不是足够简洁(不像bottle的路由注册那么轻量)

还有不少cron库支持cron表达式,但是却是 *** 作的系统的crontab,对系统有侵入,不满足我定时任务作为业务服务的场景

croniter支持计算cron表达式和下次的调度时间,但是却不支持执行任务

综合上述,我并没有找到特别适合的轻量定时任务框架,所以就利用croniter和sched,自己封装了一个库并上传到了pypi(想上传自己的库,可参考这里,我的另一篇文章)

目前支持python3.6或以上版本,可以用pip安装

pip install cron-lite

使用比较简单,直接装饰就可以把函数变成定时任务执行

from cron_lite import cron_task, start_all
import time


@cron_task("* * * * * 0/2")
def event1():
    print("event1", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())))
    time.sleep(3)
    print("event1 done", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())))

@cron_task("* * * * * 0/15")
def event2():
    print("event2", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())))
    time.sleep(10)
    print("event2 done", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())))


start_all()

这样就可以启动一个永久的cron服务,注册了两个函数event1和event2为定时任务,分别每2秒和每15秒一次。因为2秒一次的定时任务自己需要执行3秒,所以每次执行时会错过下一次的执行时间,实际上的执行间隔是4秒。
它支持两种cron表达格式(实际为croniter支持)

5段cron格式:
分 小时 日 月 星期
例如
0 0/12 * * *
会在每天的正午和半夜整点执行任务

6段cron格式:
分 小时 日 月 星期 秒
例如
0/10 * * * * 20,40
会在每整10分钟的20和40秒执行任务

如果想用非阻塞的形式启动,则可以这样使用spawn参数。它还支持重定向log的handler

from cron_lite import start_all, stop_all
t = start_all(spawn=True)

上面代码会启动一个线程来调度这些任务。并且返回调度线程t
之后可以通过

stop_all(t)

来停止任务循环,并等待执行中的任务全部完成,以做到优雅退出。

例如以下代码捕获退出信号,并在程序退出之前会确保定时任务的完成

#!/usr/bin/env python3.6
# encoding: utf-8
from common.logger import logger
from common.crons import start_all, stop_all, cron_task
import signal


@cron_task("* * * * * 0/5")
def event1():
    import time
    print("event1", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())))
    time.sleep(4)
    print("event1 done")


def signal_handler(sig, frame):
    # exit gracefully, ensure long term task finished
    stop_all()
    logger.info(f"receive sig={sig}, exiting...")
    for handler in logger.handlers:
        handler.flush()


if __name__ == '__main__':
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    start_all(spawn=False, info_handler=logger.info, error_handler=logger.error)

源码很小很简单,只有一个文件不到200行,实际仅依赖一个轻量级三方库croniter。具体可以参考我的github仓库

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/922516.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-16
下一篇 2022-05-16

发表评论

登录后才能评论

评论列表(0条)

保存