"""Tests for ``SignalingServer``: message handling, radio metadata refresh, and broadcast fan-out."""

from __future__ import annotations

import asyncio
import json
from time import monotonic
from typing import cast
import uuid

import pytest
from websockets.asyncio.server import ServerConnection

from app.client import ClientConnection
from app.server import SignalingServer


def _fake_ws() -> ServerConnection:
    """Return a fresh bare ``object()`` cast to ``ServerConnection`` as a stand-in socket."""
    return cast(ServerConnection, object())


def _packet_types(payloads: list[object]) -> list[str]:
    """Return each payload's ``type`` attribute, or ``""`` when it has none."""
    return [getattr(packet, "type", "") for packet in payloads]


def _packets_of_type(payloads: list[object], packet_type: str) -> list[object]:
    """Return the payloads whose ``type`` attribute equals ``packet_type``, in order."""
    return [packet for packet in payloads if getattr(packet, "type", "") == packet_type]


@pytest.mark.asyncio
async def test_update_position_rejects_out_of_bounds(monkeypatch: pytest.MonkeyPatch) -> None:
    """An ``update_position`` to (200, -5) on a 41-cell grid moves nothing and broadcasts nothing."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="tester", x=5, y=6)
    server.clients[ws] = client
    broadcast_payloads: list[object] = []

    async def fake_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        broadcast_payloads.append(packet)

    monkeypatch.setattr(server, "_broadcast", fake_broadcast)

    await server._handle_message(client, json.dumps({"type": "update_position", "x": 200, "y": -5}))

    assert client.x == 5
    assert client.y == 6
    assert broadcast_payloads == []


@pytest.mark.asyncio
async def test_radio_metadata_refresh_updates_station_and_title(monkeypatch: pytest.MonkeyPatch) -> None:
    """A metadata refresh copies the fetched station name and title into the radio's params."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="tester", x=10, y=10)
    server.clients[ws] = client
    radio = server.item_service.default_item(client, "radio_station")
    radio.params["streamUrl"] = "http://example.com/stream"
    radio.params["enabled"] = True
    radio.params["emitRange"] = 10
    radio.params["stationName"] = ""
    radio.params["nowPlaying"] = ""
    server.item_service.add_item(radio)

    async def fake_broadcast_item(item: object) -> None:
        return None

    def fake_fetch(url: str) -> tuple[str, str]:
        # The refresh must query exactly the URL configured on the item.
        assert url == "http://example.com/stream"
        return ("Test Station", "Test Song")

    monkeypatch.setattr(server, "_broadcast_item", fake_broadcast_item)
    monkeypatch.setattr(server, "_fetch_stream_metadata", fake_fetch)

    await server._refresh_radio_metadata_once()

    assert radio.params["stationName"] == "Test Station"
    assert radio.params["nowPlaying"] == "Test Song"


@pytest.mark.asyncio
async def test_radio_metadata_refresh_skips_when_no_listener_in_range(monkeypatch: pytest.MonkeyPatch) -> None:
    """No metadata fetch happens when the only client (0, 0) is far from the radio (30, 30, range 5)."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="tester", x=0, y=0)
    server.clients[ws] = client
    radio = server.item_service.default_item(client, "radio_station")
    radio.x = 30
    radio.y = 30
    radio.params["streamUrl"] = "http://example.com/stream"
    radio.params["enabled"] = True
    radio.params["emitRange"] = 5
    server.item_service.add_item(radio)
    called = False

    def fake_fetch(url: str) -> tuple[str, str]:
        nonlocal called
        called = True
        return ("X", "Y")

    monkeypatch.setattr(server, "_fetch_stream_metadata", fake_fetch)

    await server._refresh_radio_metadata_once()

    assert called is False


@pytest.mark.asyncio
async def test_item_secondary_use_radio_reports_now_playing(monkeypatch: pytest.MonkeyPatch) -> None:
    """Secondary use on an enabled radio replies with a successful now-playing message."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="tester", x=5, y=5)
    server.clients[ws] = client
    radio = server.item_service.default_item(client, "radio_station")
    radio.x = 5
    radio.y = 5
    radio.params["enabled"] = True
    radio.params["stationName"] = "Station X"
    radio.params["nowPlaying"] = "Song Y"
    server.item_service.add_item(radio)
    send_payloads: list[object] = []

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    async def fake_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        return None

    monkeypatch.setattr(server, "_send", fake_send)
    monkeypatch.setattr(server, "_broadcast", fake_broadcast)

    await server._handle_message(client, json.dumps({"type": "item_secondary_use", "itemId": radio.id}))

    results = _packets_of_type(send_payloads, "item_action_result")
    assert results
    assert results[-1].ok is True
    assert results[-1].action == "secondary_use"
    assert "Playing Song Y from Station X." in results[-1].message


