Switch access control from groups to realm roles
This commit is contained in:
@@ -5,33 +5,27 @@ from app.schemas.auth import ProviderPrincipal
|
||||
from app.security.idp_jwt import require_authenticated_principal
|
||||
|
||||
|
||||
def _expand_group_aliases(groups: set[str]) -> set[str]:
|
||||
expanded: set[str] = set()
|
||||
for group in groups:
|
||||
value = group.strip().lower()
|
||||
if not value:
|
||||
continue
|
||||
expanded.add(value)
|
||||
stripped = value.lstrip("/")
|
||||
if stripped:
|
||||
expanded.add(stripped)
|
||||
if "/" in stripped:
|
||||
expanded.add(stripped.rsplit("/", 1)[-1])
|
||||
return expanded
|
||||
def _normalize_roles(values: set[str]) -> set[str]:
|
||||
normalized: set[str] = set()
|
||||
for value in values:
|
||||
role = value.strip().lower()
|
||||
if role:
|
||||
normalized.add(role)
|
||||
return normalized
|
||||
|
||||
|
||||
def require_admin_principal(
|
||||
principal: ProviderPrincipal = Depends(require_authenticated_principal),
|
||||
) -> ProviderPrincipal:
|
||||
settings = get_settings()
|
||||
required_groups = _expand_group_aliases(set(settings.admin_required_groups))
|
||||
required_roles = _normalize_roles(set(settings.admin_required_realm_roles))
|
||||
|
||||
if not required_groups:
|
||||
if not required_roles:
|
||||
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="admin_policy_not_configured")
|
||||
|
||||
principal_groups = _expand_group_aliases(set(principal.groups))
|
||||
group_ok = bool(required_groups.intersection(principal_groups))
|
||||
principal_roles = _normalize_roles(set(principal.realm_roles))
|
||||
role_ok = bool(required_roles.intersection(principal_roles))
|
||||
|
||||
if not group_ok:
|
||||
if not role_ok:
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="admin_forbidden")
|
||||
return principal
|
||||
|
||||
@@ -31,6 +31,7 @@ class ProviderTokenVerifier:
|
||||
admin_realm: str | None,
|
||||
admin_client_id: str | None,
|
||||
admin_client_secret: str | None,
|
||||
member_required_realm_roles: list[str],
|
||||
) -> None:
|
||||
self.issuer = issuer.strip() if issuer else None
|
||||
self.jwks_url = jwks_url.strip() if jwks_url else self._infer_jwks_url(self.issuer)
|
||||
@@ -63,6 +64,7 @@ class ProviderTokenVerifier:
|
||||
self._admin_token_cached: str | None = None
|
||||
self._admin_token_expires_at: float = 0
|
||||
self._principal_cache: dict[str, tuple[float, ProviderPrincipal]] = {}
|
||||
self.member_required_realm_roles = {r.strip().lower() for r in member_required_realm_roles if r and r.strip()}
|
||||
|
||||
@staticmethod
|
||||
def _infer_introspection_endpoint(issuer: str | None) -> str | None:
|
||||
@@ -151,6 +153,7 @@ class ProviderTokenVerifier:
|
||||
name=name,
|
||||
preferred_username=preferred_username,
|
||||
groups=groups,
|
||||
realm_roles=principal.realm_roles,
|
||||
)
|
||||
return self._enrich_groups_from_admin(enriched)
|
||||
|
||||
@@ -233,8 +236,16 @@ class ProviderTokenVerifier:
|
||||
name=principal.name,
|
||||
preferred_username=principal.preferred_username,
|
||||
groups=groups,
|
||||
realm_roles=principal.realm_roles,
|
||||
)
|
||||
|
||||
def _require_member_role(self, principal: ProviderPrincipal) -> None:
|
||||
if not self.member_required_realm_roles:
|
||||
return
|
||||
user_roles = {r.strip().lower() for r in principal.realm_roles if isinstance(r, str) and r.strip()}
|
||||
if not user_roles.intersection(self.member_required_realm_roles):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="member_forbidden")
|
||||
|
||||
def verify_access_token(self, token: str) -> ProviderPrincipal:
|
||||
now = time.time()
|
||||
cached = self._principal_cache.get(token)
|
||||
@@ -289,8 +300,18 @@ class ProviderTokenVerifier:
|
||||
name=claims.get("name"),
|
||||
preferred_username=claims.get("preferred_username"),
|
||||
groups=[str(g) for g in claims.get("groups", []) if str(g)] if isinstance(claims.get("groups"), list) else [],
|
||||
realm_roles=[
|
||||
str(r)
|
||||
for r in (
|
||||
claims.get("realm_access", {}).get("roles", [])
|
||||
if isinstance(claims.get("realm_access"), dict)
|
||||
else []
|
||||
)
|
||||
if str(r)
|
||||
],
|
||||
)
|
||||
enriched = self._enrich_from_userinfo(principal, token)
|
||||
self._require_member_role(enriched)
|
||||
|
||||
exp = claims.get("exp")
|
||||
if isinstance(exp, int):
|
||||
@@ -322,6 +343,7 @@ def _get_verifier() -> ProviderTokenVerifier:
|
||||
admin_realm=settings.keycloak_admin_realm,
|
||||
admin_client_id=settings.keycloak_admin_client_id,
|
||||
admin_client_secret=settings.keycloak_admin_client_secret,
|
||||
member_required_realm_roles=settings.member_required_realm_roles,
|
||||
)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user