diff --git a/app/security/idp_jwt.py b/app/security/idp_jwt.py index d6bf1c5..4aefe8e 100644 --- a/app/security/idp_jwt.py +++ b/app/security/idp_jwt.py @@ -26,6 +26,10 @@ class KeycloakTokenVerifier: base_url: str | None, userinfo_endpoint: str | None, verify_tls: bool, + realm: str | None, + admin_realm: str | None, + admin_client_id: str | None, + admin_client_secret: str | None, ) -> None: self.issuer = issuer.strip() if issuer else None self.jwks_url = jwks_url.strip() if jwks_url else self._infer_jwks_url(self.issuer) @@ -33,6 +37,10 @@ class KeycloakTokenVerifier: self.client_id = client_id.strip() if client_id else None self.client_secret = client_secret.strip() if client_secret else None self.base_url = base_url.strip() if base_url else None + self.realm = realm.strip() if realm else None + self.admin_realm = admin_realm.strip() if admin_realm else self.realm + self.admin_client_id = admin_client_id.strip() if admin_client_id else None + self.admin_client_secret = admin_client_secret.strip() if admin_client_secret else None self.userinfo_endpoint = ( userinfo_endpoint.strip() if userinfo_endpoint else self._infer_userinfo_endpoint(self.issuer, self.base_url) ) @@ -95,7 +103,7 @@ class KeycloakTokenVerifier: if principal.email and (principal.name or principal.preferred_username) and principal.groups: return principal if not self.userinfo_endpoint: - return principal + return self._enrich_groups_from_admin(principal) try: resp = httpx.get( @@ -105,15 +113,15 @@ class KeycloakTokenVerifier: headers={"Authorization": f"Bearer {token}", "Accept": "application/json"}, ) except Exception: - return principal + return self._enrich_groups_from_admin(principal) if resp.status_code >= 400: - return principal + return self._enrich_groups_from_admin(principal) data = resp.json() if resp.content else {} sub = data.get("sub") if isinstance(sub, str) and sub and sub != principal.sub: - return principal + return self._enrich_groups_from_admin(principal) email = principal.email or (data.get("email") if isinstance(data.get("email"), str) else None) name = principal.name or (data.get("name") if isinstance(data.get("name"), str) else None) @@ -124,13 +132,85 @@ class KeycloakTokenVerifier: payload_groups = data.get("groups") if isinstance(payload_groups, list): groups = [str(g) for g in payload_groups if str(g)] - return KeycloakPrincipal( + enriched = KeycloakPrincipal( sub=principal.sub, email=email, name=name, preferred_username=preferred_username, groups=groups, ) + return self._enrich_groups_from_admin(enriched) + + def _get_admin_token(self) -> str | None: + if ( + not self.base_url + or not self.admin_realm + or not self.admin_client_id + or not self.admin_client_secret + ): + return None + token_endpoint = f"{self.base_url}/realms/{self.admin_realm}/protocol/openid-connect/token" + try: + resp = httpx.post( + token_endpoint, + data={ + "grant_type": "client_credentials", + "client_id": self.admin_client_id, + "client_secret": self.admin_client_secret, + }, + timeout=6, + verify=self.verify_tls, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) + except Exception: + return None + if resp.status_code >= 400: + return None + token = resp.json().get("access_token") + return str(token) if token else None + + def _enrich_groups_from_admin(self, principal: KeycloakPrincipal) -> KeycloakPrincipal: + if principal.groups: + return principal + if not self.base_url or not self.realm: + return principal + admin_token = self._get_admin_token() + if not admin_token: + return principal + + try: + resp = httpx.get( + f"{self.base_url}/admin/realms/{self.realm}/users/{principal.sub}/groups", + timeout=6, + verify=self.verify_tls, + headers={"Authorization": f"Bearer {admin_token}", "Accept": "application/json"}, + ) + except Exception: + return principal + if resp.status_code >= 400: + return principal + + payload = resp.json() if resp.content else [] + groups: list[str] = [] + if isinstance(payload, list): + for item in payload: + if not isinstance(item, dict): + continue + path = item.get("path") + name = item.get("name") + if isinstance(path, str) and path: + groups.append(path) + elif isinstance(name, str) and name: + groups.append(name) + if not groups: + return principal + return KeycloakPrincipal( + sub=principal.sub, + email=principal.email, + name=principal.name, + preferred_username=principal.preferred_username, + groups=groups, + ) def verify_access_token(self, token: str) -> KeycloakPrincipal: try: @@ -197,6 +277,10 @@ def _get_verifier() -> KeycloakTokenVerifier: base_url=settings.idp_base_url, userinfo_endpoint=settings.idp_userinfo_endpoint, verify_tls=settings.idp_verify_tls, + realm=settings.keycloak_realm, + admin_realm=settings.keycloak_admin_realm, + admin_client_id=settings.keycloak_admin_client_id, + admin_client_secret=settings.keycloak_admin_client_secret, )