import datetime
import logging

import pytz
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler, BlockingScheduler

logger = logging.getLogger(__name__)


class Scheduler(object):
    """Wrapper around an APScheduler scheduler that also keeps a job registry.

    Every job added through :meth:`add_job` or :meth:`add_cron` is recorded in
    ``self.jobs`` (keyed by job id) alongside the parameters it was scheduled
    with, so callers can inspect what was registered via :meth:`get_jobs`.
    """

    def __init__(self, background=True, timezone='Asia/Shanghai'):
        """Create the underlying scheduler and hook up the job listener.

        Args:
            background: When truthy a ``BackgroundScheduler`` is created,
                otherwise a ``BlockingScheduler``.
            timezone: Name of the timezone (as understood by
                ``pytz.timezone``) used by the scheduler, by cron triggers
                and for the first run time of interval jobs.
        """
        jobstores = {}
        executors = {
            'default': ThreadPoolExecutor(50),
        }
        job_defaults = {
            'coalesce': True,
            'max_instances': 1,
        }
        # Single source of truth for the timezone; add_job/add_cron reuse it.
        self.timezone = pytz.timezone(timezone)

        scheduler_cls = BackgroundScheduler if background else BlockingScheduler
        self.scheduler = scheduler_cls(jobstores=jobstores,
                                       executors=executors,
                                       job_defaults=job_defaults,
                                       timezone=self.timezone)
        self.jobs = {}
        self.scheduler.add_listener(self.my_listener,
                                    EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

    def add_job(self, func, seconds=0, minutes=0, args=None, job_id=None,
                max_instances=1):
        """Schedule ``func`` on a fixed interval, with the first run due now.

        Args:
            func: Callable to run; when ``None`` the call is a no-op.
            seconds: Seconds component of the interval.
            minutes: Minutes component of the interval.
            args: Positional arguments passed to ``func``.
            job_id: Job identifier; defaults to ``func.__name__``.
            max_instances: Maximum concurrently running instances of the job.
        """
        if func is None:
            return
        if job_id is None:
            job_id = func.__name__

        # Use an aware "now" in the scheduler's timezone: a naive datetime
        # would be interpreted as local-to-the-scheduler time, which shifts
        # the first run whenever the host is in a different timezone.
        first_run = datetime.datetime.now(self.timezone)
        self.scheduler.add_job(func, 'interval', args=args,
                               next_run_time=first_run,
                               seconds=seconds, minutes=minutes,
                               id=job_id, name=job_id,
                               max_instances=max_instances)
        # Record the job only after the scheduler accepted it, so a failed
        # registration does not leave a misleading registry entry behind.
        self.jobs[job_id] = {
            'job_id': job_id,
            'job_func': func,
            'args': args,
            'interval': minutes * 60 + seconds,
        }

    def add_cron(self, func, minute='0', hour='*', day='*', month='*',
                 day_of_week='*', args=None, job_id=None, max_instances=1):
        """Schedule ``func`` with a cron-style trigger.

        Args:
            func: Callable to run; when ``None`` the call is a no-op.
            minute, hour, day, month, day_of_week: Cron field expressions
                forwarded to the ``cron`` trigger.
            args: Positional arguments passed to ``func``.
            job_id: Job identifier; defaults to ``func.__name__``.
            max_instances: Maximum concurrently running instances of the job.
        """
        if func is None:
            return
        if job_id is None:
            job_id = func.__name__

        self.scheduler.add_job(func, 'cron', args=args,
                               minute=minute, hour=hour, day=day,
                               month=month, day_of_week=day_of_week,
                               id=job_id, name=job_id,
                               max_instances=max_instances,
                               timezone=self.timezone)
        # Registered after the scheduler call for the same reason as add_job.
        self.jobs[job_id] = {
            'job_id': job_id,
            'job_func': func,
            'args': args,
            'hour': hour,
            'minute': minute,
            'day': day,
            'month': month,
            'day_of_week': day_of_week,
        }

    def remove_job(self, job_id):
        """Remove a job from the scheduler and drop it from the registry.

        Errors raised by the scheduler for an unknown ``job_id`` propagate
        to the caller.
        """
        self.scheduler.remove_job(job_id)
        # The job may have been added directly on the underlying scheduler
        # (see get_scheduler), in which case it has no registry entry.
        self.jobs.pop(job_id, None)

    def get_jobs(self):
        """Return the registry entries of the jobs added through this wrapper."""
        return self.jobs.values()

    def get_scheduler_jobs(self):
        """Return the job objects as reported by the underlying scheduler."""
        return self.scheduler.get_jobs()

    def start(self):
        """Start the underlying scheduler."""
        self.scheduler.start()

    def stop(self):
        """Shut the underlying scheduler down."""
        self.scheduler.shutdown()

    def remove_all_jobs(self):
        """Remove every job from the scheduler and empty the registry."""
        self.scheduler.remove_all_jobs()
        # Keep the registry in step with the scheduler.
        self.jobs.clear()

    def my_listener(self, event):
        """Log jobs that finished with an exception.

        Registered for both executed and error events; successful runs are
        ignored.
        """
        if event.exception:
            logger.error('Job %s raised an exception: %r',
                         event.job_id, event.exception)

    def get_scheduler(self):
        """Return the underlying APScheduler scheduler instance."""
        return self.scheduler
class CronJob():
    """Registers the application's periodic jobs on a Scheduler and runs it."""

    def __init__(self, app):
        """Keep a reference to the application and create the scheduler.

        Args:
            app: Application object; its ``logger`` attribute is used to
                report the outcome of job registration.
        """
        # init() logs through self.app.logger, so the app must be stored.
        self.app = app
        # The original passed an undefined name (db_uri) as the scheduler's
        # only positional argument, ``background``; request the background
        # scheduler explicitly instead.
        self.scheduler = Scheduler(background=True)

    def init(self):
        """Register the jobs; failures are logged instead of raised."""
        try:
            # Test job: runs once per minute.
            self.scheduler.add_job(test, seconds=0, minutes=1, args=None,
                                   job_id="test", max_instances=1)
            self.app.logger.info('test intervaljob Added!')
        except Exception as err:
            self.app.logger.exception(err)

    def run(self):
        """Start the scheduler."""
        self.scheduler.start()


def test():
    """Sample job body used to check that scheduling works."""
    print('hello world')
# Sharing is welcome; when reposting, please credit the source: 内存溢出
# Comment list (0 comments)