Add admin delete-account flow with yes/no confirmation

This commit is contained in:
Jage9
2026-02-28 20:06:43 -05:00
parent b0fa040d33
commit 906c320e51
12 changed files with 239 additions and 6 deletions

View File

@@ -438,6 +438,35 @@ class AuthService:
self._db_commit()
return normalized_username
def delete_user(self, target_username: str, *, actor_user_id: str | None = None) -> str:
"""Delete one account and related session/state rows."""
normalized_username = self._normalize_username(target_username)
user_row = self._db_fetchone(
"""
SELECT u.id, u.status, r.name AS role_name
FROM users u
JOIN roles r ON r.id = u.role_id
WHERE u.username = ?
""",
(normalized_username,),
)
if user_row is None:
raise AuthError("User not found.")
user_id = int(user_row["id"])
current_status = str(user_row["status"])
current_role = str(user_row["role_name"])
if current_role == "admin" and current_status == "active" and self._active_admin_count() <= 1:
raise AuthError("Cannot delete the last active admin.")
if actor_user_id is not None and str(user_id) == str(actor_user_id):
if current_role == "admin" and current_status == "active" and self._active_admin_count() <= 1:
raise AuthError("Cannot delete your own account while you are the last active admin.")
self._db_execute("DELETE FROM sessions WHERE user_id = ?", (user_id,))
self._db_execute("DELETE FROM user_state WHERE user_id = ?", (user_id,))
self._db_execute("DELETE FROM users WHERE id = ?", (user_id,))
self._db_commit()
return normalized_username
def get_user_by_id(self, user_id: str) -> AuthUser | None:
"""Return one user by id with current role and permissions."""

View File

@@ -86,7 +86,7 @@ class AdminRoleDeletePacket(BasePacket):
class AdminUsersListPacket(BasePacket):
type: Literal["admin_users_list"]
action: Literal["set_role", "ban", "unban"] | None = None
action: Literal["set_role", "ban", "unban", "delete_account"] | None = None
class AdminUserSetRolePacket(BasePacket):
@@ -105,6 +105,11 @@ class AdminUserUnbanPacket(BasePacket):
username: str = Field(min_length=1, max_length=128)
class AdminUserDeletePacket(BasePacket):
type: Literal["admin_user_delete"]
username: str = Field(min_length=1, max_length=128)
class PingPacket(BasePacket):
type: Literal["ping"]
clientSentAt: int
@@ -187,6 +192,7 @@ ClientPacket = (
| AdminUserSetRolePacket
| AdminUserBanPacket
| AdminUserUnbanPacket
| AdminUserDeletePacket
| PingPacket
| ItemAddPacket
| ItemPickupPacket
@@ -449,5 +455,6 @@ class AdminActionResultPacket(BasePacket):
"user_set_role",
"user_ban",
"user_unban",
"user_delete",
]
message: str

View File

@@ -63,6 +63,7 @@ from .models import (
AdminRolesListPacket,
AdminRolesListResultPacket,
AdminUserBanPacket,
AdminUserDeletePacket,
AdminUserSetRolePacket,
AdminUserUnbanPacket,
AdminUsersListPacket,
@@ -133,6 +134,7 @@ ADMIN_MENU_ACTION_DEFINITIONS: tuple[dict[str, str], ...] = (
{"id": "change_user_role", "label": "Change user role", "permission": "user.change_role"},
{"id": "ban_user", "label": "Ban user", "permission": "user.ban_unban"},
{"id": "unban_user", "label": "Unban user", "permission": "user.ban_unban"},
{"id": "delete_account", "label": "Delete account", "permission": "account.delete.any"},
)
@@ -1671,6 +1673,7 @@ class SignalingServer:
"user_set_role",
"user_ban",
"user_unban",
"user_delete",
],
message: str,
) -> None:
@@ -1831,6 +1834,7 @@ class SignalingServer:
AdminUserSetRolePacket,
AdminUserBanPacket,
AdminUserUnbanPacket,
AdminUserDeletePacket,
),
):
return False
@@ -1861,6 +1865,7 @@ class SignalingServer:
if not (
self._client_has_permission(client, "user.change_role")
or self._client_has_permission(client, "user.ban_unban")
or self._client_has_permission(client, "account.delete.any")
):
await deny("user_set_role", "Not authorized.")
return True
@@ -2012,6 +2017,34 @@ class SignalingServer:
)
return True
if isinstance(packet, AdminUserDeletePacket):
if not self._client_has_permission(client, "account.delete.any"):
await deny("user_delete", "Not authorized.")
return True
target_id = self.auth_service.get_user_id_by_username(packet.username)
try:
username = self.auth_service.delete_user(packet.username, actor_user_id=client.user_id)
except AuthError as exc:
await deny("user_delete", str(exc))
return True
if target_id:
for active in list(self.clients.values()):
if active.user_id != target_id:
continue
await self._send(
active.websocket,
AuthResultPacket(type="auth_result", ok=False, message="Account deleted."),
)
await active.websocket.close()
LOGGER.info("user deleted actor=%s target=%s", client.user_id, username)
await self._send_admin_action_result(
client,
ok=True,
action="user_delete",
message=f"Deleted account {username}.",
)
return True
return True
async def _handle_message(self, client: ClientConnection, raw_message: str) -> None: