feat(security): enforce admin allowlist guard on admin APIs and attach bearer for admin client
This commit is contained in:
26
app/security/admin_guard.py
Normal file
26
app/security/admin_guard.py
Normal file
@@ -0,0 +1,26 @@
|
||||
from fastapi import Depends, HTTPException, status
|
||||
|
||||
from app.core.config import get_settings
|
||||
from app.schemas.auth import AuthentikPrincipal
|
||||
from app.security.authentik_jwt import require_authenticated_principal
|
||||
|
||||
|
||||
def require_admin_principal(
|
||||
principal: AuthentikPrincipal = Depends(require_authenticated_principal),
|
||||
) -> AuthentikPrincipal:
|
||||
settings = get_settings()
|
||||
allowed_emails = {email.lower() for email in settings.admin_allowlist_emails}
|
||||
allowed_subs = set(settings.admin_allowlist_subs)
|
||||
required_groups = {group.lower() for group in settings.admin_required_groups}
|
||||
|
||||
if not allowed_emails and not allowed_subs and not required_groups:
|
||||
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="admin_policy_not_configured")
|
||||
|
||||
email_ok = bool(principal.email and principal.email.lower() in allowed_emails)
|
||||
sub_ok = principal.sub in allowed_subs
|
||||
principal_groups = {group.lower() for group in principal.groups}
|
||||
group_ok = bool(required_groups.intersection(principal_groups))
|
||||
|
||||
if not (email_ok or sub_ok or group_ok):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="admin_forbidden")
|
||||
return principal
|
||||
@@ -63,7 +63,7 @@ class AuthentikTokenVerifier:
|
||||
return None
|
||||
|
||||
def _enrich_from_userinfo(self, principal: AuthentikPrincipal, token: str) -> AuthentikPrincipal:
|
||||
if principal.email and (principal.name or principal.preferred_username):
|
||||
if principal.email and (principal.name or principal.preferred_username) and principal.groups:
|
||||
return principal
|
||||
if not self.userinfo_endpoint:
|
||||
return principal
|
||||
@@ -91,11 +91,16 @@ class AuthentikTokenVerifier:
|
||||
preferred_username = principal.preferred_username or (
|
||||
data.get("preferred_username") if isinstance(data.get("preferred_username"), str) else None
|
||||
)
|
||||
groups = principal.groups
|
||||
payload_groups = data.get("groups")
|
||||
if isinstance(payload_groups, list):
|
||||
groups = [str(g) for g in payload_groups if str(g)]
|
||||
return AuthentikPrincipal(
|
||||
sub=principal.sub,
|
||||
email=email,
|
||||
name=name,
|
||||
preferred_username=preferred_username,
|
||||
groups=groups,
|
||||
)
|
||||
|
||||
def verify_access_token(self, token: str) -> AuthentikPrincipal:
|
||||
@@ -142,6 +147,7 @@ class AuthentikTokenVerifier:
|
||||
email=claims.get("email"),
|
||||
name=claims.get("name"),
|
||||
preferred_username=claims.get("preferred_username"),
|
||||
groups=[str(g) for g in claims.get("groups", []) if str(g)] if isinstance(claims.get("groups"), list) else [],
|
||||
)
|
||||
return self._enrich_from_userinfo(principal, token)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user