import hmac

from fastapi import APIRouter, Depends, Header, HTTPException, status
from sqlalchemy.orm import Session

from app.core.config import get_settings
from app.db.session import get_db
from app.repositories.permissions_repo import PermissionsRepository
from app.repositories.users_repo import UsersRepository
from app.schemas.permissions import PermissionSnapshotResponse
from app.schemas.users import UserUpsertBySubRequest
from app.services.permission_service import PermissionService

router = APIRouter(prefix="/internal", tags=["internal"])


def verify_internal_secret(
    x_internal_secret: str = Header(alias="X-Internal-Secret"),
) -> None:
    """Reject the request unless it carries the configured internal secret.

    Used as a FastAPI dependency by the routes in this module.

    Args:
        x_internal_secret: Value of the ``X-Internal-Secret`` request header.

    Raises:
        HTTPException: 503 (``internal_secret_not_configured``) when the
            settings hold no shared secret; 401 (``invalid_internal_secret``)
            when the header value does not match it.
    """
    expected_secret = get_settings().internal_shared_secret
    if not expected_secret:
        # Refuse rather than accept: an unset secret must never match anything.
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="internal_secret_not_configured",
        )

    # Constant-time comparison so response timing does not reveal how many
    # leading characters of a guessed secret were correct. Compare bytes, not
    # str: hmac.compare_digest raises TypeError for non-ASCII str arguments,
    # and a header value may contain such characters.
    # NOTE(review): assumes internal_shared_secret is a plain str - confirm.
    secrets_match = hmac.compare_digest(
        x_internal_secret.encode("utf-8"),
        expected_secret.encode("utf-8"),
    )
    if not secrets_match:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="invalid_internal_secret",
        )


@router.post("/users/upsert-by-sub")
def upsert_user_by_sub(
    payload: UserUpsertBySubRequest,
    _: None = Depends(verify_internal_secret),
    db: Session = Depends(get_db),
) -> dict[str, str | bool | None]:
    """Upsert a user keyed by the subject in the payload and return its fields.

    Delegates to ``UsersRepository.upsert_by_sub`` with the payload's ``sub``,
    ``email``, ``display_name`` and ``is_active`` values, then returns the
    resulting user's ``id``, ``sub``, ``email``, ``display_name`` and
    ``is_active``.
    """
    repo = UsersRepository(db)
    user = repo.upsert_by_sub(
        authentik_sub=payload.sub,
        email=payload.email,
        display_name=payload.display_name,
        is_active=payload.is_active,
    )
    return {
        "id": user.id,
        "sub": user.authentik_sub,
        "email": user.email,
        "display_name": user.display_name,
        "is_active": user.is_active,
    }


@router.get(
    "/permissions/{authentik_sub}/snapshot",
    response_model=PermissionSnapshotResponse,
)
def get_permission_snapshot(
    authentik_sub: str,
    _: None = Depends(verify_internal_secret),
    db: Session = Depends(get_db),
) -> PermissionSnapshotResponse:
    """Return the permission snapshot for the user with the given subject.

    When no user exists for ``authentik_sub`` the response carries an empty
    permission list instead of an error. Otherwise the user's permissions are
    loaded and passed to ``PermissionService.build_snapshot``.
    """
    users_repo = UsersRepository(db)
    perms_repo = PermissionsRepository(db)

    user = users_repo.get_by_sub(authentik_sub)
    if user is None:
        # An unknown subject is answered with an empty snapshot, not a 404.
        return PermissionSnapshotResponse(authentik_sub=authentik_sub, permissions=[])

    permissions = perms_repo.list_by_user_id(user.id)
    # Flatten each permission row to the plain 4-tuple handed to the service.
    tuples = [(p.scope_type, p.scope_id, p.module, p.action) for p in permissions]
    return PermissionService.build_snapshot(authentik_sub=authentik_sub, permissions=tuples)