71 lines
2.5 KiB
Python
71 lines
2.5 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import asdict
|
|
|
|
from fastapi import HTTPException, status
|
|
|
|
from app.domain.permissions import build_permission_context
|
|
from app.repositories.directus.client import DirectusClient
|
|
from app.schemas.auth import AuthenticatedUser, PermissionContextRead
|
|
|
|
|
|
class AuthContextService:
|
|
"""Validates Directus-issued tokens and normalizes user context.
|
|
|
|
FastAPI remains the formal API entrypoint, but Directus is still the source
|
|
of truth for login/session/role data. This service is the seam between them.
|
|
"""
|
|
|
|
# Keep the first auth bootstrap intentionally conservative.
|
|
# Directus `/users/me` on this project reliably returns the core user/role
|
|
# fields below, while deeper `user_group.*` expansions can collapse the
|
|
# payload back to only `{id}`. We can layer group/domain permissions back in
|
|
# later once the base admin flow is verified end to end.
|
|
me_fields = [
|
|
"email",
|
|
"first_name",
|
|
"id",
|
|
"role.id",
|
|
"role.name",
|
|
"status",
|
|
]
|
|
|
|
def __init__(self, client: DirectusClient | None = None) -> None:
|
|
self.client = client or DirectusClient()
|
|
|
|
async def get_authenticated_user(self, access_token: str) -> AuthenticatedUser:
|
|
if not access_token:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Missing access token.",
|
|
)
|
|
|
|
me = await self.client.get_current_user(
|
|
access_token=access_token,
|
|
fields=self.me_fields,
|
|
)
|
|
if not me:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Unable to resolve current user from Directus.",
|
|
)
|
|
|
|
user_group = me.get("user_group") or {}
|
|
permissions = user_group.get("domain_permissions") or []
|
|
normalized_permissions = [str(permission) for permission in permissions]
|
|
role = me.get("role") or {}
|
|
permission_context = build_permission_context(
|
|
role_name=role.get("name"),
|
|
domain_permissions=normalized_permissions,
|
|
)
|
|
|
|
# Keep the normalized payload explicit so downstream services do not need
|
|
# to know the exact nested Directus response shape.
|
|
return AuthenticatedUser.model_validate(
|
|
{
|
|
**me,
|
|
"domain_permissions": normalized_permissions,
|
|
"permissions": PermissionContextRead.model_validate(asdict(permission_context)),
|
|
}
|
|
)
|