Python定时任务封装

Python定时任务封装,第1张

Python定时任务封装
import datetime
import logging

import pytz
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler, BlockingScheduler

class Scheduler(object):
    """Wrapper around an APScheduler scheduler that also keeps a local job registry.

    Every job added through :meth:`add_job` or :meth:`add_cron` is recorded in
    ``self.jobs`` (keyed by job id) next to the underlying APScheduler job, so
    callers can inspect what was registered via :meth:`get_jobs`.
    """

    _log = logging.getLogger(__name__)

    def __init__(self, background=True):
        """Create the underlying scheduler.

        Args:
            background: When true a ``BackgroundScheduler`` is created,
                otherwise a ``BlockingScheduler``.
        """
        jobstores = {}
        executors = {
            'default': ThreadPoolExecutor(50),
        }
        job_defaults = {
            'coalesce': True,
            'max_instances': 1,
        }
        # Single source of truth for the timezone; reused by add_job/add_cron.
        self.timezone = pytz.timezone('Asia/Shanghai')
        scheduler_cls = BackgroundScheduler if background else BlockingScheduler
        self.scheduler = scheduler_cls(jobstores=jobstores, executors=executors,
                                       job_defaults=job_defaults, timezone=self.timezone)
        self.jobs = {}
        self.scheduler.add_listener(self.my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

    def add_job(self, func, seconds=0, minutes=0, args=None, job_id=None, max_instances=1):
        """Register an interval job whose first run is due immediately.

        Args:
            func: Callable to run. Nothing is registered when it is ``None``.
            seconds: Seconds component of the interval.
            minutes: Minutes component of the interval.
            args: Positional arguments forwarded to ``func``.
            job_id: Job identifier; defaults to ``func.__name__``.
            max_instances: Forwarded to the scheduler as ``max_instances``.
        """
        if func is None:
            return
        if job_id is None:
            job_id = func.__name__
        # Use a timezone-aware "now": a naive datetime would be interpreted in
        # the scheduler's timezone, which is wrong whenever the host's local
        # time is not that timezone.
        first_run = datetime.datetime.now(self.timezone)
        self.scheduler.add_job(func, 'interval', args=args, next_run_time=first_run,
                               seconds=seconds, minutes=minutes, id=job_id, name=job_id,
                               max_instances=max_instances)
        # Record the job only after the scheduler accepted it, so a rejected
        # job never shows up in the registry.
        self.jobs[job_id] = {
            'job_id': job_id,
            'job_func': func,
            'args': args,
            'interval': minutes * 60 + seconds,
        }

    def add_cron(self, func, minute='0', hour='*', day='*', month='*', day_of_week='*',
                 args=None, job_id=None, max_instances=1):
        """Register a cron-style job evaluated in the scheduler's timezone.

        Args:
            func: Callable to run. Nothing is registered when it is ``None``.
            minute, hour, day, month, day_of_week: Cron field expressions
                forwarded unchanged to the scheduler's ``cron`` trigger.
            args: Positional arguments forwarded to ``func``.
            job_id: Job identifier; defaults to ``func.__name__``.
            max_instances: Forwarded to the scheduler as ``max_instances``.
        """
        if func is None:
            return
        if job_id is None:
            job_id = func.__name__
        self.scheduler.add_job(func, 'cron', args=args, minute=minute, hour=hour, day=day,
                               month=month, day_of_week=day_of_week, id=job_id, name=job_id,
                               max_instances=max_instances, timezone=self.timezone)
        # Record the job only after the scheduler accepted it.
        self.jobs[job_id] = {
            'job_id': job_id,
            'job_func': func,
            'args': args,
            'hour': hour,
            'minute': minute,
            'day': day,
            'month': month,
            'day_of_week': day_of_week,
        }

    def remove_job(self, job_id):
        """Remove a job from the scheduler and from the local registry.

        Any exception raised by the scheduler propagates, but the registry
        entry is dropped either way. A job that was never recorded locally
        (for example one added directly on ``get_scheduler()``) does not
        raise ``KeyError``.
        """
        try:
            self.scheduler.remove_job(job_id)
        finally:
            self.jobs.pop(job_id, None)

    def get_jobs(self):
        """Return a live view of the locally recorded job descriptions."""
        return self.jobs.values()

    def get_scheduler_jobs(self):
        """Return the jobs as reported by the underlying scheduler."""
        return self.scheduler.get_jobs()

    def start(self):
        """Start the underlying scheduler."""
        self.scheduler.start()

    def stop(self):
        """Shut the underlying scheduler down."""
        self.scheduler.shutdown()

    def remove_all_jobs(self):
        """Remove every job from the scheduler and empty the local registry."""
        self.scheduler.remove_all_jobs()
        # Keep the registry in sync; otherwise get_jobs() reports removed jobs.
        self.jobs.clear()

    def my_listener(self, event):
        """Log the exception carried by a job event, if any.

        Registered in ``__init__`` for ``EVENT_JOB_EXECUTED`` and
        ``EVENT_JOB_ERROR``; events without an exception are ignored.
        """
        if event.exception:
            self._log.error('Scheduled job %s raised an exception',
                            getattr(event, 'job_id', None), exc_info=event.exception)

    def get_scheduler(self):
        """Return the underlying APScheduler scheduler instance."""
        return self.scheduler

使用方法(示例:用 CronJob 封装 Scheduler,并注册一个每分钟执行一次的测试任务)

class CronJob():
    """Usage example: registers a sample interval job on a ``Scheduler``.

    ``app`` must expose a ``logger`` attribute providing ``info`` and
    ``exception`` (presumably a Flask-style application object; verify
    against the caller).
    """

    def __init__(self, app):
        """Keep a reference to ``app`` and create a background scheduler."""
        # init() logs through self.app.logger, so the app has to be stored.
        self.app = app
        # The previous code passed an undefined ``db_uri`` name here, which
        # raised NameError; Scheduler only takes the ``background`` flag.
        self.scheduler = Scheduler()

    def init(self):
        """Register the sample job, logging success or any failure."""
        try:
            # Test job: runs once per minute.
            self.scheduler.add_job(test, seconds=0, minutes=1, args=None, job_id="test", max_instances=1)
            self.app.logger.info('test intervaljob Added!')

        except Exception as err:
            self.app.logger.exception(err)

    def run(self):
        """Start the scheduler."""
        self.scheduler.start()

def test():
    """Sample job body: write a greeting to standard output."""
    greeting = 'hello world'
    print(greeting)

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/3971819.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-10-21
下一篇 2022-10-21

发表评论

登录后才能评论

评论列表(0条)

保存