教程:构建一个基本的聊天服务器#

在本教程中,我们将构建一个非常基本的聊天服务器。它将允许任何人向当前连接到服务器的所有其他人发送消息。

本教程旨在作为 Quart 中 WebSockets 的入门介绍。如果你想跳到最后,代码可以在 Github 上找到。

1:创建项目#

我们需要为我们的聊天服务器创建一个项目,我喜欢使用 Poetry 来完成此操作。Poetry 通过 pip(或通过 Brew)安装。

pip install poetry

然后我们可以使用 Poetry 创建一个新的聊天项目

poetry new --src chat

我们的项目现在可以在 chat 目录中开发,所有后续命令都应在运行 chat 目录中运行。

2:添加依赖项#

我们只需要 Quart 来构建这个简单的聊天服务器,我们可以通过运行以下命令将其安装为项目的依赖项

poetry add quart

Poetry 将通过运行以下命令来确保此依赖项存在且路径正确

poetry install

3:创建应用程序#

我们需要一个 Quart 应用程序作为我们的 Web 服务器,它是由以下添加到 src/chat/__init__.py 中的内容创建的

src/chat/__init__.py#
from quart import Quart

app = Quart(__name__)

def run() -> None:
    app.run()

为了使应用程序易于运行,我们可以通过将以下内容添加到 pyproject.toml 中来调用 Poetry 脚本中的 run 方法

pyproject.toml#
[tool.poetry.scripts]
start = "chat:run"

这允许以下命令启动应用程序

poetry run start

4:提供 UI 服务#

当用户访问我们的聊天网站时,我们需要向他们展示一个 UI,他们可以使用该 UI 来输入和接收消息。以下 HTML 模板应添加到 src/chat/templates/index.html

src/chat/templates/index.html#
<script type="text/javascript">
  const ws = new WebSocket(`ws://${location.host}/ws`);

  ws.addEventListener('message', function (event) {
    const li = document.createElement("li");
    li.appendChild(document.createTextNode(event.data));
    document.getElementById("messages").appendChild(li);
  });

  function send(event) {
    const message = (new FormData(event.target)).get("message");
    if (message) {
      ws.send(message);
    }
    event.target.reset();
    return false;
  }
</script>

<div style="display: flex; height: 100%; flex-direction: column">
  <ul id="messages" style="flex-grow: 1; list-style-type: none"></ul>

  <form onsubmit="return send(event)">
    <input type="text" name="message" minlength="1" />
    <button type="submit">Send</button>
  </form>
</div>

这是一个非常基本的 UI,无论是在样式方面,还是在没有对 WebSocket 进行错误处理方面。

我们现在可以通过将以下内容添加到 src/chat/__init__.py 中来为根路径(即 /)提供此模板

from quart import render_template

@app.get("/")
async def index():
    return await render_template("index.html")

5:构建代理#

在我们添加 websocket 路由之前,我们需要能够将消息从一个连接的客户端传递到另一个客户端。为此,我们需要一个消息代理。首先,我们将通过将以下内容添加到 src/chat/broker.py 中来构建我们自己的内存中代理

src/chat/broker.py#
import asyncio
from typing import AsyncGenerator

from quart import Quart

class Broker:
    def __init__(self) -> None:
        self.connections = set()

    async def publish(self, message: str) -> None:
        for connection in self.connections:
            await connection.put(message)

    async def subscribe(self) -> AsyncGenerator[str, None]:
        connection = asyncio.Queue()
        self.connections.add(connection)
        try:
            while True:
                yield await connection.get()
        finally:
            self.connections.remove(connection)

Broker 具有基于发布-订阅模式的接口,客户端预计会向其他客户端发布消息,同时订阅任何发送的消息。

6:实现 websocket#

我们现在可以通过将以下内容添加到 src/chat/__init__.py 中来实现 websocket 路由

src/chat/__init__.py#
import asyncio

from quart import websocket

from chat.broker import Broker

broker = Broker()

async def _receive() -> None:
    while True:
        message = await websocket.receive()
        await broker.publish(message)

@app.websocket("/ws")
async def ws() -> None:
    try:
        task = asyncio.ensure_future(_receive())
        async for message in broker.subscribe():
            await websocket.send(message)
    finally:
        task.cancel()
        await task

为了确保发送和接收同时运行,_receive 协程必须作为单独的任务运行。此外,此任务必须被正确取消并清理。

当用户断开连接时,将引发一个 CancelledError,从而中断 while 循环并触发 finally 块。

7:测试#

为了测试我们的应用程序,我们需要检查通过 websocket 路由发送的消息是否被回显。这可以通过将以下内容添加到 tests/test_chat.py 中来完成

tests/test_chat.py#
import asyncio

from quart.testing.connections import TestWebsocketConnection as _TestWebsocketConnection

from chat import app

async def _receive(test_websocket: _TestWebsocketConnection) -> str:
    return await test_websocket.receive()

async def test_websocket() -> None:
    test_client = app.test_client()
    async with test_client.websocket("/ws") as test_websocket:
        task = asyncio.ensure_future(_receive(test_websocket))
        await test_websocket.send("message")
        result = await task
        assert result == "message"

由于测试是一个异步函数,我们需要通过运行以下命令来安装 pytest-asyncio

poetry add --dev pytest-asyncio

安装完成后,需要通过将以下内容添加到 pyproject.toml 中来配置它

[tool.pytest.ini_options]
asyncio_mode = "auto"

最后,我们可以通过以下命令运行测试

poetry run pytest tests/

如果你在 Quart 示例文件夹中运行此命令,则需要添加 -c pyproject.toml 选项以防止 pytest 使用 Quart pytest 配置。

8:总结#

到目前为止,我们构建的消息代理仅在内存中工作,这意味着消息只能与连接到同一服务器实例的用户共享。为了在服务器实例之间共享消息,我们需要使用第三方代理,例如通过支持发布/订阅接口的 aioredis 库的 redis。