49 lines
1.6 KiB
Python
49 lines
1.6 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from typing import cast
|
|
|
|
import pytest
|
|
from websockets.asyncio.server import ServerConnection
|
|
|
|
from app.server import ClientConnection, SignalingServer
|
|
|
|
|
|
def _fake_ws() -> ServerConnection:
|
|
return cast(ServerConnection, object())
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_item_use_has_global_cooldown(monkeypatch: pytest.MonkeyPatch) -> None:
|
|
server = SignalingServer("127.0.0.1", 8765, None, None)
|
|
ws = _fake_ws()
|
|
client = ClientConnection(websocket=ws, id="u1", nickname="tester", x=5, y=6)
|
|
server.clients[ws] = client
|
|
item = server.item_service.default_item(client, "dice")
|
|
server.item_service.add_item(item)
|
|
|
|
send_payloads: list[object] = []
|
|
now_ms = 10_000
|
|
|
|
async def fake_send(websocket: ServerConnection, packet: object) -> None:
|
|
send_payloads.append(packet)
|
|
|
|
async def fake_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
|
|
return
|
|
|
|
monkeypatch.setattr(server, "_send", fake_send)
|
|
monkeypatch.setattr(server, "_broadcast", fake_broadcast)
|
|
monkeypatch.setattr(server.item_service, "now_ms", lambda: now_ms)
|
|
|
|
await server._handle_message(client, json.dumps({"type": "item_use", "itemId": item.id}))
|
|
assert send_payloads[-1].ok is True
|
|
|
|
now_ms += 400
|
|
await server._handle_message(client, json.dumps({"type": "item_use", "itemId": item.id}))
|
|
assert send_payloads[-1].ok is False
|
|
assert "cooldown" in send_payloads[-1].message.lower()
|
|
|
|
now_ms += 700
|
|
await server._handle_message(client, json.dumps({"type": "item_use", "itemId": item.id}))
|
|
assert send_payloads[-1].ok is True
|