"""Tests for ``SignalingServer`` message handling, driven with placeholder websockets."""

from __future__ import annotations

import asyncio
import json
from pathlib import Path
from time import monotonic
from typing import cast
import uuid

import pytest
from websockets.asyncio.server import ServerConnection

from app.client import ClientConnection
from app.auth_service import AuthError
from app.server import AUTH_LOGIN_FAILURE_MESSAGE, AUTH_RESUME_FAILURE_MESSAGE, SignalingServer


def _fake_ws() -> ServerConnection:
    """Return a fresh placeholder object typed as a ``ServerConnection``.

    The object has no websocket behavior; the tests only use it as a distinct
    key/handle and replace the server's send/broadcast methods with stubs.
    """
    return cast(ServerConnection, object())


def _packet_types(payloads: list[object]) -> list[str]:
    """Return each payload's ``type`` attribute, or ``""`` when it has none."""
    return [getattr(packet, "type", "") for packet in payloads]


@pytest.mark.asyncio
async def test_update_position_rejects_out_of_bounds(monkeypatch: pytest.MonkeyPatch) -> None:
    """An ``update_position`` to (200, -5) on a grid_size=41 server moves nobody and broadcasts nothing."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="tester", x=5, y=6)
    server.clients[ws] = client
    broadcast_payloads: list[object] = []

    async def fake_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        broadcast_payloads.append(packet)

    monkeypatch.setattr(server, "_broadcast", fake_broadcast)

    await server._handle_message(client, json.dumps({"type": "update_position", "x": 200, "y": -5}))

    assert client.x == 5
    assert client.y == 6
    assert broadcast_payloads == []


@pytest.mark.asyncio
async def test_radio_metadata_refresh_updates_station_and_title(monkeypatch: pytest.MonkeyPatch) -> None:
    """A metadata refresh copies the fetched station name and title into the radio's params."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="tester", x=10, y=10)
    server.clients[ws] = client
    radio = server.item_service.default_item(client, "radio_station")
    radio.params["streamUrl"] = "http://example.com/stream"
    radio.params["enabled"] = True
    radio.params["emitRange"] = 10
    radio.params["stationName"] = ""
    radio.params["nowPlaying"] = ""
    server.item_service.add_item(radio)

    async def fake_broadcast_item(item: object) -> None:
        return None

    def fake_fetch(url: str) -> tuple[str, str]:
        assert url == "http://example.com/stream"
        return ("Test Station", "Test Song")

    monkeypatch.setattr(server, "_broadcast_item", fake_broadcast_item)
    monkeypatch.setattr(server, "_fetch_stream_metadata", fake_fetch)

    await server._refresh_radio_metadata_once()

    assert radio.params["stationName"] == "Test Station"
    assert radio.params["nowPlaying"] == "Test Song"


@pytest.mark.asyncio
async def test_radio_metadata_refresh_skips_when_no_listener_in_range(monkeypatch: pytest.MonkeyPatch) -> None:
    """No metadata fetch happens when the only client (0, 0) is far from the radio (30, 30, emitRange 5)."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="tester", x=0, y=0)
    server.clients[ws] = client
    radio = server.item_service.default_item(client, "radio_station")
    radio.x = 30
    radio.y = 30
    radio.params["streamUrl"] = "http://example.com/stream"
    radio.params["enabled"] = True
    radio.params["emitRange"] = 5
    server.item_service.add_item(radio)
    called = False

    def fake_fetch(url: str) -> tuple[str, str]:
        nonlocal called
        called = True
        return ("X", "Y")

    monkeypatch.setattr(server, "_fetch_stream_metadata", fake_fetch)

    await server._refresh_radio_metadata_once()

    assert called is False


@pytest.mark.asyncio
async def test_item_secondary_use_radio_reports_now_playing(monkeypatch: pytest.MonkeyPatch) -> None:
    """Secondary use of a radio replies to the sender with the current song/station and broadcasts nothing."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="tester", x=5, y=5)
    server.clients[ws] = client
    radio = server.item_service.default_item(client, "radio_station")
    radio.x = 5
    radio.y = 5
    radio.params["enabled"] = True
    radio.params["stationName"] = "Station X"
    radio.params["nowPlaying"] = "Song Y"
    server.item_service.add_item(radio)
    send_payloads: list[object] = []
    broadcast_payloads: list[object] = []

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    async def fake_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        broadcast_payloads.append(packet)

    monkeypatch.setattr(server, "_send", fake_send)
    monkeypatch.setattr(server, "_broadcast", fake_broadcast)

    await server._handle_message(client, json.dumps({"type": "item_secondary_use", "itemId": radio.id}))

    results = [packet for packet in send_payloads if getattr(packet, "type", "") == "item_action_result"]
    assert results
    assert results[-1].ok is True
    assert results[-1].action == "secondary_use"
    assert "Playing Song Y from Station X." in results[-1].message
    assert broadcast_payloads == []


