如何在python

如何在python,第1张

如何在python

我对此问题的解决方案

rq
仅使用了(并且不再使用
rq_scheduler
):

  1. 升级到最新的python-rq软件包:

    # requirements.txt


    rq==1.1.0

  2. 为轮询作业创建专用队列,并相应地使作业入队(具有

    depends_on
    关系):

    with Connection(redis.from_url(current_app.config['REDIS_URL'])):q = Queue('default')p = Queue('pqueue')job1 = q.enqueue(step1)job2 = p.enqueue(step2, depends_on=job1)  # step2 enqueued in polling queuejob3 = q.enqueue(step3, depends_on=job2)
  3. 派遣专职工作人员进行轮询队列。它继承自标准

    Worker
    类:

    class PWorker(rq.worker.Worker):def execute_job(self, *args, **kwargs):    seconds_between_polls = 65    job = args[0]    if 'lastpoll' in job.meta:        job_timedelta = (datetime.utcnow() - job.meta["lastpoll"]).total_seconds()        if job_timedelta < seconds_between_polls: sleep_period = seconds_between_polls - job_timedelta time.sleep(sleep_period)    job.meta['lastpoll'] = datetime.utcnow()    job.save_meta()    super().execute_job(*args, **kwargs)

PWorker

execute_job
通过向作业的元数据添加时间戳来扩展该方法
'lastpoll'

如果有

lastpoll
时间戳记的轮询作业进入,工作人员将检查此后的时间间隔
lastpoll
是否大于65秒。如果是,它将当前时间写入
'lastpoll'
并执行轮询。如果没有,它将一直hibernate直到65s结束,然后将当前时间写入
'lastpoll'
并执行轮询。没有
lastpoll
时间戳的进来的作业是第一次轮询,而工作人员创建时间戳并执行轮询。

  1. 创建一个专用异常(由task函数抛出)和一个异常处理程序来处理它:
        # exceptions.py    class PACError(Exception):        pass    class PACJobRun(PACError):        pass    class PACJobExit(PACError):        pass        # exception_handlers.py    def poll_exc_handler(job, exc_type, exc_value, traceback):        if exc_type is PACJobRun: requeue_job(job.get_id(), connection=job.connection) return False  # no further exception handling        else: return True  # further exception handling        # tasks.py    def step2():        # GET request to remote compute job portal API for status        # if response == "RUN":        raise PACJobRun        return True

当定制异常处理程序捕获到定制异常(这意味着远程计算作业仍在运行)时,它将在轮询队列中重新排队该作业。

  1. 将定制异常处理程序放入异常处理层次结构中:
        # manage.py    @cli.command('run_pworker')    def run_pworker():        redis_url = app.config['REDIS_URL']        redis_connection = redis.from_url(redis_url)        with rq.connections.Connection(redis_connection): pworker = PWorker(app.config['PQUEUE'], exception_handlers=[poll_exc_handler]) pworker.work()

该解决方案的优点在于,它仅用几行额外的代码即可扩展python-rq的标准功能。另一方面,额外的队列和工作程序增加了复杂性……



欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/4937340.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-13
下一篇 2022-11-13

发表评论

登录后才能评论

评论列表(0条)

保存