# Source code for flask_more_smorest.perms.models.user

"""User model for Flask-More-Smorest authentication system.

Provides User model with email/password auth, roles, settings, and tokens.
"""

from __future__ import annotations

import enum
import logging
import uuid
from typing import TypeVar, cast

from flask_jwt_extended import current_user as jwt_current_user
from flask_jwt_extended import exceptions, verify_jwt_in_request

from ...error.exceptions import UnprocessableEntity
from ...utils import check_password_hash, generate_password_hash
from ..user_context import AdminRole
from .abstract_user import AbstractUser

logger = logging.getLogger(__name__)

UserModelT = TypeVar("UserModelT", bound="User")

# TODO: should probably not have this top-level proxy here and only use get_current_user()
# current_user: LocalProxy[AbstractUser] = cast("LocalProxy[AbstractUser]", jwt_current_user)


def _get_jwt_current_user() -> AbstractUser | None:
    """Return the user attached to the current request's JWT, if any.

    Serves as the default lookup when no custom current-user function has
    been registered. Application code should call get_current_user() from
    user_context rather than this helper.

    Returns:
        The resolved user object, or None when JWT verification fails or the
        proxy cannot be resolved.
    """
    # Step 1: make sure the request carries a valid JWT.
    try:
        verify_jwt_in_request()
    except exceptions.JWTExtendedException:
        # Ordinary "not authenticated" outcome; nothing worth logging.
        return None
    except Exception as err:
        # Anything else is unexpected: record it, then report "no user".
        logger.exception("Error verifying JWT for current user: %s", err)
        return None

    # Step 2: unwrap the LocalProxy so callers receive the real object.
    try:
        user_obj = jwt_current_user._get_current_object()
    except (AttributeError, RuntimeError):
        return None
    return cast("AbstractUser | None", user_obj)


