教程:构建一个基本的聊天服务器#
在本教程中,我们将构建一个非常基本的聊天服务器。它将允许任何人向当前连接到服务器的所有其他人发送消息。
本教程旨在作为 Quart 中 WebSockets 的入门介绍。如果你想跳到最后,代码可以在 Github 上找到。
1:创建项目#
我们需要为我们的聊天服务器创建一个项目,我喜欢使用 Poetry 来完成此操作。Poetry 通过 pip(或通过 Brew)安装。
pip install poetry
然后我们可以使用 Poetry 创建一个新的聊天项目
poetry new --src chat
我们的项目现在可以在 chat 目录中开发,所有后续命令都应在运行 chat 目录中运行。
2:添加依赖项#
我们只需要 Quart 来构建这个简单的聊天服务器,我们可以通过运行以下命令将其安装为项目的依赖项
poetry add quart
Poetry 将通过运行以下命令来确保此依赖项存在且路径正确
poetry install
3:创建应用程序#
我们需要一个 Quart 应用程序作为我们的 Web 服务器,它是由以下添加到 src/chat/__init__.py 中的内容创建的
from quart import Quart
app = Quart(__name__)
def run() -> None:
app.run()
为了使应用程序易于运行,我们可以通过将以下内容添加到 pyproject.toml 中来调用 Poetry 脚本中的 run 方法
[tool.poetry.scripts]
start = "chat:run"
这允许以下命令启动应用程序
poetry run start
4:提供 UI 服务#
当用户访问我们的聊天网站时,我们需要向他们展示一个 UI,他们可以使用该 UI 来输入和接收消息。以下 HTML 模板应添加到 src/chat/templates/index.html 中
<script type="text/javascript">
const ws = new WebSocket(`ws://${location.host}/ws`);
ws.addEventListener('message', function (event) {
const li = document.createElement("li");
li.appendChild(document.createTextNode(event.data));
document.getElementById("messages").appendChild(li);
});
function send(event) {
const message = (new FormData(event.target)).get("message");
if (message) {
ws.send(message);
}
event.target.reset();
return false;
}
</script>
<div style="display: flex; height: 100%; flex-direction: column">
<ul id="messages" style="flex-grow: 1; list-style-type: none"></ul>
<form onsubmit="return send(event)">
<input type="text" name="message" minlength="1" />
<button type="submit">Send</button>
</form>
</div>
这是一个非常基本的 UI,无论是在样式方面,还是在没有对 WebSocket 进行错误处理方面。
我们现在可以通过将以下内容添加到 src/chat/__init__.py 中来为根路径(即 /
)提供此模板
from quart import render_template
@app.get("/")
async def index():
return await render_template("index.html")
5:构建代理#
在我们添加 websocket 路由之前,我们需要能够将消息从一个连接的客户端传递到另一个客户端。为此,我们需要一个消息代理。首先,我们将通过将以下内容添加到 src/chat/broker.py 中来构建我们自己的内存中代理
import asyncio
from typing import AsyncGenerator
from quart import Quart
class Broker:
def __init__(self) -> None:
self.connections = set()
async def publish(self, message: str) -> None:
for connection in self.connections:
await connection.put(message)
async def subscribe(self) -> AsyncGenerator[str, None]:
connection = asyncio.Queue()
self.connections.add(connection)
try:
while True:
yield await connection.get()
finally:
self.connections.remove(connection)
此 Broker
具有基于发布-订阅模式的接口,客户端预计会向其他客户端发布消息,同时订阅任何发送的消息。
6:实现 websocket#
我们现在可以通过将以下内容添加到 src/chat/__init__.py 中来实现 websocket 路由
import asyncio
from quart import websocket
from chat.broker import Broker
broker = Broker()
async def _receive() -> None:
while True:
message = await websocket.receive()
await broker.publish(message)
@app.websocket("/ws")
async def ws() -> None:
try:
task = asyncio.ensure_future(_receive())
async for message in broker.subscribe():
await websocket.send(message)
finally:
task.cancel()
await task
为了确保发送和接收同时运行,_receive
协程必须作为单独的任务运行。此外,此任务必须被正确取消并清理。
当用户断开连接时,将引发一个 CancelledError,从而中断 while 循环并触发 finally 块。
7:测试#
为了测试我们的应用程序,我们需要检查通过 websocket 路由发送的消息是否被回显。这可以通过将以下内容添加到 tests/test_chat.py 中来完成
import asyncio
from quart.testing.connections import TestWebsocketConnection as _TestWebsocketConnection
from chat import app
async def _receive(test_websocket: _TestWebsocketConnection) -> str:
return await test_websocket.receive()
async def test_websocket() -> None:
test_client = app.test_client()
async with test_client.websocket("/ws") as test_websocket:
task = asyncio.ensure_future(_receive(test_websocket))
await test_websocket.send("message")
result = await task
assert result == "message"
由于测试是一个异步函数,我们需要通过运行以下命令来安装 pytest-asyncio
poetry add --dev pytest-asyncio
安装完成后,需要通过将以下内容添加到 pyproject.toml 中来配置它
[tool.pytest.ini_options]
asyncio_mode = "auto"
最后,我们可以通过以下命令运行测试
poetry run pytest tests/
如果你在 Quart 示例文件夹中运行此命令,则需要添加 -c pyproject.toml
选项以防止 pytest 使用 Quart pytest 配置。
8:总结#
到目前为止,我们构建的消息代理仅在内存中工作,这意味着消息只能与连接到同一服务器实例的用户共享。为了在服务器实例之间共享消息,我们需要使用第三方代理,例如通过支持发布/订阅接口的 aioredis 库的 redis。