@pytest.mark.asyncio
async def test_item_secondary_use_missing_handler_returns_message(monkeypatch: pytest.MonkeyPatch) -> None:
    """Secondary use on a dice item replies with a failed result mentioning "No secondary action"."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="tester", x=5, y=5)
    server.clients[ws] = client
    dice = server.item_service.default_item(client, "dice")
    dice.x = 5
    dice.y = 5
    server.item_service.add_item(dice)
    send_payloads: list[object] = []

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    monkeypatch.setattr(server, "_send", fake_send)

    await server._handle_message(client, json.dumps({"type": "item_secondary_use", "itemId": dice.id}))

    results = _packets_of_type(send_payloads, "item_action_result")
    assert results
    assert results[-1].ok is False
    assert results[-1].action == "secondary_use"
    assert "No secondary action" in results[-1].message


@pytest.mark.asyncio
async def test_auth_login_uses_hash_offload(monkeypatch: pytest.MonkeyPatch) -> None:
    """A successful ``auth_login`` passes a callable named ``login`` through ``_run_auth_hash_task``."""
    server = SignalingServer("127.0.0.1", 8765, None, None)
    # Random suffix so the registered username differs on every run.
    username = f"alpha_{uuid.uuid4().hex[:8]}"
    server.auth_service.register(username, "password99")
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="tester")
    send_payloads: list[object] = []
    offload_calls: list[str] = []

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    async def fake_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        return None

    async def fake_run_auth_hash_task(func, /, *args, **kwargs):
        # Record which callable was offloaded, then run it inline.
        offload_calls.append(getattr(func, "__name__", "unknown"))
        return func(*args, **kwargs)

    monkeypatch.setattr(server, "_send", fake_send)
    monkeypatch.setattr(server, "_broadcast", fake_broadcast)
    monkeypatch.setattr(server, "_run_auth_hash_task", fake_run_auth_hash_task)

    await server._handle_message(
        client,
        json.dumps({"type": "auth_login", "username": username, "password": "password99"}),
    )

    assert "login" in offload_calls
    auth_results = _packets_of_type(send_payloads, "auth_result")
    assert auth_results
    assert auth_results[-1].ok is True


@pytest.mark.asyncio
async def test_auth_rate_limit_blocks_before_hash(monkeypatch: pytest.MonkeyPatch) -> None:
    """When rate limiting reports True, ``auth_service.login`` is never called and a "too many" failure is sent."""
    server = SignalingServer("127.0.0.1", 8765, None, None)
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="tester")
    send_payloads: list[object] = []
    called_login = False

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    def fake_login(username: str, password: str):  # pragma: no cover - should never run
        nonlocal called_login
        called_login = True
        raise RuntimeError("unexpected login call")

    monkeypatch.setattr(server, "_send", fake_send)
    # Replace the failure jitter with a zero-length sleep so the test does not wait.
    monkeypatch.setattr(server, "_sleep_auth_failure_jitter", lambda: asyncio.sleep(0))
    monkeypatch.setattr(server.auth_service, "login", fake_login)
    monkeypatch.setattr(server, "_is_auth_rate_limited", lambda _client, _packet: True)

    await server._handle_message(
        client,
        json.dumps({"type": "auth_login", "username": "alpha", "password": "wrongpass"}),
    )

    assert called_login is False
    assert send_payloads
    assert send_payloads[-1].ok is False
    assert "too many" in send_payloads[-1].message.lower()


@pytest.mark.asyncio
async def test_item_drop_rejects_out_of_bounds(monkeypatch: pytest.MonkeyPatch) -> None:
    """Dropping a carried item at (999, 999) keeps it carried and replies with an "out of bounds" failure."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="tester", x=5, y=6)
    server.clients[ws] = client
    item = server.item_service.default_item(client, "dice")
    item.carrierId = client.id
    server.item_service.add_item(item)
    send_payloads: list[object] = []

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    monkeypatch.setattr(server, "_send", fake_send)

    await server._handle_message(client, json.dumps({"type": "item_drop", "itemId": item.id, "x": 999, "y": 999}))

    assert item.carrierId == client.id
    # Guard first so a missing reply fails as an assertion, not an IndexError.
    assert send_payloads
    assert send_payloads[-1].ok is False
    assert "out of bounds" in send_payloads[-1].message.lower()


@pytest.mark.asyncio
async def test_broadcast_fanout_is_concurrent(monkeypatch: pytest.MonkeyPatch) -> None:
    """``_broadcast`` starts both sends within 20 ms of each other even though one send sleeps 50 ms."""
    server = SignalingServer("127.0.0.1", 8765, None, None)
    ws1 = _fake_ws()
    ws2 = _fake_ws()
    server.clients[ws1] = ClientConnection(websocket=ws1, id="u1")
    server.clients[ws2] = ClientConnection(websocket=ws2, id="u2")
    send_started_at: dict[ServerConnection, float] = {}

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_started_at[websocket] = monotonic()
        if websocket is ws1:
            # Only ws1 is slow; a sequential fan-out would delay the other start by ~50 ms.
            await asyncio.sleep(0.05)

    monkeypatch.setattr(server, "_send", fake_send)

    await server._broadcast({"type": "noop"})

    assert ws1 in send_started_at
    assert ws2 in send_started_at
    assert abs(send_started_at[ws1] - send_started_at[ws2]) < 0.02


