路线
@app.route('/my-long-function',methods=['POST'])def my_long_function(): param1 = request.form['param1'] param2 = request.form['param2'] task = outsIDe_function.delay(param1,param2) return task.ID
Celery Task – 在后台启动some_python_script.handle
@celery.task(name='outsIDe_function')def outsIDe_function(param1,param2): with app.app_context(): some_python_script.handle(param1,param2)
some_python_script.handle:
def handle(param1,param2): param1 + param2 # many,many different things
理想情况下,我希望能够自我更新芹菜任务,以便我可以轻松地从我的应用程序请求其状态,如下所示:
some_python_script.handle(理想情况下):
def handle(param1,many different things self.outsIDe_function.update_state('PROGRESS',Meta = {'status':'progressing'})
检查进度(理想情况下):
@app.route('/status/<task_ID>')def taskstatus(task_ID): task = outsIDe_function.AsyncResult(task_ID) response = { 'state': task.state,'ID': task.ID,'status' : task.status,} return Jsonify(response)
或类似的东西.非常感谢任何帮助,我对芹菜很新!
解决方法 您应该声明要调用的任务ID.你可以查看 update_state.
以下代码应该有效.
# capture ID of celery taskID = self.request.IDdef handle(param1,many different things # update the state of celery task with direct reference to it self.update_state(task_ID=ID,state='PROGRESS',Meta = {'status':'progressing'})总结
以上是内存溢出为你收集整理的Flask Celery update_state来自另一个函数全部内容,希望文章能够帮你解决Flask Celery update_state来自另一个函数所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)