from fastapi import Depends, HTTPException, status
from app.core.config import get_settings
from app.schemas.auth import ProviderPrincipal
from app.security.idp_jwt import require_authenticated_principal
def _normalize_roles(values: set[str]) -> set[str]:
    """Return *values* as a canonical role set: stripped, lower-cased, blanks dropped.

    Both the configured admin roles and a principal's roles go through this
    helper, so the later membership test is whitespace- and case-insensitive
    and an entry that is empty after stripping can never match anything.
    """
    candidates = (raw.strip().lower() for raw in values)
    return {role for role in candidates if role}
def require_admin_principal(
    principal: ProviderPrincipal = Depends(require_authenticated_principal),
) -> ProviderPrincipal:
    """FastAPI dependency that admits only principals holding an admin realm role.

    The accepted roles come from ``settings.admin_required_realm_roles``. Both
    that list and ``principal.realm_roles`` are normalised (stripped,
    lower-cased, blanks dropped) before comparison, so the match is
    case-insensitive. Holding *any one* of the configured roles is enough;
    they are alternatives, not a conjunction.

    Returns:
        The authenticated principal, unchanged, once the role check passes.

    Raises:
        HTTPException: 503 ``admin_policy_not_configured`` when the configured
            role list is empty after normalisation -- the guard fails closed
            instead of treating "nothing configured" as "nothing required";
            403 ``admin_forbidden`` when the principal holds none of the roles.
    """
    allowed = _normalize_roles(set(get_settings().admin_required_realm_roles))
    if not allowed:
        # An empty admin policy must never degrade into an open endpoint.
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="admin_policy_not_configured",
        )

    held = _normalize_roles(set(principal.realm_roles))
    if allowed.isdisjoint(held):
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="admin_forbidden")
    return principal