@pytest.mark.asyncio
async def test_item_add_rejects_unknown_type(monkeypatch: pytest.MonkeyPatch) -> None:
    """``item_add`` with item type ``not_a_type`` replies with an "unknown item type" failure."""
    server = SignalingServer("127.0.0.1", 8765, None, None)
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="tester", x=5, y=6)
    server.clients[ws] = client
    send_payloads: list[object] = []

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    monkeypatch.setattr(server, "_send", fake_send)

    await server._handle_message(client, json.dumps({"type": "item_add", "itemType": "not_a_type"}))

    assert send_payloads
    assert send_payloads[-1].ok is False
    assert "unknown item type" in send_payloads[-1].message.lower()


@pytest.mark.asyncio
async def test_update_position_enforces_cumulative_budget_per_tick(monkeypatch: pytest.MonkeyPatch) -> None:
    """With a 2-step budget and a frozen clock, the third 1-step move is neither applied nor broadcast."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    server.movement_tick_ms = 100
    server.movement_max_steps_per_tick = 2
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="tester", x=5, y=5)
    server.clients[ws] = client
    # Freeze the patched clock so every move reads the same timestamp.
    fixed_now = 10_000
    monkeypatch.setattr(server.item_service, "now_ms", lambda: fixed_now)
    broadcast_payloads: list[object] = []

    async def fake_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        broadcast_payloads.append(packet)

    monkeypatch.setattr(server, "_broadcast", fake_broadcast)

    # First 1-step move in this tick: allowed.
    await server._handle_message(client, json.dumps({"type": "update_position", "x": 6, "y": 5}))
    # Second 1-step move in the same tick: allowed (budget now exhausted at 2).
    await server._handle_message(client, json.dumps({"type": "update_position", "x": 7, "y": 5}))
    # Third 1-step move in the same tick: must be rejected.
    await server._handle_message(client, json.dumps({"type": "update_position", "x": 8, "y": 5}))

    assert client.x == 7
    assert client.y == 5
    assert len(broadcast_payloads) == 2


@pytest.mark.asyncio
async def test_teleport_complete_broadcasts_spatial_event(monkeypatch: pytest.MonkeyPatch) -> None:
    """``teleport_complete`` broadcasts an ``update_position`` then a ``teleport_complete``, both for u1 at (12, 13)."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="tester", x=12, y=13)
    server.clients[ws] = client
    broadcast_payloads: list[object] = []

    async def fake_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        broadcast_payloads.append(packet)

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        return None

    monkeypatch.setattr(server, "_broadcast", fake_broadcast)
    monkeypatch.setattr(server, "_send", fake_send)

    await server._handle_message(client, json.dumps({"type": "teleport_complete", "x": 12, "y": 13}))

    assert len(broadcast_payloads) == 2
    assert broadcast_payloads[0].type == "update_position"
    assert broadcast_payloads[0].id == "u1"
    assert broadcast_payloads[0].x == 12
    assert broadcast_payloads[0].y == 13
    assert broadcast_payloads[1].type == "teleport_complete"
    assert broadcast_payloads[1].id == "u1"
    assert broadcast_payloads[1].x == 12
    assert broadcast_payloads[1].y == 13


@pytest.mark.asyncio
async def test_update_position_rate_reject_sends_self_correction(monkeypatch: pytest.MonkeyPatch) -> None:
    """A 2-tile move under a 1-step budget is not applied; the last packet sent is an ``update_position`` at (5, 5)."""
    server = SignalingServer("127.0.0.1", 8765, None, None, grid_size=41)
    ws = _fake_ws()
    client = ClientConnection(websocket=ws, id="u1", nickname="tester", x=5, y=5)
    server.clients[ws] = client
    server.movement_tick_ms = 100
    server.movement_max_steps_per_tick = 1
    fixed_now = 10_000
    monkeypatch.setattr(server.item_service, "now_ms", lambda: fixed_now)
    send_payloads: list[object] = []

    async def fake_send(websocket: ServerConnection, packet: object) -> None:
        send_payloads.append(packet)

    async def fake_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
        return None

    monkeypatch.setattr(server, "_send", fake_send)
    monkeypatch.setattr(server, "_broadcast", fake_broadcast)

    # 2-tile move exceeds per-window budget and should be rejected with correction.
    await server._handle_message(client, json.dumps({"type": "update_position", "x": 7, "y": 5}))

    assert client.x == 5
    assert client.y == 5
    assert send_payloads
    correction = send_payloads[-1]
    assert correction.type == "update_position"
    assert correction.id == "u1"
    assert correction.x == 5
    assert correction.y == 5