流式响应#
Quart 支持将响应流式传输到客户端,而不是一次性接收。如果您有兴趣流式传输请求数据,请参阅 消费请求体,或有关双工流式传输的信息,请参阅 使用 WebSockets。
要流式传输响应,视图函数应返回一个异步生成器,该生成器会生成字节。此生成器可以像往常一样返回状态码和标头。例如,要每秒流式传输时间,
@app.route('/')
async def stream_time():
async def async_generator():
time = datetime.isoformat()
yield time.encode()
return async_generator(), 200, {'X-Something': 'value'}
使用上下文#
如果您想在流式传输时使用 request
上下文,您需要使用 quart.helpers.stream_with_context()
装饰器,
@app.route('/')
async def stream_time():
@stream_with_context
async def async_generator():
time = datetime.isoformat()
yield time.encode()
return async_generator(), 200, {'X-Something': 'value'}
超时#
默认情况下,Quart 会超时响应时间过长的响应,以防止可能的拒绝服务攻击,请参阅 拒绝服务缓解措施。这对于流式响应可能不理想,例如无限流。可以全局禁用超时,但是这可能会使其他路由易受拒绝服务攻击,因此建议将特定响应的超时属性设置为 None
,
from quart import make_response
@app.route('/sse')
async def stream_time():
...
response = await make_response(async_generator())
response.timeout = None # No timeout for this route
return response
测试#
测试客户端 get()
及其关联方法将合并整个流式传输的响应。如果您想测试路由是否实际流式传输了响应,或者测试在客户端断开连接之前一直流式传输的路由,您需要使用 request()
方法,
async def test_stream() -> None:
test_client = app.test_client()
async with test_client.request(..) as connection:
data = await connection.receive()
assert data ...
assert connection.status_code == 200
...
await connection.disconnect() # For infinite streams