@pytest.mark.asyncio
async def test_item_secondary_use_missing_handler_returns_message(monkeypatch: pytest.MonkeyPatch) -> None:
    """Secondary use of a dice item yields a failed result mentioning "No secondary action"."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="tester", x=5, y=5)
    server.clients[ws] = client
    dice = server.item_service.default_item(client, "dice")
    dice.x = 5
    dice.y = 5
    server.item_service.add_item(dice)
    send_payloads: list[object] = []

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    monkeypatch.setattr(server, "_send", fake_send)

    await server._handle_message(client, json.dumps({"type": "item_secondary_use", "itemId": dice.id}))

    results = [packet for packet in send_payloads if getattr(packet, "type", "") == "item_action_result"]
    assert results
    assert results[-1].ok is False
    assert results[-1].action == "secondary_use"
    assert "No secondary action" in results[-1].message


def test_clock_alarm_announcement_sequence_shape() -> None:
    """Check the first and last sound paths of the alarm and top-of-hour clock announcements."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    params = {"timeZone": "America/Detroit", "use24Hour": False}

    alarm_sounds = server._build_clock_announcement_sounds(params, top_of_hour=False, alarm=True)
    assert alarm_sounds
    assert alarm_sounds[0] == "/sounds/clock/el640/announcement.ogg"
    assert alarm_sounds[-1] == "/sounds/clock/el640/alarm.ogg"

    top_of_hour_sounds = server._build_clock_announcement_sounds(params, top_of_hour=True, alarm=False)
    assert top_of_hour_sounds
    assert top_of_hour_sounds[0] == "/sounds/clock/el640/hour1.ogg"
    assert top_of_hour_sounds[-1] == "/sounds/clock/el640/hour2.ogg"


@pytest.mark.asyncio
async def test_auth_login_uses_hash_offload(monkeypatch: pytest.MonkeyPatch) -> None:
    """A successful ``auth_login`` routes the ``login`` callable through ``_run_auth_hash_task``."""
    server = SignalingServer("127.0.0.1", 8765, None, None)
    # Randomized suffix keeps the registered username unique from run to run.
    username = f"alpha_{uuid.uuid4().hex[:8]}"
    server.auth_service.register(username, "password99")
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="tester")
    send_payloads: list[object] = []
    offload_calls: list[str] = []

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    async def fake_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        return None

    async def fake_run_auth_hash_task(func, /, *args, **kwargs):
        # Record which callable was offloaded, then run it inline.
        offload_calls.append(getattr(func, "__name__", "unknown"))
        return func(*args, **kwargs)

    monkeypatch.setattr(server, "_send", fake_send)
    monkeypatch.setattr(server, "_broadcast", fake_broadcast)
    monkeypatch.setattr(server, "_run_auth_hash_task", fake_run_auth_hash_task)

    await server._handle_message(
        client,
        json.dumps({"type": "auth_login", "username": username, "password": "password99"}),
    )

    assert "login" in offload_calls
    auth_results = [packet for packet in send_payloads if getattr(packet, "type", "") == "auth_result"]
    assert auth_results
    assert auth_results[-1].ok is True


@pytest.mark.asyncio
async def test_auth_rate_limit_blocks_before_hash(monkeypatch: pytest.MonkeyPatch) -> None:
    """When the rate limiter reports True, ``login`` is never called and a "too many" failure is sent."""
    server = SignalingServer("127.0.0.1", 8765, None, None)
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="tester")
    send_payloads: list[object] = []
    called_login = False

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    def fake_login(username: str, password: str):  # pragma: no cover - should never run
        nonlocal called_login
        called_login = True
        raise RuntimeError("unexpected login call")

    monkeypatch.setattr(server, "_send", fake_send)
    monkeypatch.setattr(server, "_sleep_auth_failure_jitter", lambda: asyncio.sleep(0))
    monkeypatch.setattr(server.auth_service, "login", fake_login)
    monkeypatch.setattr(server, "_is_auth_rate_limited", lambda _client, _packet: True)

    await server._handle_message(client, json.dumps({"type": "auth_login", "username": "alpha", "password": "wrongpass"}))

    assert called_login is False
    assert send_payloads
    assert send_payloads[-1].ok is False
    assert "too many" in send_payloads[-1].message.lower()


