fix(auth): resolve admin groups via keycloak admin API when token lacks groups

This commit is contained in:
Chris
2026-04-03 00:28:32 +08:00
parent fd55d90a44
commit 81085e1844

View File

@@ -26,6 +26,10 @@ class KeycloakTokenVerifier:
base_url: str | None,
userinfo_endpoint: str | None,
verify_tls: bool,
realm: str | None,
admin_realm: str | None,
admin_client_id: str | None,
admin_client_secret: str | None,
) -> None:
self.issuer = issuer.strip() if issuer else None
self.jwks_url = jwks_url.strip() if jwks_url else self._infer_jwks_url(self.issuer)
@@ -33,6 +37,10 @@ class KeycloakTokenVerifier:
self.client_id = client_id.strip() if client_id else None
self.client_secret = client_secret.strip() if client_secret else None
self.base_url = base_url.strip() if base_url else None
self.realm = realm.strip() if realm else None
self.admin_realm = admin_realm.strip() if admin_realm else self.realm
self.admin_client_id = admin_client_id.strip() if admin_client_id else None
self.admin_client_secret = admin_client_secret.strip() if admin_client_secret else None
self.userinfo_endpoint = (
userinfo_endpoint.strip() if userinfo_endpoint else self._infer_userinfo_endpoint(self.issuer, self.base_url)
)
@@ -95,7 +103,7 @@ class KeycloakTokenVerifier:
if principal.email and (principal.name or principal.preferred_username) and principal.groups:
return principal
if not self.userinfo_endpoint:
return principal
return self._enrich_groups_from_admin(principal)
try:
resp = httpx.get(
@@ -105,15 +113,15 @@ class KeycloakTokenVerifier:
headers={"Authorization": f"Bearer {token}", "Accept": "application/json"},
)
except Exception:
return principal
return self._enrich_groups_from_admin(principal)
if resp.status_code >= 400:
return principal
return self._enrich_groups_from_admin(principal)
data = resp.json() if resp.content else {}
sub = data.get("sub")
if isinstance(sub, str) and sub and sub != principal.sub:
return principal
return self._enrich_groups_from_admin(principal)
email = principal.email or (data.get("email") if isinstance(data.get("email"), str) else None)
name = principal.name or (data.get("name") if isinstance(data.get("name"), str) else None)
@@ -124,13 +132,85 @@ class KeycloakTokenVerifier:
payload_groups = data.get("groups")
if isinstance(payload_groups, list):
groups = [str(g) for g in payload_groups if str(g)]
return KeycloakPrincipal(
enriched = KeycloakPrincipal(
sub=principal.sub,
email=email,
name=name,
preferred_username=preferred_username,
groups=groups,
)
return self._enrich_groups_from_admin(enriched)
def _get_admin_token(self) -> str | None:
if (
not self.base_url
or not self.admin_realm
or not self.admin_client_id
or not self.admin_client_secret
):
return None
token_endpoint = f"{self.base_url}/realms/{self.admin_realm}/protocol/openid-connect/token"
try:
resp = httpx.post(
token_endpoint,
data={
"grant_type": "client_credentials",
"client_id": self.admin_client_id,
"client_secret": self.admin_client_secret,
},
timeout=6,
verify=self.verify_tls,
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
except Exception:
return None
if resp.status_code >= 400:
return None
token = resp.json().get("access_token")
return str(token) if token else None
def _enrich_groups_from_admin(self, principal: KeycloakPrincipal) -> KeycloakPrincipal:
if principal.groups:
return principal
if not self.base_url or not self.realm:
return principal
admin_token = self._get_admin_token()
if not admin_token:
return principal
try:
resp = httpx.get(
f"{self.base_url}/admin/realms/{self.realm}/users/{principal.sub}/groups",
timeout=6,
verify=self.verify_tls,
headers={"Authorization": f"Bearer {admin_token}", "Accept": "application/json"},
)
except Exception:
return principal
if resp.status_code >= 400:
return principal
payload = resp.json() if resp.content else []
groups: list[str] = []
if isinstance(payload, list):
for item in payload:
if not isinstance(item, dict):
continue
path = item.get("path")
name = item.get("name")
if isinstance(path, str) and path:
groups.append(path)
elif isinstance(name, str) and name:
groups.append(name)
if not groups:
return principal
return KeycloakPrincipal(
sub=principal.sub,
email=principal.email,
name=principal.name,
preferred_username=principal.preferred_username,
groups=groups,
)
def verify_access_token(self, token: str) -> KeycloakPrincipal:
try:
@@ -197,6 +277,10 @@ def _get_verifier() -> KeycloakTokenVerifier:
base_url=settings.idp_base_url,
userinfo_endpoint=settings.idp_userinfo_endpoint,
verify_tls=settings.idp_verify_tls,
realm=settings.keycloak_realm,
admin_realm=settings.keycloak_admin_realm,
admin_client_id=settings.keycloak_admin_client_id,
admin_client_secret=settings.keycloak_admin_client_secret,
)