Files
chat_grid/server/app/server.py

1349 lines
58 KiB
Python

"""Websocket signaling server for chat, presence, and item interactions."""
from __future__ import annotations
import argparse
import asyncio
from datetime import datetime
from importlib.metadata import PackageNotFoundError, version as package_version
import json
import logging
import os
import re
import ssl
import time
import uuid
from pathlib import Path
from typing import Literal
from zoneinfo import ZoneInfo
from pydantic import ValidationError, TypeAdapter
from websockets.asyncio.server import ServerConnection, serve
from .client import ClientConnection
from .config import load_config
from .item_catalog import (
CLOCK_DEFAULT_TIME_ZONE,
CLOCK_TIME_ZONE_OPTIONS,
ITEM_PROPERTY_OPTIONS,
ITEM_TYPE_EDITABLE_PROPERTIES,
ITEM_TYPE_LABELS,
ITEM_TYPE_PROPERTY_METADATA,
ITEM_TYPE_SEQUENCE,
ITEM_TYPE_TOOLTIPS,
get_item_global_properties,
get_item_use_cooldown_ms,
)
from .item_type_handlers import get_item_type_handler
from .item_service import ItemService
from .models import (
BroadcastChatMessagePacket,
BroadcastNicknamePacket,
BroadcastPositionPacket,
ChatMessagePacket,
ClientPacket,
ForwardSignalPacket,
ItemActionResultPacket,
ItemAddPacket,
ItemDeletePacket,
ItemDropPacket,
ItemPianoNoteBroadcastPacket,
ItemPianoNotePacket,
ItemPianoRecordingPacket,
ItemPickupPacket,
ItemRemovePacket,
ItemUpdatePacket,
ItemUpsertPacket,
ItemUsePacket,
ItemUseSoundPacket,
NicknameResultPacket,
PingPacket,
PongPacket,
RemoteUser,
UpdateNicknamePacket,
UpdatePositionPacket,
UserLeftPacket,
WelcomePacket,
WorldItem,
)
LOGGER = logging.getLogger("chgrid.server")
PACKET_LOGGER = logging.getLogger("chgrid.server.packet")
CLIENT_PACKET_ADAPTER = TypeAdapter(ClientPacket)
MAX_ACTIVE_PIANO_KEYS_PER_CLIENT = 12
PIANO_RECORDING_MAX_MS = 30_000
PIANO_RECORDING_MAX_EVENTS = 4096
STATE_SAVE_DEBOUNCE_MS = 200
STATE_SAVE_MAX_DELAY_MS = 1000
class SignalingServer:
"""Coordinates websocket clients, signaling, and authoritative item actions."""
def __init__(
self,
host: str,
port: int,
ssl_cert: str | None,
ssl_key: str | None,
max_message_size: int = 2_000_000,
state_file: Path | None = None,
grid_size: int = 41,
):
"""Initialize runtime state, TLS context, and item service."""
self.host = host
self.port = port
self.max_message_size = max_message_size
self._ssl_context = self._build_ssl_context(ssl_cert, ssl_key)
self.clients: dict[ServerConnection, ClientConnection] = {}
self.item_service = ItemService(state_file=state_file)
self.item_last_use_ms: dict[str, int] = {}
self.active_piano_keys_by_client: dict[str, set[str]] = {}
self.piano_recording_state_by_item: dict[str, dict] = {}
self.piano_playback_tasks_by_item: dict[str, asyncio.Task[None]] = {}
self.grid_size = max(1, grid_size)
self.instance_id = str(uuid.uuid4())
self.server_version = self._resolve_server_version()
self._pending_state_save_handle: asyncio.TimerHandle | None = None
self._pending_state_save_started_at: float | None = None
@staticmethod
def _resolve_server_version() -> str:
"""Resolve serverInfo version, preferring synced web version when available."""
env_override = os.getenv("CHGRID_SERVER_VERSION", "").strip()
if env_override:
return env_override
try:
version_file = Path(__file__).resolve().parents[2] / "client" / "public" / "version.js"
text = version_file.read_text(encoding="utf-8")
match = re.search(r'CHGRID_WEB_VERSION\s*=\s*"([^"]+)"', text)
if match:
token = match.group(1).strip()
if token:
return token
except OSError:
pass
try:
return package_version("chgrid-server")
except PackageNotFoundError:
return "unknown"
@property
def items(self) -> dict[str, WorldItem]:
"""Expose current item map owned by the item service."""
return self.item_service.items
def _nickname_key(self, nickname: str) -> str:
"""Normalize nickname for case-insensitive comparisons."""
return nickname.casefold()
def _flush_state_save(self) -> None:
"""Immediately flush pending state persistence and clear debounce state."""
if self._pending_state_save_handle is not None:
self._pending_state_save_handle.cancel()
self._pending_state_save_handle = None
self._pending_state_save_started_at = None
self.item_service.save_state()
def _request_state_save(self) -> None:
"""Debounce/coalesce item-state persistence to reduce write churn."""
loop = asyncio.get_running_loop()
now = loop.time()
if self._pending_state_save_started_at is None:
self._pending_state_save_started_at = now
elapsed_ms = int((now - self._pending_state_save_started_at) * 1000)
if elapsed_ms >= STATE_SAVE_MAX_DELAY_MS:
self._flush_state_save()
return
if self._pending_state_save_handle is not None:
self._pending_state_save_handle.cancel()
remaining_ms = max(0, STATE_SAVE_MAX_DELAY_MS - elapsed_ms)
delay_ms = min(STATE_SAVE_DEBOUNCE_MS, remaining_ms)
self._pending_state_save_handle = loop.call_later(delay_ms / 1000, self._flush_state_save)
def _is_nickname_taken(self, nickname: str, exclude_client_id: str | None = None) -> bool:
"""Check whether nickname is already used by another active client."""
wanted = self._nickname_key(nickname)
for other in self.clients.values():
if exclude_client_id is not None and other.id == exclude_client_id:
continue
if self._nickname_key(other.nickname) == wanted:
return True
return False
@staticmethod
def _item_type_label(item: WorldItem) -> str:
"""Return user-facing item type wording for chat/status strings."""
return "radio" if item.type == "radio_station" else item.type
@staticmethod
def _resolve_item_use_sound(item: WorldItem) -> str | None:
"""Resolve one-shot use sound, preferring per-item param override."""
param_sound = item.params.get("useSound")
if isinstance(param_sound, str):
token = param_sound.strip()
if token:
return token
return None
if isinstance(item.useSound, str) and item.useSound.strip():
return item.useSound.strip()
return None
def _get_client_by_id(self, client_id: str) -> ClientConnection | None:
"""Resolve one connected client by id."""
for connected in self.clients.values():
if connected.id == client_id:
return connected
return None
def _get_piano_source_position(self, item: WorldItem) -> tuple[int, int]:
"""Resolve world position used for piano note spatial broadcasts."""
if item.carrierId:
carrier = self._get_client_by_id(item.carrierId)
if carrier is not None:
return carrier.x, carrier.y
return item.x, item.y
async def _broadcast_item_piano_note(
self,
item: WorldItem,
*,
sender_id: str,
key_id: str,
midi: int,
on: bool,
instrument_override: str | None = None,
voice_mode_override: str | None = None,
attack_override: int | None = None,
decay_override: int | None = None,
release_override: int | None = None,
brightness_override: int | None = None,
emit_range_override: int | None = None,
exclude: ServerConnection | None = None,
) -> None:
"""Broadcast one piano note event using current item synth settings."""
instrument = (instrument_override if isinstance(instrument_override, str) else str(item.params.get("instrument", "piano"))).strip().lower()
voice_mode = (voice_mode_override if isinstance(voice_mode_override, str) else str(item.params.get("voiceMode", "poly"))).strip().lower()
if voice_mode not in {"poly", "mono"}:
voice_mode = "poly"
octave = int(item.params.get("octave", 0)) if isinstance(item.params.get("octave", 0), (int, float)) else 0
attack = (
int(attack_override)
if isinstance(attack_override, int)
else int(item.params.get("attack", 15))
if isinstance(item.params.get("attack", 15), (int, float))
else 15
)
decay = (
int(decay_override)
if isinstance(decay_override, int)
else int(item.params.get("decay", 45))
if isinstance(item.params.get("decay", 45), (int, float))
else 45
)
release = (
int(release_override)
if isinstance(release_override, int)
else int(item.params.get("release", 35))
if isinstance(item.params.get("release", 35), (int, float))
else 35
)
brightness = (
int(brightness_override)
if isinstance(brightness_override, int)
else int(item.params.get("brightness", 55))
if isinstance(item.params.get("brightness", 55), (int, float))
else 55
)
emit_range = (
int(emit_range_override)
if isinstance(emit_range_override, int)
else int(item.params.get("emitRange", 15))
if isinstance(item.params.get("emitRange", 15), (int, float))
else 15
)
source_x, source_y = self._get_piano_source_position(item)
await self._broadcast(
ItemPianoNoteBroadcastPacket(
type="item_piano_note",
itemId=item.id,
senderId=sender_id,
keyId=key_id,
midi=max(0, min(127, int(midi))),
on=on,
instrument=instrument,
voiceMode=voice_mode,
octave=max(-2, min(2, octave)),
attack=max(0, min(100, attack)),
decay=max(0, min(100, decay)),
release=max(0, min(100, release)),
brightness=max(0, min(100, brightness)),
x=source_x,
y=source_y,
emitRange=max(5, min(20, emit_range)),
),
exclude=exclude,
)
def _cancel_piano_playback(self, item_id: str) -> None:
"""Cancel active playback task for one piano item, if any."""
task = self.piano_playback_tasks_by_item.pop(item_id, None)
if task is not None and not task.done():
task.cancel()
@staticmethod
def _recording_elapsed_ms(session: dict, now_monotonic: float | None = None) -> int:
"""Compute effective recorded duration, including currently active segment."""
elapsed_ms = int(session.get("elapsedMs", 0)) if isinstance(session.get("elapsedMs"), (int, float)) else 0
paused = session.get("paused") is True
if paused:
return max(0, elapsed_ms)
last_resume = session.get("lastResumeMonotonic")
if isinstance(last_resume, (int, float)):
now_value = now_monotonic if isinstance(now_monotonic, (int, float)) else time.monotonic()
elapsed_ms += max(0, int((now_value - float(last_resume)) * 1000))
return max(0, elapsed_ms)
async def _finalize_piano_recording(self, item_id: str, *, status_message: str | None = None) -> None:
"""Persist and broadcast one active recording session, then clear runtime state."""
session = self.piano_recording_state_by_item.pop(item_id, None)
if not session:
return
auto_stop_task = session.get("autoStopTask")
if isinstance(auto_stop_task, asyncio.Task) and not auto_stop_task.done():
auto_stop_task.cancel()
item = self.items.get(item_id)
if not item or item.type != "piano":
return
elapsed_ms = max(0, min(PIANO_RECORDING_MAX_MS, self._recording_elapsed_ms(session)))
recorded_events = session.get("events")
events = list(recorded_events) if isinstance(recorded_events, list) else []
song_id = f"item:{item.id}:recording"
keys: list[str] = []
key_to_index: dict[str, int] = {}
states: list[list[object]] = []
state_to_index: dict[tuple[object, ...], int] = {}
compact_events: list[list[int]] = []
for event in events:
if not isinstance(event, dict):
continue
t = int(event.get("t", 0)) if isinstance(event.get("t"), (int, float)) else 0
key_id = str(event.get("keyId", "")).strip()
midi = int(event.get("midi", 0)) if isinstance(event.get("midi"), (int, float)) else 0
on = 1 if event.get("on") is True else 0
instrument = str(event.get("instrument", "piano")).strip().lower() or "piano"
voice_mode = str(event.get("voiceMode", "poly")).strip().lower()
if voice_mode not in {"mono", "poly"}:
voice_mode = "poly"
attack = int(event.get("attack", 15)) if isinstance(event.get("attack"), (int, float)) else 15
decay = int(event.get("decay", 45)) if isinstance(event.get("decay"), (int, float)) else 45
release = int(event.get("release", 35)) if isinstance(event.get("release"), (int, float)) else 35
brightness = int(event.get("brightness", 55)) if isinstance(event.get("brightness"), (int, float)) else 55
emit_range = int(event.get("emitRange", 15)) if isinstance(event.get("emitRange"), (int, float)) else 15
state_key = (
instrument,
voice_mode,
max(0, min(100, attack)),
max(0, min(100, decay)),
max(0, min(100, release)),
max(0, min(100, brightness)),
max(5, min(20, emit_range)),
)
if not key_id:
continue
index = key_to_index.get(key_id)
if index is None:
index = len(keys)
keys.append(key_id)
key_to_index[key_id] = index
state_index = state_to_index.get(state_key)
if state_index is None:
state_index = len(states)
states.append(list(state_key))
state_to_index[state_key] = state_index
compact_events.append([max(0, min(PIANO_RECORDING_MAX_MS, t)), index, max(0, min(127, midi)), on, state_index])
compact_events.sort(key=lambda row: row[0])
first_state = states[0] if states else ["piano", "poly", 15, 45, 35, 55, 15]
self.item_service.piano_songs[song_id] = {
"meta": {
"instrument": first_state[0],
"voiceMode": first_state[1],
"attack": first_state[2],
"decay": first_state[3],
"release": first_state[4],
"brightness": first_state[5],
"emitRange": first_state[6],
"recordingLengthMs": elapsed_ms,
},
"keys": keys,
"states": states,
"events": compact_events,
}
self.item_service.save_piano_songs()
item.params["songId"] = song_id
item.params.pop("recording", None)
item.params.pop("recordingLengthMs", None)
item.updatedAt = self.item_service.now_ms()
item.version += 1
self._request_state_save()
await self._broadcast_item(item)
owner_id = str(session.get("ownerClientId", ""))
owner = self._get_client_by_id(owner_id) if owner_id else None
if owner and status_message:
await self._send_item_result(owner, True, "use", status_message, item.id)
async def _auto_stop_piano_recording(self, item_id: str) -> None:
"""Stop a recording automatically at the max recording duration."""
try:
while True:
session = self.piano_recording_state_by_item.get(item_id)
if not isinstance(session, dict):
return
if self._recording_elapsed_ms(session) >= PIANO_RECORDING_MAX_MS:
await self._finalize_piano_recording(item_id, status_message="stop")
return
await asyncio.sleep(0.25)
except asyncio.CancelledError:
return
async def _start_piano_playback(self, item: WorldItem) -> None:
"""Run one piano recording playback task and broadcast note events."""
sender_id = f"item:{item.id}:playback"
events: list[dict[str, object]] = []
song_id = str(item.params.get("songId", "")).strip()
song_payload = self.item_service.piano_songs.get(song_id) if song_id else None
if isinstance(song_payload, dict):
keys = song_payload.get("keys")
states = song_payload.get("states")
compact_events = song_payload.get("events")
meta = song_payload.get("meta")
if isinstance(keys, list) and isinstance(compact_events, list):
base_state = None
if isinstance(meta, dict):
instrument = str(meta.get("instrument", "")).strip().lower() or "piano"
raw_voice_mode = str(meta.get("voiceMode", "")).strip().lower()
voice_mode = raw_voice_mode if raw_voice_mode in {"mono", "poly"} else "poly"
attack = int(meta.get("attack", 15)) if isinstance(meta.get("attack"), (int, float)) else 15
decay = int(meta.get("decay", 45)) if isinstance(meta.get("decay"), (int, float)) else 45
release = int(meta.get("release", 35)) if isinstance(meta.get("release"), (int, float)) else 35
brightness = int(meta.get("brightness", 55)) if isinstance(meta.get("brightness"), (int, float)) else 55
emit_range = int(meta.get("emitRange", 15)) if isinstance(meta.get("emitRange"), (int, float)) else 15
base_state = (
instrument,
voice_mode,
max(0, min(100, attack)),
max(0, min(100, decay)),
max(0, min(100, release)),
max(0, min(100, brightness)),
max(5, min(20, emit_range)),
)
for row in compact_events:
if not isinstance(row, list) or len(row) < 4:
continue
raw_time, raw_key_idx, raw_midi, raw_on = row[:4]
if not isinstance(raw_time, (int, float)) or not isinstance(raw_key_idx, (int, float)) or not isinstance(raw_midi, (int, float)):
continue
key_idx = int(raw_key_idx)
if key_idx < 0 or key_idx >= len(keys):
continue
raw_key = keys[key_idx]
if not isinstance(raw_key, str) or not raw_key.strip():
continue
state = base_state
if len(row) >= 5 and isinstance(states, list) and isinstance(row[4], (int, float)):
state_idx = int(row[4])
if 0 <= state_idx < len(states):
state_row = states[state_idx]
if isinstance(state_row, list) and len(state_row) >= 7:
candidate_instrument = str(state_row[0]).strip().lower() or "piano"
candidate_voice_mode = str(state_row[1]).strip().lower()
state = (
candidate_instrument,
candidate_voice_mode if candidate_voice_mode in {"mono", "poly"} else "poly",
max(0, min(100, int(state_row[2]) if isinstance(state_row[2], (int, float)) else 15)),
max(0, min(100, int(state_row[3]) if isinstance(state_row[3], (int, float)) else 45)),
max(0, min(100, int(state_row[4]) if isinstance(state_row[4], (int, float)) else 35)),
max(0, min(100, int(state_row[5]) if isinstance(state_row[5], (int, float)) else 55)),
max(5, min(20, int(state_row[6]) if isinstance(state_row[6], (int, float)) else 15)),
)
if state is None:
continue
events.append(
{
"t": max(0, min(PIANO_RECORDING_MAX_MS, int(raw_time))),
"keyId": raw_key[:32],
"midi": max(0, min(127, int(raw_midi))),
"on": bool(raw_on),
"instrument": state[0],
"voiceMode": state[1],
"attack": state[2],
"decay": state[3],
"release": state[4],
"brightness": state[5],
"emitRange": state[6],
}
)
events.sort(key=lambda entry: int(entry["t"]))
if not events:
return
active_keys: dict[str, int] = {}
previous_at_ms = 0
try:
for event in events:
current_at_ms = int(event["t"])
delay_ms = max(0, current_at_ms - previous_at_ms)
if delay_ms > 0:
await asyncio.sleep(delay_ms / 1000)
current_item = self.items.get(item.id)
if not current_item or current_item.type != "piano":
break
key_id = str(event["keyId"])
midi = int(event["midi"])
on = bool(event["on"])
if on:
active_keys[key_id] = midi
else:
active_keys.pop(key_id, None)
await self._broadcast_item_piano_note(
current_item,
sender_id=sender_id,
key_id=key_id,
midi=midi,
on=on,
instrument_override=event.get("instrument") if isinstance(event.get("instrument"), str) else None,
voice_mode_override=event.get("voiceMode") if isinstance(event.get("voiceMode"), str) else None,
attack_override=event.get("attack") if isinstance(event.get("attack"), int) else None,
decay_override=event.get("decay") if isinstance(event.get("decay"), int) else None,
release_override=event.get("release") if isinstance(event.get("release"), int) else None,
brightness_override=event.get("brightness") if isinstance(event.get("brightness"), int) else None,
emit_range_override=event.get("emitRange") if isinstance(event.get("emitRange"), int) else None,
)
previous_at_ms = current_at_ms
except asyncio.CancelledError:
pass
finally:
current_item = self.items.get(item.id)
if current_item and current_item.type == "piano":
for key_id, midi in list(active_keys.items()):
await self._broadcast_item_piano_note(
current_item,
sender_id=sender_id,
key_id=key_id,
midi=midi,
on=False,
)
current_task = self.piano_playback_tasks_by_item.get(item.id)
if current_task is asyncio.current_task():
self.piano_playback_tasks_by_item.pop(item.id, None)
def _is_in_bounds(self, x: int, y: int) -> bool:
"""Check whether a coordinate is inside server-authoritative world bounds."""
return 0 <= x < self.grid_size and 0 <= y < self.grid_size
@staticmethod
def _normalize_clock_timezone(value: object) -> str:
"""Normalize timezone input to one of supported clock zones."""
token = str(value or "").strip()
if token in CLOCK_TIME_ZONE_OPTIONS:
return token
return CLOCK_DEFAULT_TIME_ZONE
@staticmethod
def _parse_clock_use_24_hour(value: object) -> bool | None:
"""Parse bool-like clock format values (`on/off`, `true/false`, etc.)."""
if isinstance(value, bool):
return value
if isinstance(value, (int, float)):
return bool(value)
if isinstance(value, str):
token = value.strip().lower()
if token in {"on", "true", "1", "yes"}:
return True
if token in {"off", "false", "0", "no"}:
return False
return None
@classmethod
def _format_clock_display_time(cls, params: dict) -> str:
"""Render current clock text based on item timezone/format params."""
tz_name = cls._normalize_clock_timezone(params.get("timeZone"))
use_24_hour = cls._parse_clock_use_24_hour(params.get("use24Hour"))
if use_24_hour is None:
use_24_hour = False
now = datetime.now(ZoneInfo(tz_name))
if use_24_hour:
return now.strftime("%H:%M")
hour_12 = now.hour % 12 or 12
return f"{hour_12}:{now.minute:02d} {'AM' if now.hour < 12 else 'PM'}"
async def _send_item_result(
self,
client: ClientConnection,
ok: bool,
action: Literal["add", "pickup", "drop", "delete", "use", "update"],
message: str,
item_id: str | None = None,
) -> None:
"""Send a structured item action result to one client."""
await self._send(
client.websocket,
ItemActionResultPacket(
type="item_action_result",
ok=ok,
action=action,
message=message,
itemId=item_id,
),
)
async def _broadcast_item(self, item: WorldItem) -> None:
"""Broadcast a full item snapshot update to all connected clients."""
await self._broadcast(ItemUpsertPacket(type="item_upsert", item=item))
async def start(self) -> None:
"""Start websocket serving and run until cancelled."""
protocol = "wss" if self._ssl_context else "ws"
LOGGER.info("starting signaling server on %s://%s:%d", protocol, self.host, self.port)
try:
async with serve(
self._handle_client,
self.host,
self.port,
ssl=self._ssl_context,
max_size=self.max_message_size,
):
await asyncio.Future()
finally:
self._flush_state_save()
async def _handle_client(self, websocket: ServerConnection) -> None:
"""Handle one websocket client's connect/message/disconnect lifecycle."""
client = ClientConnection(websocket=websocket, id=str(uuid.uuid4()))
self.clients[websocket] = client
LOGGER.info("client connected id=%s total=%d", client.id, len(self.clients))
try:
await self._send_welcome(client)
async for raw_message in websocket:
await self._handle_message(client, raw_message)
finally:
if websocket in self.clients:
disconnected = self.clients.pop(websocket)
self.active_piano_keys_by_client.pop(disconnected.id, None)
for item_id, session in list(self.piano_recording_state_by_item.items()):
if session.get("ownerClientId") != disconnected.id:
continue
await self._finalize_piano_recording(item_id)
for item in self.item_service.drop_carried_items_for_disconnect(disconnected):
await self._broadcast_item(item)
self._request_state_save()
LOGGER.info(
"client disconnected id=%s nickname=%s total=%d",
disconnected.id,
disconnected.nickname,
len(self.clients),
)
await self._broadcast(UserLeftPacket(type="user_left", id=disconnected.id), exclude=websocket)
await self._broadcast(
BroadcastChatMessagePacket(
type="chat_message",
message=f"{disconnected.nickname} has logged out.",
system=True,
),
exclude=websocket,
)
async def _send_welcome(self, client: ClientConnection) -> None:
"""Send initial world snapshot to a newly connected client."""
users = [
RemoteUser(id=other.id, nickname=other.nickname, x=other.x, y=other.y)
for ws, other in self.clients.items()
if ws is not client.websocket
]
packet = WelcomePacket(
type="welcome",
id=client.id,
users=users,
items=[item.model_dump(exclude_none=True) for item in self.items.values()],
worldConfig={"gridSize": self.grid_size},
uiDefinitions=self._build_ui_definitions(),
serverInfo={"instanceId": self.instance_id, "version": self.server_version},
)
await self._send(client.websocket, packet)
def _build_ui_definitions(self) -> dict:
"""Build server-owned UI definitions for item/menu rendering."""
item_types: list[dict] = []
for item_type in ITEM_TYPE_SEQUENCE:
editable = list(ITEM_TYPE_EDITABLE_PROPERTIES.get(item_type, ("title",)))
property_options: dict[str, list[str]] = {}
for key in editable:
options = ITEM_PROPERTY_OPTIONS.get(key)
if options:
property_options[key] = list(options)
item_types.append(
{
"type": item_type,
"label": ITEM_TYPE_LABELS.get(item_type, item_type),
"tooltip": ITEM_TYPE_TOOLTIPS.get(item_type),
"editableProperties": editable,
"propertyOptions": property_options,
"propertyMetadata": ITEM_TYPE_PROPERTY_METADATA.get(item_type, {}),
"globalProperties": get_item_global_properties(item_type),
}
)
return {
"itemTypeOrder": list(ITEM_TYPE_SEQUENCE),
"itemTypes": item_types,
}
async def _broadcast_wheel_result_after_delay(
self,
client: ClientConnection,
self_message: str,
others_message: str,
delay_seconds: float = 3.0,
) -> None:
"""Delay then publish wheel result text to self and other users."""
await asyncio.sleep(delay_seconds)
await self._broadcast(
BroadcastChatMessagePacket(type="chat_message", message=others_message, system=True),
exclude=client.websocket,
)
if client.websocket in self.clients:
await self._send(
client.websocket,
BroadcastChatMessagePacket(type="chat_message", message=self_message, system=True),
)
async def _handle_message(self, client: ClientConnection, raw_message: str) -> None:
"""Decode, validate, and route one inbound client packet."""
try:
payload = json.loads(raw_message)
except json.JSONDecodeError:
PACKET_LOGGER.warning("non-json packet from id=%s", client.id)
return
try:
packet = CLIENT_PACKET_ADAPTER.validate_python(payload)
except ValidationError as exc:
PACKET_LOGGER.warning("invalid packet from id=%s: %s", client.id, exc)
return
if isinstance(packet, UpdatePositionPacket):
if not self._is_in_bounds(packet.x, packet.y):
PACKET_LOGGER.warning(
"out-of-bounds position ignored id=%s x=%d y=%d grid_size=%d",
client.id,
packet.x,
packet.y,
self.grid_size,
)
return
client.x = packet.x
client.y = packet.y
await self._broadcast(
BroadcastPositionPacket(type="update_position", id=client.id, x=client.x, y=client.y),
exclude=client.websocket,
)
carried = self.item_service.find_carried_item(client.id)
if carried:
carried.x = client.x
carried.y = client.y
carried.updatedAt = self.item_service.now_ms()
await self._broadcast_item(carried)
return
if isinstance(packet, UpdateNicknamePacket):
requested_nickname = packet.nickname.strip()
if not requested_nickname:
await self._send(
client.websocket,
NicknameResultPacket(
type="nickname_result",
accepted=False,
requestedNickname=packet.nickname,
effectiveNickname=client.nickname,
reason="Nickname is required.",
),
)
return
old_nickname = client.nickname
if self._is_nickname_taken(requested_nickname, exclude_client_id=client.id):
await self._send(
client.websocket,
NicknameResultPacket(
type="nickname_result",
accepted=False,
requestedNickname=requested_nickname,
effectiveNickname=client.nickname,
reason="Nickname already in use.",
),
)
return
if requested_nickname == old_nickname:
await self._send(
client.websocket,
NicknameResultPacket(
type="nickname_result",
accepted=True,
requestedNickname=requested_nickname,
effectiveNickname=client.nickname,
),
)
return
client.nickname = requested_nickname
if old_nickname == "user...":
LOGGER.info("user login id=%s nickname=%s", client.id, client.nickname)
else:
LOGGER.info("nickname change id=%s old=%s new=%s", client.id, old_nickname, client.nickname)
await self._send(
client.websocket,
NicknameResultPacket(
type="nickname_result",
accepted=True,
requestedNickname=requested_nickname,
effectiveNickname=client.nickname,
),
)
await self._broadcast(
BroadcastNicknamePacket(type="update_nickname", id=client.id, nickname=client.nickname),
exclude=client.websocket,
)
if old_nickname == "user...":
await self._broadcast(
BroadcastChatMessagePacket(
type="chat_message",
message=f"{client.nickname} has logged in.",
system=True,
),
exclude=client.websocket,
)
else:
await self._broadcast(
BroadcastChatMessagePacket(
type="chat_message",
message=f"{old_nickname} is now known as {client.nickname}.",
system=True,
),
exclude=client.websocket,
)
self_message = (
f"Welcome. Logged in as {client.nickname}."
if old_nickname == "user..."
else f"You are now known as {client.nickname}."
)
await self._send(
client.websocket,
BroadcastChatMessagePacket(
type="chat_message",
message=self_message,
system=True,
),
)
return
if isinstance(packet, ChatMessagePacket):
await self._broadcast(
BroadcastChatMessagePacket(
type="chat_message",
message=packet.message,
senderId=client.id,
senderNickname=client.nickname,
system=False,
)
)
return
if isinstance(packet, PingPacket):
await self._send(
client.websocket,
PongPacket(type="pong", clientSentAt=packet.clientSentAt),
)
return
if isinstance(packet, ItemAddPacket):
item = self.item_service.default_item(client, packet.itemType)
self.item_service.add_item(item)
await self._broadcast_item(item)
self._request_state_save()
LOGGER.info(
"item created by=%s item_id=%s type=%s title=%s x=%d y=%d",
client.nickname,
item.id,
item.type,
item.title,
item.x,
item.y,
)
item_text = f"{item.title} ({self._item_type_label(item)})"
await self._broadcast(
BroadcastChatMessagePacket(
type="chat_message",
message=f"{client.nickname} placed {item_text} at {item.x}, {item.y}.",
system=True,
),
exclude=client.websocket,
)
await self._send_item_result(
client,
True,
"add",
f"You placed {item_text} at {item.x}, {item.y}.",
item.id,
)
return
if isinstance(packet, ItemPickupPacket):
item = self.items.get(packet.itemId)
if not item:
await self._send_item_result(client, False, "pickup", "Item not found.")
return
if item.carrierId and item.carrierId != client.id:
await self._send_item_result(client, False, "pickup", "Item is already being carried.", item.id)
return
carried = self.item_service.find_carried_item(client.id)
if carried and carried.id != item.id:
await self._send_item_result(client, False, "pickup", "You are already carrying an item.", item.id)
return
if item.carrierId is None and (item.x != client.x or item.y != client.y):
await self._send_item_result(client, False, "pickup", "Item is not on your square.", item.id)
return
item.carrierId = client.id
item.x = client.x
item.y = client.y
item.updatedAt = self.item_service.now_ms()
await self._broadcast_item(item)
self._request_state_save()
await self._send_item_result(client, True, "pickup", f"Picked up {item.title}.", item.id)
return
if isinstance(packet, ItemDropPacket):
item = self.items.get(packet.itemId)
if not item:
await self._send_item_result(client, False, "drop", "Item not found.")
return
if item.carrierId != client.id:
await self._send_item_result(client, False, "drop", "You are not carrying that item.", item.id)
return
if not self._is_in_bounds(packet.x, packet.y):
await self._send_item_result(client, False, "drop", "Drop position is out of bounds.", item.id)
return
item.carrierId = None
item.x = packet.x
item.y = packet.y
item.updatedAt = self.item_service.now_ms()
await self._broadcast_item(item)
self._request_state_save()
await self._send_item_result(client, True, "drop", f"Dropped {item.title}.", item.id)
return
if isinstance(packet, ItemDeletePacket):
item = self.items.get(packet.itemId)
if not item:
await self._send_item_result(client, False, "delete", "Item not found.")
return
if item.carrierId and item.carrierId != client.id:
await self._send_item_result(client, False, "delete", "Item is being carried by another user.", item.id)
return
if item.carrierId is None and (item.x != client.x or item.y != client.y):
await self._send_item_result(client, False, "delete", "Item is not on your square.", item.id)
return
LOGGER.info(
"item deleted by=%s item_id=%s type=%s title=%s",
client.nickname,
item.id,
item.type,
item.title,
)
self._cancel_piano_playback(item.id)
recording_state = self.piano_recording_state_by_item.pop(item.id, None)
if recording_state is not None:
auto_stop_task = recording_state.get("autoStopTask")
if isinstance(auto_stop_task, asyncio.Task) and not auto_stop_task.done():
auto_stop_task.cancel()
song_id = str(item.params.get("songId", "")).strip()
if song_id and song_id in self.item_service.piano_songs:
self.item_service.piano_songs.pop(song_id, None)
self.item_service.save_piano_songs()
self.item_service.remove_item(item.id)
self.item_last_use_ms.pop(item.id, None)
await self._broadcast(ItemRemovePacket(type="item_remove", itemId=item.id))
self._request_state_save()
await self._send_item_result(client, True, "delete", f"Deleted {item.title}.", item.id)
return
if isinstance(packet, ItemUsePacket):
item = self.items.get(packet.itemId)
if not item:
await self._send_item_result(client, False, "use", "Item not found.")
return
if item.carrierId not in (None, client.id):
await self._send_item_result(client, False, "use", "Item is not available.", item.id)
return
if item.carrierId is None and (item.x != client.x or item.y != client.y):
await self._send_item_result(client, False, "use", "Item is not on your square.", item.id)
return
handler = get_item_type_handler(item.type)
now_ms = self.item_service.now_ms()
cooldown_ms = get_item_use_cooldown_ms(item.type)
last_use_ms = self.item_last_use_ms.get(item.id)
if last_use_ms is not None and now_ms - last_use_ms < cooldown_ms:
remaining_ms = cooldown_ms - (now_ms - last_use_ms)
remaining_seconds = max(0.1, round(remaining_ms / 1000, 1))
await self._send_item_result(
client,
False,
"use",
f"{item.title} is on cooldown for {remaining_seconds:.1f} s.",
item.id,
)
return
try:
use_result = handler.use(item, client.nickname, self._format_clock_display_time)
except ValueError as exc:
await self._send_item_result(client, False, "use", str(exc), item.id)
return
if use_result.updated_params is not None:
try:
item.params = handler.validate_update(item, {**item.params, **use_result.updated_params})
except ValueError as exc:
await self._send_item_result(client, False, "use", str(exc), item.id)
return
item.updatedAt = now_ms
self._request_state_save()
await self._broadcast_item(item)
self.item_last_use_ms[item.id] = now_ms
await self._broadcast(
BroadcastChatMessagePacket(type="chat_message", message=use_result.others_message, system=True),
exclude=client.websocket,
)
use_sound = self._resolve_item_use_sound(item)
if use_sound:
await self._broadcast(
ItemUseSoundPacket(
type="item_use_sound",
itemId=item.id,
sound=use_sound,
x=item.x,
y=item.y,
)
)
await self._send_item_result(client, True, "use", use_result.self_message, item.id)
if use_result.delayed_self_message is not None and use_result.delayed_others_message is not None:
asyncio.create_task(
self._broadcast_wheel_result_after_delay(
client=client,
self_message=use_result.delayed_self_message,
others_message=use_result.delayed_others_message,
)
)
return
if isinstance(packet, ItemPianoNotePacket):
item = self.items.get(packet.itemId)
if not item or item.type != "piano":
return
if item.carrierId not in (None, client.id):
return
if item.carrierId is None and (item.x != client.x or item.y != client.y):
return
active_keys = self.active_piano_keys_by_client.setdefault(client.id, set())
if packet.on:
if packet.keyId not in active_keys and len(active_keys) >= MAX_ACTIVE_PIANO_KEYS_PER_CLIENT:
return
active_keys.add(packet.keyId)
else:
active_keys.discard(packet.keyId)
recording_state = self.piano_recording_state_by_item.get(item.id)
if recording_state and recording_state.get("ownerClientId") == client.id and recording_state.get("paused") is not True:
elapsed_ms = max(0, min(PIANO_RECORDING_MAX_MS, self._recording_elapsed_ms(recording_state)))
events = recording_state.get("events")
if isinstance(events, list) and len(events) < PIANO_RECORDING_MAX_EVENTS:
instrument = str(item.params.get("instrument", "piano")).strip().lower()
voice_mode = str(item.params.get("voiceMode", "poly")).strip().lower()
if voice_mode not in {"poly", "mono"}:
voice_mode = "poly"
attack = int(item.params.get("attack", 15)) if isinstance(item.params.get("attack", 15), (int, float)) else 15
decay = int(item.params.get("decay", 45)) if isinstance(item.params.get("decay", 45), (int, float)) else 45
release = int(item.params.get("release", 35)) if isinstance(item.params.get("release", 35), (int, float)) else 35
brightness = int(item.params.get("brightness", 55)) if isinstance(item.params.get("brightness", 55), (int, float)) else 55
emit_range = int(item.params.get("emitRange", 15)) if isinstance(item.params.get("emitRange", 15), (int, float)) else 15
events.append(
{
"t": elapsed_ms,
"keyId": packet.keyId[:32],
"midi": packet.midi,
"on": packet.on,
"instrument": instrument,
"voiceMode": voice_mode,
"attack": max(0, min(100, attack)),
"decay": max(0, min(100, decay)),
"release": max(0, min(100, release)),
"brightness": max(0, min(100, brightness)),
"emitRange": max(5, min(20, emit_range)),
}
)
if elapsed_ms >= PIANO_RECORDING_MAX_MS:
await self._finalize_piano_recording(item.id, status_message="stop")
await self._broadcast_item_piano_note(
item,
sender_id=client.id,
key_id=packet.keyId,
midi=packet.midi,
on=packet.on,
exclude=client.websocket,
)
return
if isinstance(packet, ItemPianoRecordingPacket):
item = self.items.get(packet.itemId)
if not item or item.type != "piano":
await self._send_item_result(client, False, "use", "Piano not found.")
return
if item.carrierId not in (None, client.id):
await self._send_item_result(client, False, "use", "Piano is not available.", item.id)
return
if item.carrierId is None and (item.x != client.x or item.y != client.y):
await self._send_item_result(client, False, "use", "Piano is not on your square.", item.id)
return
if packet.action == "toggle_record":
existing = self.piano_recording_state_by_item.get(item.id)
if existing and existing.get("ownerClientId") != client.id:
await self._send_item_result(client, False, "use", "This piano is already recording.", item.id)
return
if existing and existing.get("ownerClientId") == client.id:
if existing.get("paused") is True:
existing["paused"] = False
existing["lastResumeMonotonic"] = time.monotonic()
await self._send_item_result(client, True, "use", "resume", item.id)
else:
existing["elapsedMs"] = self._recording_elapsed_ms(existing)
existing["paused"] = True
existing.pop("lastResumeMonotonic", None)
await self._send_item_result(client, True, "use", "pause", item.id)
return
self._cancel_piano_playback(item.id)
recording_state = {
"ownerClientId": client.id,
"elapsedMs": 0,
"paused": False,
"lastResumeMonotonic": time.monotonic(),
"events": [],
}
self.piano_recording_state_by_item[item.id] = recording_state
auto_stop_task = asyncio.create_task(self._auto_stop_piano_recording(item.id))
recording_state["autoStopTask"] = auto_stop_task
await self._send_item_result(client, True, "use", "record", item.id)
return
if packet.action == "stop_record":
existing = self.piano_recording_state_by_item.get(item.id)
if existing and existing.get("ownerClientId") != client.id:
await self._send_item_result(client, False, "use", "This piano is already recording.", item.id)
return
if existing and existing.get("ownerClientId") == client.id:
await self._finalize_piano_recording(item.id, status_message="stop")
return
await self._send_item_result(client, True, "use", "stop", item.id)
return
if packet.action == "playback":
if item.id in self.piano_recording_state_by_item:
await self._send_item_result(client, False, "use", "Stop recording before playback.", item.id)
return
song_id = str(item.params.get("songId", "")).strip()
has_song = isinstance(self.item_service.piano_songs.get(song_id), dict) if song_id else False
if not has_song:
await self._send_item_result(client, False, "use", "No recording saved on this piano.", item.id)
return
self._cancel_piano_playback(item.id)
playback_task = asyncio.create_task(self._start_piano_playback(item))
self.piano_playback_tasks_by_item[item.id] = playback_task
await self._send_item_result(client, True, "use", "play", item.id)
return
if packet.action == "stop_playback":
self._cancel_piano_playback(item.id)
await self._send_item_result(client, True, "use", "stop", item.id)
return
return
if isinstance(packet, ItemUpdatePacket):
item = self.items.get(packet.itemId)
if not item:
await self._send_item_result(client, False, "update", "Item not found.")
return
if item.carrierId not in (None, client.id):
await self._send_item_result(client, False, "update", "Item is not available for editing.", item.id)
return
if item.carrierId is None and (item.x != client.x or item.y != client.y):
await self._send_item_result(client, False, "update", "Item is not on your square.", item.id)
return
if packet.title is not None:
title = packet.title.strip()
if not title:
await self._send_item_result(client, False, "update", "Title cannot be empty.", item.id)
return
item.title = title[:80]
if packet.params:
next_params = {**item.params, **packet.params}
handler = get_item_type_handler(item.type)
try:
next_params = handler.validate_update(item, next_params)
except ValueError as exc:
await self._send_item_result(client, False, "update", str(exc), item.id)
return
item.params = next_params
item.updatedAt = self.item_service.now_ms()
item.version += 1
await self._broadcast_item(item)
self._request_state_save()
await self._send_item_result(client, True, "update", f"Updated {item.title}.", item.id)
return
target = self._find_by_id(packet.targetId)
if not target:
PACKET_LOGGER.info("signal target not found sender=%s target=%s", client.id, packet.targetId)
return
await self._send(
target.websocket,
ForwardSignalPacket(
type="signal",
senderId=client.id,
senderNickname=client.nickname,
x=client.x,
y=client.y,
sdp=packet.sdp,
ice=packet.ice,
),
)
async def _broadcast(self, packet: object, exclude: ServerConnection | None = None) -> None:
"""Broadcast one packet to all clients except an optional websocket."""
recipients = [websocket for websocket in self.clients if websocket is not exclude]
if not recipients:
return
await asyncio.gather(*(self._send(websocket, packet) for websocket in recipients))
async def _send(self, websocket: ServerConnection, packet: object) -> None:
"""Send one packet to one websocket, swallowing per-client send failures."""
try:
if hasattr(packet, "model_dump"):
data = packet.model_dump(exclude_none=True)
else:
data = packet
await websocket.send(json.dumps(data))
except Exception as exc: # intentionally broad to keep server alive per client error
LOGGER.debug("send failure: %s", exc)
def _find_by_id(self, client_id: str) -> ClientConnection | None:
"""Resolve a client id to an active connection."""
for client in self.clients.values():
if client.id == client_id:
return client
return None
@staticmethod
def _build_ssl_context(cert: str | None, key: str | None) -> ssl.SSLContext | None:
"""Create TLS server context when cert/key are configured."""
if not cert or not key:
return None
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
context.load_cert_chain(certfile=Path(cert), keyfile=Path(key))
return context
def run() -> None:
"""CLI entrypoint for running the signaling server process."""
parser = argparse.ArgumentParser(description="chgrid signaling server")
parser.add_argument("--config", default="config.toml")
parser.add_argument("--host", default=None)
parser.add_argument("--port", type=int, default=None)
parser.add_argument("--ssl-cert", default=None)
parser.add_argument("--ssl-key", default=None)
parser.add_argument("--allow-insecure-ws", action="store_true", default=None)
args = parser.parse_args()
config_path = Path(args.config) if args.config else None
if config_path and not config_path.exists() and args.config == "config.toml":
config_path = None
config = load_config(config_path)
host = args.host or config.server.bind_ip
port = args.port or config.server.port
allow_insecure_ws = config.network.allow_insecure_ws
if args.allow_insecure_ws is True:
allow_insecure_ws = True
ssl_cert = args.ssl_cert if args.ssl_cert is not None else config.tls.cert_file or None
ssl_key = args.ssl_key if args.ssl_key is not None else config.tls.key_file or None
state_file_value = config.storage.state_file.strip()
state_file: Path | None = None
if state_file_value:
base_dir = config_path.parent if config_path is not None else Path.cwd()
state_file = Path(state_file_value)
if not state_file.is_absolute():
state_file = base_dir / state_file
if not allow_insecure_ws and (not ssl_cert or not ssl_key):
raise SystemExit(
"TLS is required when insecure ws is disabled. Set tls.cert_file/tls.key_file in config.toml."
)
logging.basicConfig(
level=getattr(logging, config.logging.level.upper(), logging.INFO),
format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
server = SignalingServer(
host,
port,
ssl_cert,
ssl_key,
max_message_size=config.network.max_message_bytes,
state_file=state_file,
grid_size=config.world.grid_size,
)
asyncio.run(server.start())