from __future__ import annotations

from dataclasses import dataclass

import httpx
from fastapi import HTTPException, status

from app.core.config import Settings

# Upper bound applied to every generated username.
# NOTE(review): 150 is Django's default username max_length; presumably
# Authentik enforces the same limit -- confirm.
_USERNAME_MAX_LENGTH = 150

# Per-request timeout (seconds) for calls to the Authentik API.
_REQUEST_TIMEOUT_SECONDS = 10

_USERS_ENDPOINT = "/api/v3/core/users/"


def _bad_gateway(detail: str) -> HTTPException:
    """Build the 502 error used for every upstream (Authentik) failure."""
    return HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=detail)


@dataclass
class AuthentikSyncResult:
    """Outcome of synchronising a single user into Authentik."""

    user_id: int  # value of the "pk" field of the affected Authentik user
    action: str  # "created" or "updated"
    authentik_sub: str | None = None  # value of the "uid" field, if returned


class AuthentikAdminService:
    """Small synchronous client for the Authentik user admin API."""

    def __init__(self, settings: Settings) -> None:
        """Read the Authentik connection settings.

        Raises:
            HTTPException: 503 ``authentik_admin_not_configured`` when the
                base URL or the admin token is missing.
        """
        # `or ""` lets an unset (None) base URL reach the 503 check below
        # instead of failing with AttributeError on .rstrip().
        self.base_url = (settings.authentik_base_url or "").rstrip("/")
        self.admin_token = settings.authentik_admin_token
        self.verify_tls = settings.authentik_verify_tls
        if not self.base_url or not self.admin_token:
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="authentik_admin_not_configured",
            )

    def _client(self) -> httpx.Client:
        """Return a new authenticated ``httpx.Client``.

        The caller owns the client and should close it, e.g. by using it as
        a context manager.
        """
        return httpx.Client(
            base_url=self.base_url,
            headers={
                "Authorization": f"Bearer {self.admin_token}",
                "Accept": "application/json",
                "Content-Type": "application/json",
            },
            timeout=_REQUEST_TIMEOUT_SECONDS,
            verify=self.verify_tls,
        )

    @staticmethod
    def _safe_username(sub: str, email: str) -> str:
        """Derive a username from the e-mail local part, else from ``sub``.

        The ``sub`` fallback (with ``|`` replaced by ``_``) is used when the
        e-mail is blank, has no ``@``, or has an empty local part. The result
        is always capped at ``_USERNAME_MAX_LENGTH`` characters.
        """
        local_part = email.split("@", 1)[0] if email and "@" in email else ""
        candidate = local_part or sub.replace("|", "_")
        return candidate[:_USERNAME_MAX_LENGTH]

    @staticmethod
    def _send(
        client: httpx.Client,
        method: str,
        url: str,
        failure_detail: str,
        **kwargs: object,
    ) -> httpx.Response:
        """Send one request; map transport errors and >=400 replies to 502."""
        try:
            response = client.request(method, url, **kwargs)
        except httpx.RequestError as exc:
            # Timeouts, connection and TLS failures are upstream failures too.
            raise _bad_gateway(failure_detail) from exc
        if response.status_code >= 400:
            raise _bad_gateway(failure_detail)
        return response

    @staticmethod
    def _json(response: httpx.Response, failure_detail: str) -> object:
        """Decode the response body as JSON; a non-JSON body becomes a 502."""
        try:
            return response.json()
        except ValueError as exc:
            raise _bad_gateway(failure_detail) from exc

    @staticmethod
    def _pk(user: dict, failure_detail: str) -> int:
        """Return the user's ``pk`` as an int; missing/invalid becomes a 502."""
        try:
            return int(user["pk"])
        except (KeyError, TypeError, ValueError) as exc:
            raise _bad_gateway(failure_detail) from exc

    @staticmethod
    def _find_user(data: object, field: str, value: str) -> dict | None:
        """Pick the listed user whose ``field`` really equals ``value``.

        Entries without a ``pk`` are skipped. E-mail addresses are compared
        case-insensitively, every other field exactly. Returns ``None`` when
        nothing matches or the payload has an unexpected shape.
        """
        results = data.get("results") if isinstance(data, dict) else None
        if not isinstance(results, list):
            return None

        ignore_case = field == "email"
        wanted = value.casefold() if ignore_case else value
        for candidate in results:
            if not isinstance(candidate, dict) or candidate.get("pk") is None:
                continue
            actual = candidate.get(field)
            if not isinstance(actual, str):
                continue
            if (actual.casefold() if ignore_case else actual) == wanted:
                return candidate
        return None

    def ensure_user(
        self,
        sub: str,
        email: str,
        display_name: str | None,
        is_active: bool = True,
    ) -> AuthentikSyncResult:
        """Create the user in Authentik, or update the matching existing one.

        Args:
            sub: Subject identifier; used for the username when ``email``
                cannot provide one.
            email: E-mail address; also the lookup key when non-empty.
            display_name: Human-readable name; falls back to ``email``, then
                to the derived username.
            is_active: Whether the Authentik account should be active.

        Returns:
            The affected user's ``pk``, the action taken (``"created"`` or
            ``"updated"``) and the ``uid`` Authentik reported.

        Raises:
            HTTPException: 502 with detail ``authentik_lookup_failed``,
                ``authentik_update_failed`` or ``authentik_create_failed``
                when the corresponding call fails or returns an unusable
                body.
        """
        username = self._safe_username(sub=sub, email=email)
        payload = {
            "username": username,
            "name": display_name or email or username,
            "email": email,
            "is_active": is_active,
        }

        # NOTE(review): a blank ``email`` filter is presumably ignored by the
        # server and would list unrelated users, so fall back to the derived
        # username as the lookup key. _find_user additionally verifies the
        # match so an unrelated account is never patched.
        if email:
            lookup_field, lookup_value = "email", email
        else:
            lookup_field, lookup_value = "username", username

        with self._client() as client:
            lookup_resp = self._send(
                client,
                "GET",
                _USERS_ENDPOINT,
                "authentik_lookup_failed",
                params={lookup_field: lookup_value},
            )
            listing = self._json(lookup_resp, "authentik_lookup_failed")
            existing = self._find_user(listing, lookup_field, lookup_value)

            if existing is not None:
                user_pk = self._pk(existing, "authentik_lookup_failed")
                self._send(
                    client,
                    "PATCH",
                    f"{_USERS_ENDPOINT}{user_pk}/",
                    "authentik_update_failed",
                    json=payload,
                )
                return AuthentikSyncResult(
                    user_id=user_pk,
                    action="updated",
                    authentik_sub=existing.get("uid"),
                )

            create_resp = self._send(
                client,
                "POST",
                _USERS_ENDPOINT,
                "authentik_create_failed",
                json=payload,
            )
            created = self._json(create_resp, "authentik_create_failed")
            if not isinstance(created, dict):
                raise _bad_gateway("authentik_create_failed")
            return AuthentikSyncResult(
                user_id=self._pk(created, "authentik_create_failed"),
                action="created",
                authentik_sub=created.get("uid"),
            )