from __future__ import annotations

from functools import lru_cache

import jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

from app.core.config import get_settings
from app.schemas.auth import AuthentikPrincipal

# auto_error=False: absent credentials are handed to the dependency below as
# None, so this module decides the status code and error detail itself.
bearer_scheme = HTTPBearer(auto_error=False)

# Challenge header attached to every 401 raised here (RFC 6750, section 3).
_BEARER_CHALLENGE = {"WWW-Authenticate": "Bearer"}


def _unauthorized(detail: str) -> HTTPException:
    """Build a 401 ``HTTPException`` carrying *detail* and a Bearer challenge."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers=dict(_BEARER_CHALLENGE),
    )


def _clean(value: str | None) -> str | None:
    """Strip *value*; return ``None`` when it is missing, empty or all whitespace."""
    if not value:
        return None
    return value.strip() or None


class AuthentikTokenVerifier:
    """Verify bearer JWTs against a JWKS endpoint and map them to a principal.

    Attributes:
        issuer: Expected ``iss`` claim, or ``None`` to skip issuer verification.
        jwks_url: URL the signing keys are fetched from.
        audience: Expected ``aud`` claim, or ``None`` to skip audience
            verification.
    """

    def __init__(self, issuer: str | None, jwks_url: str | None, audience: str | None) -> None:
        """Normalize the settings and create the JWKS client.

        Args:
            issuer: Expected token issuer; also used to derive ``jwks_url``
                when that is not given.
            jwks_url: Explicit JWKS endpoint. Blank values are treated as
                unset so the issuer-derived URL is used instead.
            audience: Expected token audience.

        Raises:
            ValueError: If neither a JWKS URL nor an issuer is configured.
        """
        self.issuer = _clean(issuer)
        # Strip *before* falling back: a whitespace-only jwks_url must not
        # block inference from the issuer.
        self.jwks_url = _clean(jwks_url) or self._infer_jwks_url(self.issuer)
        self.audience = _clean(audience)
        if not self.jwks_url:
            raise ValueError("AUTHENTIK_JWKS_URL or AUTHENTIK_ISSUER is required")
        self._jwk_client = jwt.PyJWKClient(self.jwks_url)

    @staticmethod
    def _infer_jwks_url(issuer: str | None) -> str | None:
        """Derive the JWKS URL from *issuer* by ensuring it ends in ``/jwks/``.

        Returns ``None`` when *issuer* is empty. An issuer that already ends
        in ``/jwks`` (with or without trailing slashes) is returned with a
        single trailing slash rather than getting a second ``jwks/`` segment.
        """
        if not issuer:
            return None
        base = issuer.rstrip("/") + "/"
        return base if base.endswith("/jwks/") else base + "jwks/"

    def verify_access_token(self, token: str) -> AuthentikPrincipal:
        """Validate *token* and return the principal described by its claims.

        Signature and expiry are always verified; audience and issuer are
        verified only when the corresponding value is configured.

        Raises:
            HTTPException: 401 ``invalid_bearer_token`` if key lookup or
                decoding fails for any reason; 401 ``token_missing_sub`` if
                the decoded claims carry no ``sub``.
        """
        options = {
            "verify_signature": True,
            "verify_exp": True,
            "verify_aud": bool(self.audience),
            "verify_iss": bool(self.issuer),
        }
        try:
            signing_key = self._jwk_client.get_signing_key_from_jwt(token)
            claims = jwt.decode(
                token,
                signing_key.key,
                algorithms=["RS256", "RS384", "RS512"],
                audience=self.audience,
                issuer=self.issuer,
                options=options,
            )
        except Exception as exc:
            # Deliberately broad: any failure while resolving the key or
            # decoding must fail closed as 401 rather than surface as a 500.
            raise _unauthorized("invalid_bearer_token") from exc

        sub = claims.get("sub")
        if not sub:
            raise _unauthorized("token_missing_sub")

        return AuthentikPrincipal(
            sub=sub,
            email=claims.get("email"),
            name=claims.get("name"),
            preferred_username=claims.get("preferred_username"),
        )


@lru_cache
def _get_verifier() -> AuthentikTokenVerifier:
    """Return the verifier built from application settings (cached after first call)."""
    settings = get_settings()
    return AuthentikTokenVerifier(
        issuer=settings.authentik_issuer,
        jwks_url=settings.authentik_jwks_url,
        audience=settings.authentik_audience,
    )


def require_authenticated_principal(
    credentials: HTTPAuthorizationCredentials | None = Depends(bearer_scheme),
) -> AuthentikPrincipal:
    """FastAPI dependency that resolves the request's bearer token to a principal.

    Raises:
        HTTPException: 401 ``missing_bearer_token`` when no credentials were
            supplied or the scheme is not ``bearer``; otherwise whatever
            ``AuthentikTokenVerifier.verify_access_token`` raises.
    """
    if credentials is None or credentials.scheme.lower() != "bearer":
        raise _unauthorized("missing_bearer_token")
    return _get_verifier().verify_access_token(credentials.credentials)