对于许多小型项目来说,芹菜实在是太过分了。对于那些项目,您可以使用schedule,它非常易于使用。
使用此库,您可以使任何函数定期执行任务:
import scheduleimport timedef job(): print("I'm working...")schedule.every(10).minutes.do(job)schedule.every().hour.do(job)schedule.every().day.at("10:30").do(job)schedule.every().monday.do(job)schedule.every().wednesday.at("13:15").do(job)while True: schedule.run_pending() time.sleep(1)
该示例以阻塞方式运行,但是如果您查看FAQ,您会发现您还可以在并行线程中运行任务,以使您不会阻塞,并且一旦不再需要就删除该任务:
from schedule import Schedulerdef run_continuously(self, interval=1): """Continuously run, while executing pending jobs at each elapsed time interval. @return cease_continuous_run: threading.Event which can be set to cease continuous run. Please note that it is *intended behavior that run_continuously() does not run missed jobs*. For example, if you've registered a job that should run every minute and you set a continuous run interval of one hour then your job won't be run 60 times at each interval but only once. """ cease_continuous_run = threading.Event() class ScheduleThread(threading.Thread): @classmethod def run(cls): while not cease_continuous_run.is_set(): self.run_pending() time.sleep(interval) continuous_thread = ScheduleThread() continuous_thread.setDaemon(True) continuous_thread.start() return cease_continuous_runScheduler.run_continuously = run_continuously
这是在类方法中使用的示例:
def foo(self): ... if some_condition():return schedule.CancelJob # a job can dequeue it # can be put in __enter__ or __init__ self._job_stop = self.scheduler.run_continuously() logger.debug("doing foo"...) self.foo() # call foo self.scheduler.every(5).seconds.do( self.foo) # schedule foo for running every 5 seconds ... # later on foo is not needed any more: self._job_stop.set() ... def __exit__(self, exec_type, exc_value, traceback): # if the jobs are not stop, you can stop them self._job_stop.set()
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)