# Files
# chat_grid/server/tests/test_server_message_handling.py
#
# 1112 lines
# 39 KiB
# Python
# Raw Normal View History

from __future__ import annotations
import asyncio
import json
from pathlib import Path
from time import monotonic
from typing import cast
import uuid
import pytest
from websockets.asyncio.server import ServerConnection
from app.client import ClientConnection
from app.auth_service import AuthError
from app.server import AUTH_LOGIN_FAILURE_MESSAGE, AUTH_RESUME_FAILURE_MESSAGE, SignalingServer
def _fake_ws() -> ServerConnection:
    """Build a stand-in websocket for tests.

    The value is a bare ``object()`` that is only cast to ``ServerConnection``
    for the type checker; each call yields a distinct object.
    """
    placeholder = object()
    return cast(ServerConnection, placeholder)
def _packet_types(payloads: list[object]) -> list[str]:
    """Return the ``type`` attribute of every payload, using ``""`` when it is absent."""
    kinds: list[str] = []
    for payload in payloads:
        kinds.append(getattr(payload, "type", ""))
    return kinds
@pytest.mark.asyncio
async def test_update_position_rejects_out_of_bounds(monkeypatch: pytest.MonkeyPatch) -> None:
    """An out-of-bounds update_position leaves the client where it was and broadcasts nothing."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    socket = _fake_ws()
    mover = ClientConnection(websocket=socket, id="u1", nickname="tester", x=5, y=6)
    server.clients[socket] = mover
    broadcasts: list[object] = []

    async def record_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        broadcasts.append(packet)

    monkeypatch.setattr(server, "_broadcast", record_broadcast)

    request = {"type": "update_position", "x": 200, "y": -5}
    await server._handle_message(mover, json.dumps(request))

    # Position is untouched and no movement was announced.
    assert (mover.x, mover.y) == (5, 6)
    assert not broadcasts
@pytest.mark.asyncio
async def test_radio_metadata_refresh_updates_station_and_title(monkeypatch: pytest.MonkeyPatch) -> None:
    """One metadata refresh pass stores the fetched station name and title on the radio item."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    socket = _fake_ws()
    listener = ClientConnection(websocket=socket, id="u1", nickname="tester", x=10, y=10)
    server.clients[socket] = listener

    radio = server.item_service.default_item(listener, "radio_station")
    initial_params = (
        ("streamUrl", "http://example.com/stream"),
        ("enabled", True),
        ("emitRange", 10),
        ("stationName", ""),
        ("nowPlaying", ""),
    )
    for key, value in initial_params:
        radio.params[key] = value
    server.item_service.add_item(radio)

    async def skip_broadcast_item(item: object) -> None:
        return None

    def stub_fetch(url: str) -> tuple[str, str]:
        # The refresh must query the radio's configured stream URL.
        assert url == "http://example.com/stream"
        return ("Test Station", "Test Song")

    monkeypatch.setattr(server, "_broadcast_item", skip_broadcast_item)
    monkeypatch.setattr(server, "_fetch_stream_metadata", stub_fetch)

    await server._refresh_radio_metadata_once()

    assert radio.params["stationName"] == "Test Station"
    assert radio.params["nowPlaying"] == "Test Song"
@pytest.mark.asyncio
async def test_radio_metadata_refresh_skips_when_no_listener_in_range(monkeypatch: pytest.MonkeyPatch) -> None:
    """No metadata fetch happens when the only client is at (0, 0) and the radio at (30, 30) has range 5."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    socket = _fake_ws()
    far_client = ClientConnection(websocket=socket, id="u1", nickname="tester", x=0, y=0)
    server.clients[socket] = far_client

    radio = server.item_service.default_item(far_client, "radio_station")
    radio.x, radio.y = 30, 30
    for key, value in (("streamUrl", "http://example.com/stream"), ("enabled", True), ("emitRange", 5)):
        radio.params[key] = value
    server.item_service.add_item(radio)

    fetched_urls: list[str] = []

    def fake_fetch(url: str) -> tuple[str, str]:
        # Record the call so the test can prove it never happened.
        fetched_urls.append(url)
        return ("X", "Y")

    monkeypatch.setattr(server, "_fetch_stream_metadata", fake_fetch)

    await server._refresh_radio_metadata_once()

    assert fetched_urls == []
@pytest.mark.asyncio
async def test_item_secondary_use_radio_reports_now_playing(monkeypatch: pytest.MonkeyPatch) -> None:
    """Secondary use of an enabled radio replies to the sender with song and station, without broadcasting."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    socket = _fake_ws()
    user = ClientConnection(websocket=socket, id="u1", nickname="tester", x=5, y=5)
    server.clients[socket] = user

    radio = server.item_service.default_item(user, "radio_station")
    radio.x, radio.y = 5, 5
    for key, value in (("enabled", True), ("stationName", "Station X"), ("nowPlaying", "Song Y")):
        radio.params[key] = value
    server.item_service.add_item(radio)

    sent: list[object] = []
    broadcasts: list[object] = []

    async def record_send(websocket: ServerConnection, packet: object) -> None:
        sent.append(packet)

    async def record_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        broadcasts.append(packet)

    monkeypatch.setattr(server, "_send", record_send)
    monkeypatch.setattr(server, "_broadcast", record_broadcast)

    request = {"type": "item_secondary_use", "itemId": radio.id}
    await server._handle_message(user, json.dumps(request))

    action_results = [pkt for pkt in sent if getattr(pkt, "type", "") == "item_action_result"]
    assert action_results
    outcome = action_results[-1]
    assert outcome.ok is True
    assert outcome.action == "secondary_use"
    assert "Playing Song Y from Station X." in outcome.message
    assert not broadcasts
