Add reboot and version slash commands with permission guard
This commit is contained in:
@@ -497,3 +497,89 @@ async def test_chat_command_requires_leading_slash(monkeypatch: pytest.MonkeyPat
|
||||
assert packet.system is False
|
||||
assert packet.action is False
|
||||
assert packet.message == " /up"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_chat_version_command_is_sender_only(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
server = SignalingServer("127.0.0.1", 8765, None, None)
|
||||
ws = _fake_ws()
|
||||
client = ClientConnection(websocket=ws, id="u1", nickname="Tester")
|
||||
server.clients[ws] = client
|
||||
server.server_version = "2026.02.27 R293"
|
||||
|
||||
broadcast_payloads: list[object] = []
|
||||
send_payloads: list[object] = []
|
||||
|
||||
async def fake_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
|
||||
broadcast_payloads.append(packet)
|
||||
|
||||
async def fake_send(websocket: ServerConnection, packet: object) -> None:
|
||||
send_payloads.append(packet)
|
||||
|
||||
monkeypatch.setattr(server, "_broadcast", fake_broadcast)
|
||||
monkeypatch.setattr(server, "_send", fake_send)
|
||||
|
||||
await server._handle_message(client, json.dumps({"type": "chat_message", "message": "/version"}))
|
||||
|
||||
assert broadcast_payloads == []
|
||||
assert len(send_payloads) == 1
|
||||
packet = send_payloads[0]
|
||||
assert getattr(packet, "type", "") == "chat_message"
|
||||
assert packet.system is True
|
||||
assert packet.message == "Server version: 2026.02.27 R293"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_chat_reboot_requires_permission(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
server = SignalingServer("127.0.0.1", 8765, None, None)
|
||||
ws = _fake_ws()
|
||||
client = ClientConnection(websocket=ws, id="u1", nickname="Tester", authenticated=True, user_id="1", permissions={"chat.send"})
|
||||
server.clients[ws] = client
|
||||
|
||||
send_payloads: list[object] = []
|
||||
|
||||
async def fake_send(websocket: ServerConnection, packet: object) -> None:
|
||||
send_payloads.append(packet)
|
||||
|
||||
monkeypatch.setattr(server, "_send", fake_send)
|
||||
monkeypatch.setattr(server, "_schedule_reboot", lambda _requested_by, _message: True)
|
||||
|
||||
await server._handle_message(client, json.dumps({"type": "chat_message", "message": "/reboot patching"}))
|
||||
|
||||
assert send_payloads
|
||||
packet = send_payloads[-1]
|
||||
assert getattr(packet, "type", "") == "chat_message"
|
||||
assert packet.system is True
|
||||
assert "not authorized" in packet.message.lower()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_chat_reboot_schedules_and_broadcasts_message(monkeypatch: pytest.MonkeyPatch) -> None:
|
||||
server = SignalingServer("127.0.0.1", 8765, None, None)
|
||||
ws = _fake_ws()
|
||||
client = ClientConnection(
|
||||
websocket=ws,
|
||||
id="u1",
|
||||
nickname="Tester",
|
||||
authenticated=True,
|
||||
user_id="1",
|
||||
username="tester",
|
||||
permissions={"chat.send", "server.allow_reboot"},
|
||||
)
|
||||
server.clients[ws] = client
|
||||
|
||||
broadcast_payloads: list[object] = []
|
||||
|
||||
async def fake_broadcast(packet: object, exclude: ServerConnection | None = None) -> None:
|
||||
broadcast_payloads.append(packet)
|
||||
|
||||
monkeypatch.setattr(server, "_broadcast", fake_broadcast)
|
||||
monkeypatch.setattr(server, "_schedule_reboot", lambda requested_by, message: requested_by == "tester" and message == "maintenance")
|
||||
|
||||
await server._handle_message(client, json.dumps({"type": "chat_message", "message": "/reboot maintenance"}))
|
||||
|
||||
assert len(broadcast_payloads) == 1
|
||||
packet = broadcast_payloads[0]
|
||||
assert getattr(packet, "type", "") == "chat_message"
|
||||
assert packet.system is True
|
||||
assert packet.message == "Server rebooting in 5 seconds. maintenance"
|
||||
|
||||
Reference in New Issue
Block a user