"""User context helpers for flask-more-smorest.
This module exposes runtime user lookup helpers and role checks.
All configuration is driven through ``init_fms`` in ``user_registry``.
"""
from __future__ import annotations
import uuid
from collections.abc import Callable
from contextlib import suppress
from typing import TYPE_CHECKING, Literal, TypeVar, cast, overload
# Admin role type - constrained to valid admin roles only
AdminRole = Literal["ADMIN", "SUPERADMIN"]
# Role constants - uppercase values
ROLE_ADMIN: AdminRole = "ADMIN"
ROLE_SUPERADMIN: AdminRole = "SUPERADMIN"
if TYPE_CHECKING:
from .models.abstract_user import AbstractUser
GetCurrentUserFunc = Callable[[], AbstractUser | None]
else: # pragma: no cover - runtime placeholder
AbstractUser = object # type: ignore[assignment]
GetCurrentUserFunc = Callable[[], object | None] # type: ignore[assignment]
UserT = TypeVar("UserT", bound="AbstractUser")
@overload
def get_current_user() -> AbstractUser | None: ...
@overload
def get_current_user(user_type: type[UserT]) -> UserT | None: ...
[docs]
def get_current_user(user_type: type[UserT] | None = None) -> UserT | AbstractUser | None:
"""Get the current authenticated user.
Resolution order:
1. Registered custom getter (via init_fms)
2. Default: JWT-based authentication (built-in)
Args:
user_type: Optional user class for typed return.
If None, returns AbstractUser | None.
Returns:
Current user instance if authenticated, None otherwise
"""
from .user_registry import get_current_user_func
get_user_func = get_current_user_func()
if get_user_func is not None:
user = get_user_func()
else:
# Fall back to built-in JWT authentication
from flask_jwt_extended import current_user as jwt_current_user
from flask_jwt_extended import exceptions, verify_jwt_in_request
try:
verify_jwt_in_request()
except exceptions.JWTExtendedException:
return None
except Exception:
return None
try:
return cast("AbstractUser | None", jwt_current_user._get_current_object())
except (AttributeError, RuntimeError):
return None
if user_type is not None:
if user is None:
return None
if not isinstance(user, user_type):
return None
# Explicit cast to help mypy's incremental mode understand the narrowing
return cast(UserT, user) # type: ignore[redundant-cast]
return user
[docs]
def get_current_user_id() -> uuid.UUID | None:
"""Get the current authenticated user's ID."""
user = get_current_user()
if user is None:
return None
user_id = getattr(user, "id", None)
if user_id is None:
return None
# Handle Mapped[UUID] by extracting the value
with suppress(Exception):
from sqlalchemy.orm.attributes import InstrumentedAttribute
if isinstance(user_id, InstrumentedAttribute):
return cast(uuid.UUID, user_id.property.class_.impl.type.python_type(user_id))
return cast(uuid.UUID, user_id)
[docs]
def is_current_user_admin() -> bool:
"""Check if the current user is an admin."""
user = get_current_user()
if user is None:
return False
return bool(user.has_role(ROLE_ADMIN) or user.has_role(ROLE_SUPERADMIN))
[docs]
def is_current_user_superadmin() -> bool:
"""Check if the current user is a superadmin."""
user = get_current_user()
if user is None:
return False
return bool(user.has_role(ROLE_SUPERADMIN))