@pytest.mark.asyncio
async def test_item_secondary_use_missing_handler_returns_message(monkeypatch: pytest.MonkeyPatch) -> None:
    """Secondary use on a dice item yields a failed result mentioning that no secondary action exists."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    socket = _fake_ws()
    user = ClientConnection(websocket=socket, id="u1", nickname="tester", x=5, y=5)
    server.clients[socket] = user

    dice = server.item_service.default_item(user, "dice")
    dice.x, dice.y = 5, 5
    server.item_service.add_item(dice)

    sent: list[object] = []

    async def record_send(websocket: ServerConnection, packet: object) -> None:
        sent.append(packet)

    monkeypatch.setattr(server, "_send", record_send)

    request = {"type": "item_secondary_use", "itemId": dice.id}
    await server._handle_message(user, json.dumps(request))

    action_results = [pkt for pkt in sent if getattr(pkt, "type", "") == "item_action_result"]
    assert action_results
    outcome = action_results[-1]
    assert outcome.ok is False
    assert outcome.action == "secondary_use"
    assert "No secondary action" in outcome.message
def test_clock_alarm_announcement_sequence_shape() -> None:
    """Clock announcement sound lists start and end with the expected clips for alarm and top-of-hour."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    params = {"timeZone": "America/Detroit", "use24Hour": False}

    # Alarm: announcement intro first, alarm clip last.
    alarm_sequence = server._build_clock_announcement_sounds(params, top_of_hour=False, alarm=True)
    assert alarm_sequence
    first_alarm, last_alarm = alarm_sequence[0], alarm_sequence[-1]
    assert first_alarm == "/sounds/clock/el640/announcement.ogg"
    assert last_alarm == "/sounds/clock/el640/alarm.ogg"

    # Top of hour: bracketed by the two hour chimes.
    hourly_sequence = server._build_clock_announcement_sounds(params, top_of_hour=True, alarm=False)
    assert hourly_sequence
    first_hourly, last_hourly = hourly_sequence[0], hourly_sequence[-1]
    assert first_hourly == "/sounds/clock/el640/hour1.ogg"
    assert last_hourly == "/sounds/clock/el640/hour2.ogg"
@pytest.mark.asyncio
async def test_auth_login_uses_hash_offload(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
    """A successful auth_login routes the ``login`` call through ``_run_auth_hash_task``.

    The server is pointed at a throwaway auth database under ``tmp_path`` so the
    account registered here neither leaks into nor depends on a shared store,
    matching the other tests in this file that register users.
    """
    server = SignalingServer("127.0.0.1", 8765, None, None, auth_db_path=tmp_path / "auth.db")
    username = f"alpha_{uuid.uuid4().hex[:8]}"
    server.auth_service.register(username, "password99")

    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="tester")
    send_payloads: list[object] = []
    offload_calls: list[str] = []

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    async def fake_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        return None

    async def fake_run_auth_hash_task(func, /, *args, **kwargs):
        # Record which callable was offloaded, then run it inline.
        offload_calls.append(getattr(func, "__name__", "unknown"))
        return func(*args, **kwargs)

    monkeypatch.setattr(server, "_send", fake_send)
    monkeypatch.setattr(server, "_broadcast", fake_broadcast)
    monkeypatch.setattr(server, "_run_auth_hash_task", fake_run_auth_hash_task)

    await server._handle_message(
        client,
        json.dumps({"type": "auth_login", "username": username, "password": "password99"}),
    )

    assert "login" in offload_calls
    auth_results = [packet for packet in send_payloads if getattr(packet, "type", "") == "auth_result"]
    assert auth_results
    assert auth_results[-1].ok is True
@pytest.mark.asyncio
async def test_auth_rate_limit_blocks_before_hash(monkeypatch: pytest.MonkeyPatch) -> None:
    """A rate-limited auth_login is refused with a "too many" message and never calls ``login``."""
    server = SignalingServer("127.0.0.1", 8765, None, None)
    socket = _fake_ws()
    visitor = ClientConnection(websocket=socket, id="u1", nickname="tester")
    sent: list[object] = []
    login_calls: list[tuple[str, str]] = []

    async def record_send(websocket: ServerConnection, packet: object) -> None:
        sent.append(packet)

    def forbidden_login(username: str, password: str):  # pragma: no cover - should never run
        login_calls.append((username, password))
        raise RuntimeError("unexpected login call")

    monkeypatch.setattr(server, "_send", record_send)
    monkeypatch.setattr(server, "_sleep_auth_failure_jitter", lambda: asyncio.sleep(0))
    monkeypatch.setattr(server.auth_service, "login", forbidden_login)
    # Force the limiter to report "limited" for every request.
    monkeypatch.setattr(server, "_is_auth_rate_limited", lambda _client, _packet: True)

    request = {"type": "auth_login", "username": "alpha", "password": "wrongpass"}
    await server._handle_message(visitor, json.dumps(request))

    assert login_calls == []
    assert sent
    reply = sent[-1]
    assert reply.ok is False
    assert "too many" in reply.message.lower()
@pytest.mark.asyncio
async def test_auth_login_failure_message_is_generic(monkeypatch: pytest.MonkeyPatch) -> None:
    """A login ``AuthError`` is reported with ``AUTH_LOGIN_FAILURE_MESSAGE``, not the error's own text."""
    server = SignalingServer("127.0.0.1", 8765, None, None)
    socket = _fake_ws()
    visitor = ClientConnection(websocket=socket, id="u1", nickname="tester")
    sent: list[object] = []

    async def record_send(websocket: ServerConnection, packet: object) -> None:
        sent.append(packet)

    def failing_login(_username: str, _password: str):
        raise AuthError("Account is disabled.")

    monkeypatch.setattr(server, "_send", record_send)
    monkeypatch.setattr(server, "_sleep_auth_failure_jitter", lambda: asyncio.sleep(0))
    monkeypatch.setattr(server.auth_service, "login", failing_login)

    request = {"type": "auth_login", "username": "alpha", "password": "wrongpass"}
    await server._handle_message(visitor, json.dumps(request))

    auth_replies = [pkt for pkt in sent if getattr(pkt, "type", "") == "auth_result"]
    assert auth_replies
    reply = auth_replies[-1]
    assert reply.ok is False
    assert reply.message == AUTH_LOGIN_FAILURE_MESSAGE
@pytest.mark.asyncio
async def test_auth_resume_failure_message_is_generic(monkeypatch: pytest.MonkeyPatch) -> None:
    """A resume ``AuthError`` is reported with ``AUTH_RESUME_FAILURE_MESSAGE``, not the error's own text."""
    server = SignalingServer("127.0.0.1", 8765, None, None)
    socket = _fake_ws()
    visitor = ClientConnection(websocket=socket, id="u1", nickname="tester")
    sent: list[object] = []

    async def record_send(websocket: ServerConnection, packet: object) -> None:
        sent.append(packet)

    def failing_resume(_token: str):
        raise AuthError("Session has expired.")

    monkeypatch.setattr(server, "_send", record_send)
    monkeypatch.setattr(server, "_sleep_auth_failure_jitter", lambda: asyncio.sleep(0))
    monkeypatch.setattr(server.auth_service, "resume", failing_resume)

    request = {"type": "auth_resume", "sessionToken": "expired-token"}
    await server._handle_message(visitor, json.dumps(request))

    auth_replies = [pkt for pkt in sent if getattr(pkt, "type", "") == "auth_result"]
    assert auth_replies
    reply = auth_replies[-1]
    assert reply.ok is False
    assert reply.message == AUTH_RESUME_FAILURE_MESSAGE
@pytest.mark.asyncio
async def test_item_drop_rejects_out_of_bounds(monkeypatch: pytest.MonkeyPatch) -> None:
    """Dropping a carried item at out-of-bounds coordinates fails and the item stays carried."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="tester", x=5, y=6)
    server.clients[ws] = client

    item = server.item_service.default_item(client, "dice")
    item.carrierId = client.id
    server.item_service.add_item(item)

    send_payloads: list[object] = []

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    monkeypatch.setattr(server, "_send", fake_send)

    await server._handle_message(client, json.dumps({"type": "item_drop", "itemId": item.id, "x": 999, "y": 999}))

    assert item.carrierId == client.id
    # Guard before indexing so a missing reply fails as an assertion, not an IndexError.
    assert send_payloads
    assert send_payloads[-1].ok is False
    assert "out of bounds" in send_payloads[-1].message.lower()
@pytest.mark.asyncio
async def test_item_transfer_updates_item_owner(monkeypatch: pytest.MonkeyPatch) -> None:
    """An owner holding ``item.transfer.own`` can hand an item to another connected user.

    Checks the item's creator fields, the item broadcast, the sender's result
    packet, and the chat announcement.
    """
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    giver_ws = _fake_ws()
    receiver_ws = _fake_ws()
    giver = ClientConnection(
        websocket=giver_ws, id="u1", nickname="owner",
        authenticated=True, user_id="1", username="owner_user",
        permissions={"item.transfer.own"}, x=5, y=6,
    )
    receiver = ClientConnection(
        websocket=receiver_ws, id="u2", nickname="target",
        authenticated=True, user_id="2", username="target_user",
        permissions=set(), x=10, y=10,
    )
    server.clients[giver_ws] = giver
    server.clients[receiver_ws] = receiver

    # Place the item on the giver's square.
    item = server.item_service.default_item(giver, "dice")
    item.x, item.y = giver.x, giver.y
    server.item_service.add_item(item)

    sent: list[object] = []
    item_broadcasts: list[object] = []
    broadcasts: list[object] = []

    async def record_send(websocket: ServerConnection, packet: object) -> None:
        sent.append(packet)

    async def record_broadcast_item(broadcast_item: object) -> None:
        item_broadcasts.append(broadcast_item)

    async def record_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        broadcasts.append(packet)

    monkeypatch.setattr(server, "_send", record_send)
    monkeypatch.setattr(server, "_broadcast_item", record_broadcast_item)
    monkeypatch.setattr(server, "_broadcast", record_broadcast)

    request = {"type": "item_transfer", "itemId": item.id, "targetId": receiver.id}
    await server._handle_message(giver, json.dumps(request))

    assert item.createdBy == receiver.user_id
    assert item.createdByName == receiver.username
    assert item_broadcasts

    assert sent
    outcome = sent[-1]
    assert outcome.type == "item_action_result"
    assert outcome.ok is True
    assert outcome.action == "transfer"
    assert "you transferred" in outcome.message.lower()

    assert broadcasts
    announcement = broadcasts[-1]
    assert getattr(announcement, "type", "") == "chat_message"
    assert "owner transferred" in getattr(announcement, "message", "").lower()
# (stray timestamp from the source viewer: 2026-02-28 20:17:49 -05:00)
@pytest.mark.asyncio
async def test_item_transfer_allows_self_target_for_transfer_any(monkeypatch: pytest.MonkeyPatch) -> None:
    """A client holding ``item.transfer.any`` may transfer someone else's item to itself."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    owner_ws = _fake_ws()
    actor_ws = _fake_ws()
    owner = ClientConnection(
        websocket=owner_ws,
        id="u1",
        nickname="owner",
        authenticated=True,
        user_id="1",
        username="owner_user",
        permissions=set(),
        x=5,
        y=6,
    )
    actor = ClientConnection(
        websocket=actor_ws,
        id="u3",
        nickname="actor",
        authenticated=True,
        user_id="3",
        username="actor_user",
        permissions={"item.transfer.any"},
        x=5,
        y=6,
    )
    server.clients[owner_ws] = owner
    server.clients[actor_ws] = actor

    # The item is created for the owner but sits on the actor's square.
    item = server.item_service.default_item(owner, "dice")
    item.x = actor.x
    item.y = actor.y
    server.item_service.add_item(item)

    send_payloads: list[object] = []
    broadcasted_items: list[object] = []

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    async def fake_broadcast_item(broadcast_item: object) -> None:
        broadcasted_items.append(broadcast_item)

    monkeypatch.setattr(server, "_send", fake_send)
    monkeypatch.setattr(server, "_broadcast_item", fake_broadcast_item)

    await server._handle_message(actor, json.dumps({"type": "item_transfer", "itemId": item.id, "targetId": actor.id}))

    assert item.createdBy == actor.user_id
    assert item.createdByName == actor.username
    assert broadcasted_items
    # Guard before indexing so a missing reply fails as an assertion, not an IndexError.
    assert send_payloads
    result = send_payloads[-1]
    assert result.type == "item_action_result"
    assert result.ok is True
    assert result.action == "transfer"