@pytest.mark.asyncio
async def test_auth_login_failure_message_is_generic(monkeypatch: pytest.MonkeyPatch) -> None:
    """A login ``AuthError`` is reported as ``AUTH_LOGIN_FAILURE_MESSAGE``, not the error's own text."""
    server = SignalingServer("127.0.0.1", 8765, None, None)
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="tester")
    send_payloads: list[object] = []

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    monkeypatch.setattr(server, "_send", fake_send)
    monkeypatch.setattr(server, "_sleep_auth_failure_jitter", lambda: asyncio.sleep(0))

    def fake_login(_username: str, _password: str):
        raise AuthError("Account is disabled.")

    monkeypatch.setattr(server.auth_service, "login", fake_login)

    await server._handle_message(client, json.dumps({"type": "auth_login", "username": "alpha", "password": "wrongpass"}))

    auth_results = [packet for packet in send_payloads if getattr(packet, "type", "") == "auth_result"]
    assert auth_results
    assert auth_results[-1].ok is False
    assert auth_results[-1].message == AUTH_LOGIN_FAILURE_MESSAGE


@pytest.mark.asyncio
async def test_auth_resume_failure_message_is_generic(monkeypatch: pytest.MonkeyPatch) -> None:
    """A resume ``AuthError`` is reported as ``AUTH_RESUME_FAILURE_MESSAGE``, not the error's own text."""
    server = SignalingServer("127.0.0.1", 8765, None, None)
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="tester")
    send_payloads: list[object] = []

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    monkeypatch.setattr(server, "_send", fake_send)
    monkeypatch.setattr(server, "_sleep_auth_failure_jitter", lambda: asyncio.sleep(0))

    def fake_resume(_token: str):
        raise AuthError("Session has expired.")

    monkeypatch.setattr(server.auth_service, "resume", fake_resume)

    await server._handle_message(client, json.dumps({"type": "auth_resume", "sessionToken": "expired-token"}))

    auth_results = [packet for packet in send_payloads if getattr(packet, "type", "") == "auth_result"]
    assert auth_results
    assert auth_results[-1].ok is False
    assert auth_results[-1].message == AUTH_RESUME_FAILURE_MESSAGE


@pytest.mark.asyncio
async def test_item_drop_rejects_out_of_bounds(monkeypatch: pytest.MonkeyPatch) -> None:
    """Dropping a carried item at (999, 999) fails with "out of bounds" and the item stays carried."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="tester", x=5, y=6)
    server.clients[ws] = client
    item = server.item_service.default_item(client, "dice")
    item.carrierId = client.id
    server.item_service.add_item(item)
    send_payloads: list[object] = []

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    monkeypatch.setattr(server, "_send", fake_send)

    await server._handle_message(client, json.dumps({"type": "item_drop", "itemId": item.id, "x": 999, "y": 999}))

    assert item.carrierId == client.id
    assert send_payloads[-1].ok is False
    assert "out of bounds" in send_payloads[-1].message.lower()


@pytest.mark.asyncio
async def test_item_transfer_updates_item_owner(monkeypatch: pytest.MonkeyPatch) -> None:
    """An owner with ``item.transfer.own`` can hand an item to another online user.

    Checks the item's ``createdBy``/``createdByName``, the item broadcast, the
    sender's result packet, and the chat broadcast announcing the transfer.
    """
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    owner_ws = _fake_ws()
    target_ws = _fake_ws()
    owner = ClientConnection(
        websocket=owner_ws,
        id="u1",
        nickname="owner",
        authenticated=True,
        user_id="1",
        username="owner_user",
        permissions={"item.transfer.own"},
        x=5,
        y=6,
    )
    target = ClientConnection(
        websocket=target_ws,
        id="u2",
        nickname="target",
        authenticated=True,
        user_id="2",
        username="target_user",
        permissions=set(),
        x=10,
        y=10,
    )
    server.clients[owner_ws] = owner
    server.clients[target_ws] = target
    item = server.item_service.default_item(owner, "dice")
    item.x = owner.x
    item.y = owner.y
    server.item_service.add_item(item)
    send_payloads: list[object] = []
    broadcasted_items: list[object] = []
    broadcast_payloads: list[object] = []

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    async def fake_broadcast_item(broadcast_item: object) -> None:
        broadcasted_items.append(broadcast_item)

    async def fake_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        broadcast_payloads.append(packet)

    monkeypatch.setattr(server, "_send", fake_send)
    monkeypatch.setattr(server, "_broadcast_item", fake_broadcast_item)
    monkeypatch.setattr(server, "_broadcast", fake_broadcast)

    await server._handle_message(
        owner,
        json.dumps({"type": "item_transfer", "itemId": item.id, "targetUserId": target.user_id}),
    )

    assert item.createdBy == target.user_id
    assert item.createdByName == target.username
    assert broadcasted_items
    assert send_payloads
    result = send_payloads[-1]
    assert result.type == "item_action_result"
    assert result.ok is True
    assert result.action == "transfer"
    assert "you transferred" in result.message.lower()
    assert broadcast_payloads
    assert getattr(broadcast_payloads[-1], "type", "") == "chat_message"
    assert "owner transferred" in getattr(broadcast_payloads[-1], "message", "").lower()


@pytest.mark.asyncio
async def test_item_transfer_allows_self_target_for_transfer_any(monkeypatch: pytest.MonkeyPatch) -> None:
    """A user with ``item.transfer.any`` can transfer someone else's item to themselves."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    owner_ws = _fake_ws()
    actor_ws = _fake_ws()
    owner = ClientConnection(
        websocket=owner_ws,
        id="u1",
        nickname="owner",
        authenticated=True,
        user_id="1",
        username="owner_user",
        permissions=set(),
        x=5,
        y=6,
    )
    actor = ClientConnection(
        websocket=actor_ws,
        id="u3",
        nickname="actor",
        authenticated=True,
        user_id="3",
        username="actor_user",
        permissions={"item.transfer.any"},
        x=5,
        y=6,
    )
    server.clients[owner_ws] = owner
    server.clients[actor_ws] = actor
    item = server.item_service.default_item(owner, "dice")
    item.x = actor.x
    item.y = actor.y
    server.item_service.add_item(item)
    send_payloads: list[object] = []
    broadcasted_items: list[object] = []

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    async def fake_broadcast_item(broadcast_item: object) -> None:
        broadcasted_items.append(broadcast_item)

    monkeypatch.setattr(server, "_send", fake_send)
    monkeypatch.setattr(server, "_broadcast_item", fake_broadcast_item)

    await server._handle_message(
        actor,
        json.dumps({"type": "item_transfer", "itemId": item.id, "targetUserId": actor.user_id}),
    )

    assert item.createdBy == actor.user_id
    assert item.createdByName == actor.username
    assert broadcasted_items
    result = send_payloads[-1]
    assert result.type == "item_action_result"
    assert result.ok is True
    assert result.action == "transfer"


