后台任务#
如果您有一个需要执行的任务,但不需要其结果,您可以使用后台任务来运行它。 后台任务与路由处理程序等并发运行,即在后台运行。 当后台任务包含需要很长时间才能完成的操作时,它们非常有用,因为它们允许在任务本身执行时向客户端发送响应。 同样,某些任务不需要在发送响应之前完成,而可以在后台执行。
Quart 中的后台任务通过 add_background_task
方法创建
async def background_task():
...
@app.route('/jobs/', methods=['POST'])
async def create_job():
app.add_background_task(background_task)
return 'Success'
@app.before_serving
async def startup():
app.add_background_task(background_task)
后台任务将可以访问应用上下文。 在关闭期间将等待这些任务,以确保它们在应用关闭之前完成。 如果您的任务在配置 BACKGROUND_TASK_SHUTDOWN_TIMEOUT
内未完成,它将被取消。
注意 BACKGROUND_TASK_SHUTDOWN_TIMEOUT
理想情况下应该小于任何服务器关闭超时。
支持同步后台任务,它们将在单独的线程中运行。
警告
由于 Quart 基于 asyncio,它将在单个执行上运行,并在任务在等待 I/O 时被阻塞时在它们之间切换。 如果任务不需要等待 I/O,它将阻塞事件循环,并且 Quart 可能会变得无响应。 此外,该任务将消耗与服务器相同的 CPU 资源,因此可能会减慢服务器速度。
测试后台任务#
为了确保后台任务在测试中完成,请使用 test_app
上下文管理器。 这将在允许测试继续之前等待任何后台任务完成。
async def test_tasks_complete():
async with app.test_app():
app.add_background_task(...)
# Background task has completed here
assert task_has_done_something
注意,在测试应用时,test_client
的使用应在 test_app
上下文块内。
可以通过创建应用上下文并等待该函数来测试后台任务协程函数,
async def test_background_task():
async with app.app_context():
await background_task()
assert something_to_test