class User(AbstractUser):
    """User model with email/password auth, roles, and domain support.

    This is a concrete implementation of AbstractUser. For customization,
    subclass AbstractUser instead of this class.

    Example:
        .. code-block:: python

            from flask_more_smorest.perms.models import AbstractUser

            class CustomUser(AbstractUser):
                __tablename__ = "user"

                bio: Mapped[str | None] = mapped_column(db.String(500))
                age: Mapped[int | None] = mapped_column(db.Integer)

                def _can_write(self, user) -> bool:
                    if self.age and self.age < 18:
                        return False  # Minors can't edit
                    return super()._can_write(user)

                @property
                def is_adult(self) -> bool:
                    return self.age is not None and self.age >= 18
    """

    __tablename__ = "user"
    __table_args__ = {"extend_existing": True}  # noqa: RUF012

    # When True, _can_create() allows anyone (including anonymous callers)
    # to create a user.
    PUBLIC_REGISTRATION: bool = False

    def __init__(self, **kwargs: object) -> None:
        """Create a new user, hashing the password if one is supplied.

        Args:
            **kwargs: Column values forwarded to the parent constructor. A
                ``password`` entry is removed before forwarding and, when
                truthy, stored via set_password().

        Raises:
            TypeError: If a truthy ``password`` is not a string.
        """
        password = kwargs.pop("password", None)
        super().__init__(**kwargs)
        if password:
            if not isinstance(password, str):
                raise TypeError("Password must be a string")
            self.set_password(password)

    @classmethod
    def get_current_user(cls: type[UserModelT]) -> UserModelT | None:
        """Get the current authenticated user, typed as this User subclass.

        Delegates to ``user_context.get_current_user(cls)``; the result is
        only cast for the type checker, not validated at runtime here.

        Returns:
            Current user instance if authenticated, None otherwise.

        Example:
            >>> user = User.get_current_user()
            >>> user = MyCustomUser.get_current_user()
        """
        # Imported locally -- presumably to avoid an import cycle with
        # user_context; confirm before moving to module level.
        from ..user_context import get_current_user

        return cast("UserModelT | None", get_current_user(cls))

    def normalize_email(self, email: str | None) -> str | None:
        """Normalize an email address to lowercase.

        Lower-casing is intended to give case-insensitive login and lookups
        (user@example.com == USER@EXAMPLE.COM) and to prevent duplicate
        registrations that differ only by case.

        Args:
            email: Email address to normalize.

        Returns:
            Lowercase email address, or None if email is None or empty.
        """
        return email.lower() if email else None

    def set_password(self, password: str) -> None:
        """Hash ``password`` and store the hash on ``self.password``."""
        self.password = generate_password_hash(password)

    def is_password_correct(self, password: str) -> bool:
        """Check whether ``password`` matches the stored hash.

        Returns False when no password is stored or when ``password`` is not
        a string.
        """
        if self.password is None:
            return False
        return isinstance(password, str) and check_password_hash(password=password, hashed=self.password)

    def update(self, commit: bool = True, **kwargs: str | int | float | bool | bytes | None) -> None:
        """Update the user, with special handling for password changes.

        ``password`` and ``old_password`` are removed from ``kwargs`` before
        the remaining fields are passed to the parent ``update``. Unless the
        instance has a truthy ``perms_disabled`` attribute, changing the
        password requires the correct ``old_password``.

        Args:
            commit: Passed to ``self.save`` once all changes are applied.
            **kwargs: Field values to update.

        Raises:
            UnprocessableEntity: If a new password is given without
                ``old_password``, or ``old_password`` is incorrect.
        """
        password = kwargs.pop("password", None)
        old_password = kwargs.pop("old_password", None)

        if password and not getattr(self, "perms_disabled", False):
            if old_password is None:
                raise UnprocessableEntity(
                    fields={"old_password": "Cannot be empty"},
                    message="Must provide old_password to set new password",
                )
            if not self.is_password_correct(str(old_password)):
                raise UnprocessableEntity(
                    message="Old password is incorrect",
                    fields={"old_password": "Old password is incorrect"},
                    location="json",
                )

        # Defer the commit so field updates and the new password hash are
        # saved together by the single save() call below.
        super().update(commit=False, **kwargs)
        if password:
            self.set_password(str(password))
        self.save(commit=commit)

    @property
    def is_admin(self) -> bool:
        """Check if user has admin privileges (admin or superadmin role)."""
        from ..user_context import ROLE_ADMIN, ROLE_SUPERADMIN

        return self.has_role(ROLE_ADMIN) or self.has_role(ROLE_SUPERADMIN)

    @property
    def is_superadmin(self) -> bool:
        """Check if user has superadmin privileges."""
        from ..user_context import ROLE_SUPERADMIN

        return self.has_role(ROLE_SUPERADMIN)

    def has_role(self, role: AdminRole | str | enum.Enum, domain_name: str | None = None) -> bool:
        """Check if user has the specified role, optionally scoped to a domain.

        A role entry matches the domain scope when no ``domain_name`` is
        requested, when the entry has no domain, or when the entry's domain
        name equals ``domain_name`` or ``"*"``.

        Args:
            role: Role to check (string or enum member).
            domain_name: Optional domain name to scope the check.

        Returns:
            True if user has the role, False otherwise.

        Example:
            >>> user.has_role("ADMIN")
            True
            >>> user.has_role("ADMIN", domain_name="main")
            True
        """
        # Normalize to an uppercase string. str() is applied to enum values
        # too, so enums with non-string values do not raise AttributeError.
        raw_role = role.value if isinstance(role, enum.Enum) else role
        role_str = str(raw_role).upper()

        return any(
            r.role == role_str
            and (domain_name is None or r.domain is None or r.domain.name in (domain_name, "*"))
            for r in self.roles
        )

    def list_roles(self) -> list[str]:
        """List user roles as strings."""
        return [r.role for r in self.roles]

    def _can_read(self, user: AbstractUser | None) -> bool:
        """Default read permission: users can read their own profile.

        Args:
            user: The current authenticated user, or None.
        """
        if not user:
            return False
        try:
            return self.id == user.id
        except Exception:
            # Fail closed: any error while comparing ids denies access.
            return False

    def _can_write(self, user: AbstractUser | None) -> bool:
        """Default write permission.

        Users can edit their own profile. Otherwise, an admin target can only
        be edited by a superadmin, and a non-admin target by any admin.

        Args:
            user: The current authenticated user, or None.
        """
        if not user:
            return False
        try:
            if self.id == user.id:
                return True
            if self.is_admin:
                return user.is_superadmin
            return user.is_admin
        except Exception:
            # Fail closed: any error while evaluating roles denies access.
            return False

    def _can_create(self, user: AbstractUser | None) -> bool:
        """Default create permission: admins, or anyone if registration is public.

        Args:
            user: The current authenticated user, or None.
        """
        # Check if public registration is enabled on the class
        if getattr(self.__class__, "PUBLIC_REGISTRATION", False):
            return True
        return user is not None and user.is_admin

    # Concrete methods that use relationships - available to all User models
    @property
    def num_tokens(self) -> int:
        """Get number of tokens for this user."""
        return len(self.tokens)

    @property
    def domain_ids(self) -> set[uuid.UUID | str]:
        """Return the set of domain IDs the user has roles for.

        Roles without a domain id are represented by the wildcard ``"*"``.
        """
        return {r.domain_id or "*" for r in self.roles}

    def has_domain_access(self, domain_id: uuid.UUID | None) -> bool:
        """Check if user has access to a specific domain.

        Access is granted when ``domain_id`` is None, when the user has any
        role on that domain, or when the user has a role with no domain
        (wildcard ``"*"``). Superadmins are not special-cased here; they pass
        only through one of those conditions.

        Args:
            domain_id: Domain UUID to check access for, or None for global access.

        Returns:
            True if user has access to the domain, False otherwise.

        Example:
            >>> user.has_domain_access(domain_id)
            True
            >>> user.has_domain_access(None)  # Global access check
            True
        """
        if domain_id is None:
            return True
        # Evaluate the property once; each access rebuilds the set.
        domain_ids = self.domain_ids
        return domain_id in domain_ids or "*" in domain_ids
# Import uuid for type annotations