@pytest.mark.asyncio
async def test_item_transfer_accepts_offline_target_user_id(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
    """A transfer may target a registered user who has no entry in ``server.clients``."""
    server = SignalingServer("127.0.0.1", 8765, None, None, auth_db_path=tmp_path / "auth.db", grid_size=41)
    owner_session = server.auth_service.register("owner_test", "password99")
    actor_session = server.auth_service.register("actor_test", "password99")
    # Registered but never added to server.clients, i.e. the offline target.
    offline_session = server.auth_service.register("offline_test", "password99")
    owner_ws = _fake_ws()
    actor_ws = _fake_ws()
    owner = ClientConnection(
        websocket=owner_ws,
        id="u1",
        nickname="owner",
        authenticated=True,
        user_id=owner_session.user.id,
        username=owner_session.user.username,
        permissions=set(),
        x=5,
        y=6,
    )
    actor = ClientConnection(
        websocket=actor_ws,
        id="u3",
        nickname="actor",
        authenticated=True,
        user_id=actor_session.user.id,
        username=actor_session.user.username,
        permissions={"item.transfer.any"},
        x=5,
        y=6,
    )
    server.clients[owner_ws] = owner
    server.clients[actor_ws] = actor
    item = server.item_service.default_item(owner, "dice")
    item.x = actor.x
    item.y = actor.y
    server.item_service.add_item(item)
    send_payloads: list[object] = []

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    monkeypatch.setattr(server, "_send", fake_send)

    await server._handle_message(
        actor,
        json.dumps({"type": "item_transfer", "itemId": item.id, "targetUserId": offline_session.user.id}),
    )

    assert item.createdBy == offline_session.user.id
    assert item.createdByName == offline_session.user.username
    result = send_payloads[-1]
    assert result.type == "item_action_result"
    assert result.ok is True
    assert result.action == "transfer"


@pytest.mark.asyncio
async def test_item_transfer_targets_lists_online_and_offline(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
    """``item_transfer_targets`` lists connected and unconnected users, flags ``online``, and omits the owner."""
    server = SignalingServer("127.0.0.1", 8765, None, None, auth_db_path=tmp_path / "auth.db", grid_size=41)
    owner_session = server.auth_service.register("owner_menu", "password99")
    actor_session = server.auth_service.register("actor_menu", "password99")
    online_session = server.auth_service.register("online_menu", "password99")
    # Registered but never added to server.clients.
    offline_session = server.auth_service.register("offline_menu", "password99")
    owner_ws = _fake_ws()
    actor_ws = _fake_ws()
    online_ws = _fake_ws()
    owner = ClientConnection(
        websocket=owner_ws,
        id="u1",
        nickname="owner",
        authenticated=True,
        user_id=owner_session.user.id,
        username=owner_session.user.username,
        permissions=set(),
        x=5,
        y=6,
    )
    actor = ClientConnection(
        websocket=actor_ws,
        id="u3",
        nickname="actor",
        authenticated=True,
        user_id=actor_session.user.id,
        username=actor_session.user.username,
        permissions={"item.transfer.any"},
        x=5,
        y=6,
    )
    online = ClientConnection(
        websocket=online_ws,
        id="u4",
        nickname="online",
        authenticated=True,
        user_id=online_session.user.id,
        username=online_session.user.username,
        permissions=set(),
        x=10,
        y=10,
    )
    server.clients[owner_ws] = owner
    server.clients[actor_ws] = actor
    server.clients[online_ws] = online
    item = server.item_service.default_item(owner, "dice")
    item.x = actor.x
    item.y = actor.y
    server.item_service.add_item(item)
    send_payloads: list[object] = []

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    monkeypatch.setattr(server, "_send", fake_send)

    await server._handle_message(actor, json.dumps({"type": "item_transfer_targets", "itemId": item.id}))

    assert send_payloads
    result = send_payloads[-1]
    assert result.type == "item_transfer_targets"
    usernames = {entry.username for entry in result.targets}
    assert owner_session.user.username not in usernames
    assert online_session.user.username in usernames
    assert offline_session.user.username in usernames
    by_username = {entry.username: entry for entry in result.targets}
    assert by_username[online_session.user.username].online is True
    assert by_username[offline_session.user.username].online is False


@pytest.mark.asyncio
async def test_item_delete_sends_others_notification(monkeypatch: pytest.MonkeyPatch) -> None:
    """Deleting an item sends "you deleted" to the actor and broadcasts an "owner deleted" chat message."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    owner_ws = _fake_ws()
    watcher_ws = _fake_ws()
    owner = ClientConnection(
        websocket=owner_ws,
        id="u1",
        nickname="owner",
        authenticated=True,
        user_id="1",
        username="owner_user",
        permissions={"item.delete.own"},
        x=5,
        y=6,
    )
    watcher = ClientConnection(
        websocket=watcher_ws,
        id="u2",
        nickname="watcher",
        authenticated=True,
        user_id="2",
        username="watcher_user",
        permissions=set(),
        x=5,
        y=6,
    )
    server.clients[owner_ws] = owner
    server.clients[watcher_ws] = watcher
    item = server.item_service.default_item(owner, "dice")
    item.x = owner.x
    item.y = owner.y
    server.item_service.add_item(item)
    send_payloads: list[object] = []
    broadcast_payloads: list[object] = []

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    async def fake_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        broadcast_payloads.append(packet)

    monkeypatch.setattr(server, "_send", fake_send)
    monkeypatch.setattr(server, "_broadcast", fake_broadcast)

    await server._handle_message(owner, json.dumps({"type": "item_delete", "itemId": item.id}))

    result_packets = [packet for packet in send_payloads if getattr(packet, "type", "") == "item_action_result"]
    assert result_packets
    assert result_packets[-1].ok is True
    assert "you deleted" in result_packets[-1].message.lower()
    chat_packets = [packet for packet in broadcast_payloads if getattr(packet, "type", "") == "chat_message"]
    assert chat_packets
    assert "owner deleted" in getattr(chat_packets[-1], "message", "").lower()


@pytest.mark.asyncio
async def test_item_transfer_rejects_when_not_authorized(monkeypatch: pytest.MonkeyPatch) -> None:
    """An owner holding only ``item.use`` gets a "not authorized" failure and keeps the item."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    owner_ws = _fake_ws()
    target_ws = _fake_ws()
    owner = ClientConnection(
        websocket=owner_ws,
        id="u1",
        nickname="owner",
        authenticated=True,
        user_id="1",
        username="owner_user",
        permissions={"item.use"},
        x=5,
        y=6,
    )
    target = ClientConnection(
        websocket=target_ws,
        id="u2",
        nickname="target",
        authenticated=True,
        user_id="2",
        username="target_user",
        permissions=set(),
        x=10,
        y=10,
    )
    server.clients[owner_ws] = owner
    server.clients[target_ws] = target
    item = server.item_service.default_item(owner, "dice")
    item.x = owner.x
    item.y = owner.y
    server.item_service.add_item(item)
    send_payloads: list[object] = []

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    monkeypatch.setattr(server, "_send", fake_send)

    await server._handle_message(
        owner,
        json.dumps({"type": "item_transfer", "itemId": item.id, "targetUserId": target.user_id}),
    )

    assert item.createdBy == owner.user_id
    assert send_payloads
    result = send_payloads[-1]
    assert result.type == "item_action_result"
    assert result.ok is False
    assert result.action == "transfer"
    assert "not authorized" in result.message.lower()


@pytest.mark.asyncio
async def test_admin_user_delete_requires_permission(monkeypatch: pytest.MonkeyPatch) -> None:
    """``admin_user_delete`` from a client holding only ``user.ban_unban`` is answered with "not authorized"."""
    server = SignalingServer("127.0.0.1", 8765, None, None)
    ws = _fake_ws()
    client = ClientConnection(
        websocket=ws,
        id="u1",
        nickname="Tester",
        authenticated=True,
        user_id="1",
        username="tester",
        permissions={"user.ban_unban"},
    )
    server.clients[ws] = client
    send_payloads: list[object] = []

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    monkeypatch.setattr(server, "_send", fake_send)

    await server._handle_message(client, json.dumps({"type": "admin_user_delete", "username": "alpha"}))

    assert send_payloads
    packet = send_payloads[-1]
    assert getattr(packet, "type", "") == "admin_action_result"
    assert packet.ok is False
    assert packet.action == "user_delete"
    assert "not authorized" in packet.message.lower()


@pytest.mark.asyncio
async def test_admin_user_delete_calls_auth_service(monkeypatch: pytest.MonkeyPatch) -> None:
    """With ``account.delete.any``, the handler calls ``delete_user`` with the username and the actor's user id."""
    server = SignalingServer("127.0.0.1", 8765, None, None)
    ws = _fake_ws()
    client = ClientConnection(
        websocket=ws,
        id="u1",
        nickname="Tester",
        authenticated=True,
        user_id="1",
        username="tester",
        permissions={"account.delete.any"},
    )
    server.clients[ws] = client
    send_payloads: list[object] = []
    calls: list[tuple[str, str | None]] = []

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    monkeypatch.setattr(server, "_send", fake_send)
    monkeypatch.setattr(server.auth_service, "get_user_id_by_username", lambda _username: None)

    def fake_delete_user(username: str, *, actor_user_id: str | None = None) -> str:
        calls.append((username, actor_user_id))
        return username

    monkeypatch.setattr(server.auth_service, "delete_user", fake_delete_user)

    await server._handle_message(client, json.dumps({"type": "admin_user_delete", "username": "alpha"}))

    assert calls == [("alpha", "1")]
    assert send_payloads
    packet = send_payloads[-1]
    assert getattr(packet, "type", "") == "admin_action_result"
    assert packet.ok is True
    assert packet.action == "user_delete"
@pytest.mark.asyncio
async def test_broadcast_fanout_is_concurrent(monkeypatch: pytest.MonkeyPatch) -> None:
    """Both per-client sends must start within 20 ms of each other although the first stub sleeps 50 ms."""
    server = SignalingServer("127.0.0.1", 8765, None, None)
    ws1 = _fake_ws()
    ws2 = _fake_ws()
    server.clients[ws1] = ClientConnection(websocket=ws1, id="u1")
    server.clients[ws2] = ClientConnection(websocket=ws2, id="u2")
    send_started_at: dict[ServerConnection, float] = {}

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_started_at[websocket] = monotonic()
        # Only the first client is slow; a sequential fan-out would delay ws2's start.
        if websocket is ws1:
            await asyncio.sleep(0.05)

    monkeypatch.setattr(server, "_send", fake_send)

    await server._broadcast({"type": "noop"})

    assert ws1 in send_started_at
    assert ws2 in send_started_at
    assert abs(send_started_at[ws1] - send_started_at[ws2]) < 0.02


@pytest.mark.asyncio
async def test_item_add_rejects_unknown_type(monkeypatch: pytest.MonkeyPatch) -> None:
    """``item_add`` with an unrecognized ``itemType`` is answered with an "unknown item type" failure."""
    server = SignalingServer("127.0.0.1", 8765, None, None)
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="tester", x=5, y=6)
    server.clients[ws] = client
    send_payloads: list[object] = []

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    monkeypatch.setattr(server, "_send", fake_send)

    await server._handle_message(client, json.dumps({"type": "item_add", "itemType": "not_a_type"}))

    assert send_payloads
    assert send_payloads[-1].ok is False
    assert "unknown item type" in send_payloads[-1].message.lower()


@pytest.mark.asyncio
async def test_update_position_enforces_cumulative_budget_per_tick(monkeypatch: pytest.MonkeyPatch) -> None:
    """With a 2-step budget and a frozen clock, the third 1-step move is rejected."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    server.movement_tick_ms = 100
    server.movement_max_steps_per_tick = 2
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="tester", x=5, y=5)
    server.clients[ws] = client
    # Freeze the item service clock so every move sees the same timestamp.
    fixed_now = 10_000
    monkeypatch.setattr(server.item_service, "now_ms", lambda: fixed_now)
    broadcast_payloads: list[object] = []

    async def fake_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        broadcast_payloads.append(packet)

    monkeypatch.setattr(server, "_broadcast", fake_broadcast)

    # First 1-step move in this tick: allowed.
    await server._handle_message(client, json.dumps({"type": "update_position", "x": 6, "y": 5}))
    # Second 1-step move in the same tick: allowed (budget now exhausted at 2).
    await server._handle_message(client, json.dumps({"type": "update_position", "x": 7, "y": 5}))
    # Third 1-step move in the same tick: must be rejected.
    await server._handle_message(client, json.dumps({"type": "update_position", "x": 8, "y": 5}))

    assert client.x == 7
    assert client.y == 5
    assert len(broadcast_payloads) == 2


@pytest.mark.asyncio
async def test_teleport_complete_broadcasts_spatial_event(monkeypatch: pytest.MonkeyPatch) -> None:
    """``teleport_complete`` broadcasts ``update_position`` then ``teleport_complete``, both with the client's id and coordinates."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="tester", x=12, y=13)
    server.clients[ws] = client
    broadcast_payloads: list[object] = []

    async def fake_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        broadcast_payloads.append(packet)

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        return None

    monkeypatch.setattr(server, "_broadcast", fake_broadcast)
    monkeypatch.setattr(server, "_send", fake_send)

    await server._handle_message(client, json.dumps({"type": "teleport_complete", "x": 12, "y": 13}))

    assert len(broadcast_payloads) == 2
    assert broadcast_payloads[0].type == "update_position"
    assert broadcast_payloads[0].id == "u1"
    assert broadcast_payloads[0].x == 12
    assert broadcast_payloads[0].y == 13
    assert broadcast_payloads[1].type == "teleport_complete"
    assert broadcast_payloads[1].id == "u1"
    assert broadcast_payloads[1].x == 12
    assert broadcast_payloads[1].y == 13


@pytest.mark.asyncio
async def test_update_position_rate_reject_sends_self_correction(monkeypatch: pytest.MonkeyPatch) -> None:
    """A move rejected by the step budget sends the client an ``update_position`` with its unchanged coordinates."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="tester", x=5, y=5)
    server.clients[ws] = client
    server.movement_tick_ms = 100
    server.movement_max_steps_per_tick = 1
    fixed_now = 10_000
    monkeypatch.setattr(server.item_service, "now_ms", lambda: fixed_now)
    send_payloads: list[object] = []

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    async def fake_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        return None

    monkeypatch.setattr(server, "_send", fake_send)
    monkeypatch.setattr(server, "_broadcast", fake_broadcast)

    # 2-tile move exceeds per-window budget and should be rejected with correction.
    await server._handle_message(client, json.dumps({"type": "update_position", "x": 7, "y": 5}))

    assert client.x == 5
    assert client.y == 5
    assert send_payloads
    correction = send_payloads[-1]
    assert correction.type == "update_position"
    assert correction.id == "u1"
    assert correction.x == 5
    assert correction.y == 5


@pytest.mark.asyncio
async def test_chat_me_command_broadcasts_action(monkeypatch: pytest.MonkeyPatch) -> None:
    """``/Me waves hello`` is broadcast as a non-system action message "Tester waves hello"."""
    server = SignalingServer("127.0.0.1", 8765, None, None)
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="Tester")
    server.clients[ws] = client
    broadcast_payloads: list[object] = []
    send_payloads: list[object] = []

    async def fake_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        broadcast_payloads.append(packet)

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    monkeypatch.setattr(server, "_broadcast", fake_broadcast)
    monkeypatch.setattr(server, "_send", fake_send)

    await server._handle_message(client, json.dumps({"type": "chat_message", "message": "/Me waves hello"}))

    assert send_payloads == []
    assert len(broadcast_payloads) == 1
    packet = broadcast_payloads[0]
    assert getattr(packet, "type", "") == "chat_message"
    assert packet.action is True
    assert packet.system is False
    assert packet.message == "Tester waves hello"


@pytest.mark.asyncio
async def test_chat_up_command_sends_sender_only(monkeypatch: pytest.MonkeyPatch) -> None:
    """``/UP`` sends the uptime as a system message to the sender and broadcasts nothing."""
    server = SignalingServer("127.0.0.1", 8765, None, None)
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="Tester")
    server.clients[ws] = client
    broadcast_payloads: list[object] = []
    send_payloads: list[object] = []

    async def fake_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        broadcast_payloads.append(packet)

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    monkeypatch.setattr(server, "_broadcast", fake_broadcast)
    monkeypatch.setattr(server, "_send", fake_send)
    monkeypatch.setattr(server, "_format_uptime", lambda: "1h 2m 3s")

    await server._handle_message(client, json.dumps({"type": "chat_message", "message": "/UP"}))

    assert broadcast_payloads == []
    assert len(send_payloads) == 1
    packet = send_payloads[0]
    assert getattr(packet, "type", "") == "chat_message"
    assert packet.system is True
    assert packet.message == "Server uptime: 1h 2m 3s"


@pytest.mark.asyncio
async def test_chat_command_requires_leading_slash(monkeypatch: pytest.MonkeyPatch) -> None:
    """A message with a space before the slash is broadcast verbatim as ordinary chat."""
    server = SignalingServer("127.0.0.1", 8765, None, None)
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="Tester")
    server.clients[ws] = client
    broadcast_payloads: list[object] = []

    async def fake_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        broadcast_payloads.append(packet)

    monkeypatch.setattr(server, "_broadcast", fake_broadcast)

    await server._handle_message(client, json.dumps({"type": "chat_message", "message": " /up"}))

    assert len(broadcast_payloads) == 1
    packet = broadcast_payloads[0]
    assert getattr(packet, "type", "") == "chat_message"
    assert packet.system is False
    assert packet.action is False
    assert packet.message == " /up"


@pytest.mark.asyncio
async def test_chat_version_command_is_sender_only(monkeypatch: pytest.MonkeyPatch) -> None:
    """``/version`` sends ``server.server_version`` as a system message to the sender and broadcasts nothing."""
    server = SignalingServer("127.0.0.1", 8765, None, None)
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="Tester")
    server.clients[ws] = client
    server.server_version = "2026.02.27 R293"
    broadcast_payloads: list[object] = []
    send_payloads: list[object] = []

    async def fake_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        broadcast_payloads.append(packet)

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    monkeypatch.setattr(server, "_broadcast", fake_broadcast)
    monkeypatch.setattr(server, "_send", fake_send)

    await server._handle_message(client, json.dumps({"type": "chat_message", "message": "/version"}))

    assert broadcast_payloads == []
    assert len(send_payloads) == 1
    packet = send_payloads[0]
    assert getattr(packet, "type", "") == "chat_message"
    assert packet.system is True
    assert packet.message == "Server version: 2026.02.27 R293"


@pytest.mark.asyncio
async def test_chat_reboot_requires_permission(monkeypatch: pytest.MonkeyPatch) -> None:
    """``/reboot`` from a client holding only ``chat.send`` is answered with a "not authorized" system message."""
    server = SignalingServer("127.0.0.1", 8765, None, None)
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="Tester", authenticated=True, user_id="1", permissions={"chat.send"})
    server.clients[ws] = client
    send_payloads: list[object] = []

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    monkeypatch.setattr(server, "_send", fake_send)
    monkeypatch.setattr(server, "_schedule_reboot", lambda _requested_by, _message: True)

    await server._handle_message(client, json.dumps({"type": "chat_message", "message": "/reboot patching"}))

    assert send_payloads
    packet = send_payloads[-1]
    assert getattr(packet, "type", "") == "chat_message"
    assert packet.system is True
    assert "not authorized" in packet.message.lower()


@pytest.mark.asyncio
async def test_chat_reboot_schedules_and_broadcasts_message(monkeypatch: pytest.MonkeyPatch) -> None:
    """An authorized ``/reboot maintenance`` schedules the reboot and broadcasts one system notice."""
    server = SignalingServer("127.0.0.1", 8765, None, None)
    ws = _fake_ws()
    client = ClientConnection(
        websocket=ws,
        id="u1",
        nickname="Tester",
        authenticated=True,
        user_id="1",
        username="tester",
        permissions={"chat.send", "server.allow_reboot"},
    )
    server.clients[ws] = client
    broadcast_payloads: list[object] = []

    async def fake_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        broadcast_payloads.append(packet)

    monkeypatch.setattr(server, "_broadcast", fake_broadcast)
    # The stub returns True only for the expected requester and message.
    monkeypatch.setattr(server, "_schedule_reboot", lambda requested_by, message: requested_by == "tester" and message == "maintenance")

    await server._handle_message(client, json.dumps({"type": "chat_message", "message": "/reboot maintenance"}))

    assert len(broadcast_payloads) == 1
    packet = broadcast_payloads[0]
    assert getattr(packet, "type", "") == "chat_message"
    assert packet.system is True
    assert packet.message == "Server rebooting in 5 seconds. maintenance"


@pytest.mark.asyncio
async def test_chat_reboot_already_in_progress_sends_sender_only_notice(monkeypatch: pytest.MonkeyPatch) -> None:
    """When ``_schedule_reboot`` returns False, only the sender is told a reboot is already in progress."""
    server = SignalingServer("127.0.0.1", 8765, None, None)
    ws = _fake_ws()
    client = ClientConnection(
        websocket=ws,
        id="u1",
        nickname="Tester",
        authenticated=True,
        user_id="1",
        username="tester",
        permissions={"chat.send", "server.allow_reboot"},
    )
    server.clients[ws] = client
    broadcast_payloads: list[object] = []
    send_payloads: list[object] = []

    async def fake_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        broadcast_payloads.append(packet)

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    monkeypatch.setattr(server, "_broadcast", fake_broadcast)
    monkeypatch.setattr(server, "_send", fake_send)
    monkeypatch.setattr(server, "_schedule_reboot", lambda _requested_by, _message: False)

    await server._handle_message(client, json.dumps({"type": "chat_message", "message": "/reboot maintenance"}))

    assert broadcast_payloads == []
    assert len(send_payloads) == 1
    packet = send_payloads[0]
    assert getattr(packet, "type", "") == "chat_message"
    assert packet.system is True
    assert packet.message == "Server reboot already in progress."