@pytest.mark.asyncio
async def test_item_transfer_accepts_offline_target_user_id(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
    """An item can be transferred by ``targetUserId`` to a registered user who has no connection.

    Uses an auth database under ``tmp_path`` so the three registered accounts
    are private to this test.
    """
    server = SignalingServer("127.0.0.1", 8765, None, None, auth_db_path=tmp_path / "auth.db", grid_size=41)
    owner_session = server.auth_service.register("owner_test", "password99")
    actor_session = server.auth_service.register("actor_test", "password99")
    # Registered but never added to ``server.clients``.
    offline_session = server.auth_service.register("offline_test", "password99")

    owner_ws = _fake_ws()
    actor_ws = _fake_ws()
    owner = ClientConnection(
        websocket=owner_ws,
        id="u1",
        nickname="owner",
        authenticated=True,
        user_id=owner_session.user.id,
        username=owner_session.user.username,
        permissions=set(),
        x=5,
        y=6,
    )
    actor = ClientConnection(
        websocket=actor_ws,
        id="u3",
        nickname="actor",
        authenticated=True,
        user_id=actor_session.user.id,
        username=actor_session.user.username,
        permissions={"item.transfer.any"},
        x=5,
        y=6,
    )
    server.clients[owner_ws] = owner
    server.clients[actor_ws] = actor

    item = server.item_service.default_item(owner, "dice")
    item.x = actor.x
    item.y = actor.y
    server.item_service.add_item(item)

    send_payloads: list[object] = []

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    monkeypatch.setattr(server, "_send", fake_send)

    await server._handle_message(
        actor,
        json.dumps({"type": "item_transfer", "itemId": item.id, "targetUserId": offline_session.user.id}),
    )

    assert item.createdBy == offline_session.user.id
    assert item.createdByName == offline_session.user.username
    # Guard before indexing so a missing reply fails as an assertion, not an IndexError.
    assert send_payloads
    result = send_payloads[-1]
    assert result.type == "item_action_result"
    assert result.ok is True
    assert result.action == "transfer"
@pytest.mark.asyncio
async def test_item_transfer_targets_lists_online_and_offline(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
    """The transfer-target list omits the item's owner and flags other users as online or offline."""
    server = SignalingServer("127.0.0.1", 8765, None, None, auth_db_path=tmp_path / "auth.db", grid_size=41)
    register = server.auth_service.register
    owner_session = register("owner_menu", "password99")
    actor_session = register("actor_menu", "password99")
    online_session = register("online_menu", "password99")
    # Registered only; never connected.
    offline_session = register("offline_menu", "password99")

    owner_ws = _fake_ws()
    actor_ws = _fake_ws()
    online_ws = _fake_ws()
    owner = ClientConnection(
        websocket=owner_ws, id="u1", nickname="owner", authenticated=True,
        user_id=owner_session.user.id, username=owner_session.user.username,
        permissions=set(), x=5, y=6,
    )
    actor = ClientConnection(
        websocket=actor_ws, id="u3", nickname="actor", authenticated=True,
        user_id=actor_session.user.id, username=actor_session.user.username,
        permissions={"item.transfer.any"}, x=5, y=6,
    )
    online = ClientConnection(
        websocket=online_ws, id="u4", nickname="online", authenticated=True,
        user_id=online_session.user.id, username=online_session.user.username,
        permissions=set(), x=10, y=10,
    )
    server.clients[owner_ws] = owner
    server.clients[actor_ws] = actor
    server.clients[online_ws] = online

    item = server.item_service.default_item(owner, "dice")
    item.x, item.y = actor.x, actor.y
    server.item_service.add_item(item)

    sent: list[object] = []

    async def record_send(websocket: ServerConnection, packet: object) -> None:
        sent.append(packet)

    monkeypatch.setattr(server, "_send", record_send)

    request = {"type": "item_transfer_targets", "itemId": item.id}
    await server._handle_message(actor, json.dumps(request))

    assert sent
    listing = sent[-1]
    assert listing.type == "item_transfer_targets"

    targets_by_name = {entry.username: entry for entry in listing.targets}
    owner_name = owner_session.user.username
    online_name = online_session.user.username
    offline_name = offline_session.user.username
    assert owner_name not in targets_by_name
    assert online_name in targets_by_name
    assert offline_name in targets_by_name
    assert targets_by_name[online_name].online is True
    assert targets_by_name[offline_name].online is False
@pytest.mark.asyncio
async def test_item_delete_sends_others_notification(monkeypatch: pytest.MonkeyPatch) -> None:
    """Deleting an item confirms to the deleter and broadcasts a chat message naming them."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    deleter_ws = _fake_ws()
    bystander_ws = _fake_ws()
    deleter = ClientConnection(
        websocket=deleter_ws, id="u1", nickname="owner",
        authenticated=True, user_id="1", username="owner_user",
        permissions={"item.delete.own"}, x=5, y=6,
    )
    bystander = ClientConnection(
        websocket=bystander_ws, id="u2", nickname="watcher",
        authenticated=True, user_id="2", username="watcher_user",
        permissions=set(), x=5, y=6,
    )
    server.clients[deleter_ws] = deleter
    server.clients[bystander_ws] = bystander

    item = server.item_service.default_item(deleter, "dice")
    item.x, item.y = deleter.x, deleter.y
    server.item_service.add_item(item)

    sent: list[object] = []
    broadcasts: list[object] = []

    async def record_send(websocket: ServerConnection, packet: object) -> None:
        sent.append(packet)

    async def record_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        broadcasts.append(packet)

    monkeypatch.setattr(server, "_send", record_send)
    monkeypatch.setattr(server, "_broadcast", record_broadcast)

    request = {"type": "item_delete", "itemId": item.id}
    await server._handle_message(deleter, json.dumps(request))

    # Direct confirmation to the deleter.
    action_results = [pkt for pkt in sent if getattr(pkt, "type", "") == "item_action_result"]
    assert action_results
    confirmation = action_results[-1]
    assert confirmation.ok is True
    assert "you deleted" in confirmation.message.lower()

    # Chat notice in the broadcast stream.
    chat_notices = [pkt for pkt in broadcasts if getattr(pkt, "type", "") == "chat_message"]
    assert chat_notices
    assert "owner deleted" in getattr(chat_notices[-1], "message", "").lower()
@pytest.mark.asyncio
async def test_item_transfer_rejects_when_not_authorized(monkeypatch: pytest.MonkeyPatch) -> None:
    """An owner holding only ``item.use`` is refused a transfer and the item keeps its creator."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    giver_ws = _fake_ws()
    receiver_ws = _fake_ws()
    giver = ClientConnection(
        websocket=giver_ws, id="u1", nickname="owner",
        authenticated=True, user_id="1", username="owner_user",
        permissions={"item.use"}, x=5, y=6,
    )
    receiver = ClientConnection(
        websocket=receiver_ws, id="u2", nickname="target",
        authenticated=True, user_id="2", username="target_user",
        permissions=set(), x=10, y=10,
    )
    server.clients[giver_ws] = giver
    server.clients[receiver_ws] = receiver

    item = server.item_service.default_item(giver, "dice")
    item.x, item.y = giver.x, giver.y
    server.item_service.add_item(item)

    sent: list[object] = []

    async def record_send(websocket: ServerConnection, packet: object) -> None:
        sent.append(packet)

    monkeypatch.setattr(server, "_send", record_send)

    request = {"type": "item_transfer", "itemId": item.id, "targetId": receiver.id}
    await server._handle_message(giver, json.dumps(request))

    assert item.createdBy == giver.user_id
    assert sent
    outcome = sent[-1]
    assert outcome.type == "item_action_result"
    assert outcome.ok is False
    assert outcome.action == "transfer"
    assert "not authorized" in outcome.message.lower()
@pytest.mark.asyncio
async def test_admin_user_delete_requires_permission(monkeypatch: pytest.MonkeyPatch) -> None:
    """A client holding only ``user.ban_unban`` gets a "not authorized" result for admin_user_delete."""
    server = SignalingServer("127.0.0.1", 8765, None, None)
    socket = _fake_ws()
    moderator = ClientConnection(
        websocket=socket, id="u1", nickname="Tester",
        authenticated=True, user_id="1", username="tester",
        permissions={"user.ban_unban"},
    )
    server.clients[socket] = moderator
    sent: list[object] = []

    async def record_send(websocket: ServerConnection, packet: object) -> None:
        sent.append(packet)

    monkeypatch.setattr(server, "_send", record_send)

    request = {"type": "admin_user_delete", "username": "alpha"}
    await server._handle_message(moderator, json.dumps(request))

    assert sent
    reply = sent[-1]
    assert getattr(reply, "type", "") == "admin_action_result"
    assert reply.ok is False
    assert reply.action == "user_delete"
    assert "not authorized" in reply.message.lower()
@pytest.mark.asyncio
async def test_admin_user_delete_calls_auth_service(monkeypatch: pytest.MonkeyPatch) -> None:
    """With ``account.delete.any`` the handler calls ``delete_user`` with the target name and the actor's user id."""
    server = SignalingServer("127.0.0.1", 8765, None, None)
    socket = _fake_ws()
    admin = ClientConnection(
        websocket=socket, id="u1", nickname="Tester",
        authenticated=True, user_id="1", username="tester",
        permissions={"account.delete.any"},
    )
    server.clients[socket] = admin
    sent: list[object] = []
    delete_calls: list[tuple[str, str | None]] = []

    async def record_send(websocket: ServerConnection, packet: object) -> None:
        sent.append(packet)

    def record_delete_user(username: str, *, actor_user_id: str | None = None) -> str:
        delete_calls.append((username, actor_user_id))
        return username

    monkeypatch.setattr(server, "_send", record_send)
    monkeypatch.setattr(server.auth_service, "get_user_id_by_username", lambda _username: None)
    monkeypatch.setattr(server.auth_service, "delete_user", record_delete_user)

    request = {"type": "admin_user_delete", "username": "alpha"}
    await server._handle_message(admin, json.dumps(request))

    assert delete_calls == [("alpha", "1")]
    assert sent
    reply = sent[-1]
    assert getattr(reply, "type", "") == "admin_action_result"
    assert reply.ok is True
    assert reply.action == "user_delete"
@pytest.mark.asyncio
async def test_broadcast_fanout_is_concurrent(monkeypatch: pytest.MonkeyPatch) -> None:
    """``_broadcast`` starts the send to every client without waiting on a slow one.

    The first client's send sleeps for 50 ms; both sends must nevertheless
    start within 20 ms of each other.
    """
    server = SignalingServer("127.0.0.1", 8765, None, None)
    slow_ws = _fake_ws()
    fast_ws = _fake_ws()
    server.clients[slow_ws] = ClientConnection(websocket=slow_ws, id="u1")
    server.clients[fast_ws] = ClientConnection(websocket=fast_ws, id="u2")
    started: dict[ServerConnection, float] = {}

    async def timed_send(websocket: ServerConnection, packet: object) -> None:
        started[websocket] = monotonic()
        if websocket is slow_ws:
            await asyncio.sleep(0.05)

    monkeypatch.setattr(server, "_send", timed_send)

    await server._broadcast({"type": "noop"})

    assert slow_ws in started
    assert fast_ws in started
    start_gap = abs(started[slow_ws] - started[fast_ws])
    assert start_gap < 0.02
@pytest.mark.asyncio
async def test_item_add_rejects_unknown_type(monkeypatch: pytest.MonkeyPatch) -> None:
    """item_add with an unrecognised ``itemType`` is answered with an "unknown item type" failure."""
    server = SignalingServer("127.0.0.1", 8765, None, None)
    socket = _fake_ws()
    user = ClientConnection(websocket=socket, id="u1", nickname="tester", x=5, y=6)
    server.clients[socket] = user
    sent: list[object] = []

    async def record_send(websocket: ServerConnection, packet: object) -> None:
        sent.append(packet)

    monkeypatch.setattr(server, "_send", record_send)

    request = {"type": "item_add", "itemType": "not_a_type"}
    await server._handle_message(user, json.dumps(request))

    assert sent
    reply = sent[-1]
    assert reply.ok is False
    assert "unknown item type" in reply.message.lower()
@pytest.mark.asyncio
async def test_update_position_enforces_cumulative_budget_per_tick(monkeypatch: pytest.MonkeyPatch) -> None:
    """With a budget of 2 steps per tick and a frozen clock, only the first two 1-step moves are applied."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    server.movement_tick_ms = 100
    server.movement_max_steps_per_tick = 2
    socket = _fake_ws()
    walker = ClientConnection(websocket=socket, id="u1", nickname="tester", x=5, y=5)
    server.clients[socket] = walker

    # Freeze the item-service clock so every message carries the same timestamp.
    fixed_now = 10_000
    monkeypatch.setattr(server.item_service, "now_ms", lambda: fixed_now)

    broadcasts: list[object] = []

    async def record_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        broadcasts.append(packet)

    monkeypatch.setattr(server, "_broadcast", record_broadcast)

    # Three consecutive 1-step moves in the same tick: the first two are
    # allowed (exhausting the budget of 2), the third must be rejected.
    for target_x in (6, 7, 8):
        await server._handle_message(walker, json.dumps({"type": "update_position", "x": target_x, "y": 5}))

    assert (walker.x, walker.y) == (7, 5)
    assert len(broadcasts) == 2
@pytest.mark.asyncio
async def test_teleport_complete_broadcasts_spatial_event(monkeypatch: pytest.MonkeyPatch) -> None:
    """teleport_complete produces exactly two broadcasts: update_position, then teleport_complete."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    socket = _fake_ws()
    traveller = ClientConnection(websocket=socket, id="u1", nickname="tester", x=12, y=13)
    server.clients[socket] = traveller
    broadcasts: list[object] = []

    async def record_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        broadcasts.append(packet)

    async def ignore_send(websocket: ServerConnection, packet: object) -> None:
        return None

    monkeypatch.setattr(server, "_broadcast", record_broadcast)
    monkeypatch.setattr(server, "_send", ignore_send)

    request = {"type": "teleport_complete", "x": 12, "y": 13}
    await server._handle_message(traveller, json.dumps(request))

    assert len(broadcasts) == 2
    # Both packets carry the same client id and coordinates; only the type differs.
    for packet, expected_type in zip(broadcasts, ("update_position", "teleport_complete")):
        assert packet.type == expected_type
        assert packet.id == "u1"
        assert packet.x == 12
        assert packet.y == 13
@pytest.mark.asyncio
async def test_update_position_rate_reject_sends_self_correction(monkeypatch: pytest.MonkeyPatch) -> None:
    """A move over the per-tick step budget is rejected and the sender receives its own current position."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    socket = _fake_ws()
    walker = ClientConnection(websocket=socket, id="u1", nickname="tester", x=5, y=5)
    server.clients[socket] = walker
    server.movement_tick_ms = 100
    server.movement_max_steps_per_tick = 1

    fixed_now = 10_000
    monkeypatch.setattr(server.item_service, "now_ms", lambda: fixed_now)

    sent: list[object] = []

    async def record_send(websocket: ServerConnection, packet: object) -> None:
        sent.append(packet)

    async def ignore_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        return None

    monkeypatch.setattr(server, "_send", record_send)
    monkeypatch.setattr(server, "_broadcast", ignore_broadcast)

    # 2-tile move exceeds per-window budget and should be rejected with correction.
    request = {"type": "update_position", "x": 7, "y": 5}
    await server._handle_message(walker, json.dumps(request))

    assert (walker.x, walker.y) == (5, 5)
    assert sent
    correction = sent[-1]
    assert correction.type == "update_position"
    assert correction.id == "u1"
    assert (correction.x, correction.y) == (5, 5)
@pytest.mark.asyncio
async def test_chat_me_command_broadcasts_action(monkeypatch: pytest.MonkeyPatch) -> None:
    """``/Me waves hello`` is broadcast once as an action message prefixed with the sender's nickname."""
    server = SignalingServer("127.0.0.1", 8765, None, None)
    socket = _fake_ws()
    speaker = ClientConnection(websocket=socket, id="u1", nickname="Tester")
    server.clients[socket] = speaker
    broadcasts: list[object] = []
    sent: list[object] = []

    async def record_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        broadcasts.append(packet)

    async def record_send(websocket: ServerConnection, packet: object) -> None:
        sent.append(packet)

    monkeypatch.setattr(server, "_broadcast", record_broadcast)
    monkeypatch.setattr(server, "_send", record_send)

    request = {"type": "chat_message", "message": "/Me waves hello"}
    await server._handle_message(speaker, json.dumps(request))

    assert not sent
    assert len(broadcasts) == 1
    (emote,) = broadcasts
    assert getattr(emote, "type", "") == "chat_message"
    assert emote.action is True
    assert emote.system is False
    assert emote.message == "Tester waves hello"
@pytest.mark.asyncio
async def test_chat_up_command_sends_sender_only(monkeypatch: pytest.MonkeyPatch) -> None:
    """``/UP`` answers only the sender with a system message carrying the formatted uptime."""
    server = SignalingServer("127.0.0.1", 8765, None, None)
    socket = _fake_ws()
    speaker = ClientConnection(websocket=socket, id="u1", nickname="Tester")
    server.clients[socket] = speaker
    broadcasts: list[object] = []
    sent: list[object] = []

    async def record_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        broadcasts.append(packet)

    async def record_send(websocket: ServerConnection, packet: object) -> None:
        sent.append(packet)

    monkeypatch.setattr(server, "_broadcast", record_broadcast)
    monkeypatch.setattr(server, "_send", record_send)
    # Pin the uptime text so the expected reply is exact.
    monkeypatch.setattr(server, "_format_uptime", lambda: "1h 2m 3s")

    request = {"type": "chat_message", "message": "/UP"}
    await server._handle_message(speaker, json.dumps(request))

    assert not broadcasts
    assert len(sent) == 1
    (reply,) = sent
    assert getattr(reply, "type", "") == "chat_message"
    assert reply.system is True
    assert reply.message == "Server uptime: 1h 2m 3s"
@pytest.mark.asyncio
async def test_chat_command_requires_leading_slash(monkeypatch: pytest.MonkeyPatch) -> None:
    """A message whose slash is preceded by a space is broadcast verbatim as ordinary chat."""
    server = SignalingServer("127.0.0.1", 8765, None, None)
    socket = _fake_ws()
    speaker = ClientConnection(websocket=socket, id="u1", nickname="Tester")
    server.clients[socket] = speaker
    broadcasts: list[object] = []

    async def record_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        broadcasts.append(packet)

    monkeypatch.setattr(server, "_broadcast", record_broadcast)

    request = {"type": "chat_message", "message": " /up"}
    await server._handle_message(speaker, json.dumps(request))

    assert len(broadcasts) == 1
    (chat,) = broadcasts
    assert getattr(chat, "type", "") == "chat_message"
    assert chat.system is False
    assert chat.action is False
    assert chat.message == " /up"
@pytest.mark.asyncio
async def test_chat_version_command_is_sender_only(monkeypatch: pytest.MonkeyPatch) -> None:
    """``/version`` answers only the sender with a system message carrying ``server_version``."""
    server = SignalingServer("127.0.0.1", 8765, None, None)
    socket = _fake_ws()
    speaker = ClientConnection(websocket=socket, id="u1", nickname="Tester")
    server.clients[socket] = speaker
    server.server_version = "2026.02.27 R293"
    broadcasts: list[object] = []
    sent: list[object] = []

    async def record_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        broadcasts.append(packet)

    async def record_send(websocket: ServerConnection, packet: object) -> None:
        sent.append(packet)

    monkeypatch.setattr(server, "_broadcast", record_broadcast)
    monkeypatch.setattr(server, "_send", record_send)

    request = {"type": "chat_message", "message": "/version"}
    await server._handle_message(speaker, json.dumps(request))

    assert not broadcasts
    assert len(sent) == 1
    (reply,) = sent
    assert getattr(reply, "type", "") == "chat_message"
    assert reply.system is True
    assert reply.message == "Server version: 2026.02.27 R293"
@pytest.mark.asyncio
async def test_chat_reboot_requires_permission(monkeypatch: pytest.MonkeyPatch) -> None:
    """`/reboot` from a client lacking `server.allow_reboot` is refused.

    The client holds only `chat.send`. The last packet sent to it must be a
    system chat message containing "not authorized", and the reboot
    scheduler must never be invoked.
    """
    server = SignalingServer("127.0.0.1", 8765, None, None)
    ws = _fake_ws()
    client = ClientConnection(
        websocket=ws,
        id="u1",
        nickname="Tester",
        authenticated=True,
        user_id="1",
        permissions={"chat.send"},
    )
    server.clients[ws] = client
    send_payloads: list[object] = []
    schedule_calls: list[tuple[object, object]] = []

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    def fake_schedule_reboot(_requested_by: object, _message: object) -> bool:
        # Record every call: a rejection notice alone does not prove that
        # no reboot was scheduled for the unauthorized client.
        schedule_calls.append((_requested_by, _message))
        return True

    monkeypatch.setattr(server, "_send", fake_send)
    monkeypatch.setattr(server, "_schedule_reboot", fake_schedule_reboot)

    await server._handle_message(client, json.dumps({"type": "chat_message", "message": "/reboot patching"}))

    assert schedule_calls == []
    assert send_payloads
    packet = send_payloads[-1]
    assert getattr(packet, "type", "") == "chat_message"
    assert packet.system is True
    assert "not authorized" in packet.message.lower()
@pytest.mark.asyncio
async def test_chat_reboot_schedules_and_broadcasts_message(monkeypatch: pytest.MonkeyPatch) -> None:
    """An authorized `/reboot maintenance` broadcasts the reboot announcement.

    The scheduler stub reports success only when called with the username
    "tester" and the message "maintenance". One system chat packet
    announcing the reboot must then be broadcast.
    """
    broadcasts: list[object] = []

    async def record_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        broadcasts.append(packet)

    def accept_expected_request(requested_by: object, message: object) -> bool:
        # Succeeds only for the exact arguments this test expects.
        return requested_by == "tester" and message == "maintenance"

    server = SignalingServer("127.0.0.1", 8765, None, None)
    socket = _fake_ws()
    granted = {"chat.send", "server.allow_reboot"}
    sender = ClientConnection(
        websocket=socket,
        id="u1",
        nickname="Tester",
        authenticated=True,
        user_id="1",
        username="tester",
        permissions=granted,
    )
    server.clients[socket] = sender

    monkeypatch.setattr(server, "_broadcast", record_broadcast)
    monkeypatch.setattr(server, "_schedule_reboot", accept_expected_request)

    raw = json.dumps({"type": "chat_message", "message": "/reboot maintenance"})
    await server._handle_message(sender, raw)

    assert len(broadcasts) == 1
    (packet,) = broadcasts
    assert getattr(packet, "type", "") == "chat_message"
    assert packet.system is True
    assert packet.message == "Server rebooting in 5 seconds. maintenance"
@pytest.mark.asyncio
async def test_chat_reboot_already_in_progress_sends_sender_only_notice(monkeypatch: pytest.MonkeyPatch) -> None:
    """When `_schedule_reboot` returns False, only the requester is notified.

    The authorized client must receive a single system chat packet saying a
    reboot is already in progress, and nothing may be broadcast.
    """
    broadcasts: list[object] = []
    sent: list[object] = []

    async def record_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        broadcasts.append(packet)

    async def record_send(websocket: ServerConnection, packet: object) -> None:
        sent.append(packet)

    def refuse_schedule(_requested_by: object, _message: object) -> bool:
        # False is the scheduler result that this test expects to produce
        # the "already in progress" notice.
        return False

    server = SignalingServer("127.0.0.1", 8765, None, None)
    socket = _fake_ws()
    granted = {"chat.send", "server.allow_reboot"}
    sender = ClientConnection(
        websocket=socket,
        id="u1",
        nickname="Tester",
        authenticated=True,
        user_id="1",
        username="tester",
        permissions=granted,
    )
    server.clients[socket] = sender

    monkeypatch.setattr(server, "_broadcast", record_broadcast)
    monkeypatch.setattr(server, "_send", record_send)
    monkeypatch.setattr(server, "_schedule_reboot", refuse_schedule)

    raw = json.dumps({"type": "chat_message", "message": "/reboot maintenance"})
    await server._handle_message(sender, raw)

    assert not broadcasts
    assert len(sent) == 1
    (packet,) = sent
    assert getattr(packet, "type", "") == "chat_message"
    assert packet.system is True
    assert packet.message == "Server reboot already in progress."