Files
member-backend/app/api/internal.py

98 lines
3.7 KiB
Python

import hmac

from fastapi import APIRouter, Depends, Header, HTTPException, status
from sqlalchemy.orm import Session

from app.core.config import get_settings
from app.db.session import get_db
from app.repositories.permissions_repo import PermissionsRepository
from app.repositories.users_repo import UsersRepository
from app.schemas.authentik_admin import AuthentikEnsureUserRequest, AuthentikEnsureUserResponse
from app.schemas.permissions import PermissionSnapshotResponse
from app.schemas.users import UserUpsertBySubRequest
from app.services.authentik_admin_service import AuthentikAdminService
from app.services.permission_service import PermissionService
# Router for service-to-service endpoints; every route is mounted under the
# "/internal" prefix and tagged "internal" in the generated API schema.
router = APIRouter(prefix="/internal", tags=["internal"])
def verify_internal_secret(x_internal_secret: str = Header(alias="X-Internal-Secret")) -> None:
    """Reject the request unless it carries the configured internal shared secret.

    Written to be used as a FastAPI dependency (``Depends(verify_internal_secret)``).

    Args:
        x_internal_secret: Value of the ``X-Internal-Secret`` request header.

    Raises:
        HTTPException: 503 ``internal_secret_not_configured`` when the settings
            hold no shared secret, or 401 ``invalid_internal_secret`` when the
            header value does not match it.
    """
    expected_secret = get_settings().internal_shared_secret
    if not expected_secret:
        # Fail closed: with no secret configured, nothing can be authenticated.
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="internal_secret_not_configured",
        )

    # Constant-time comparison so response timing does not reveal how many
    # leading characters of a guess were correct. Both sides are encoded to
    # bytes because compare_digest() rejects non-ASCII str arguments.
    # NOTE(review): assumes internal_shared_secret is a plain str — confirm.
    secrets_match = hmac.compare_digest(
        x_internal_secret.encode("utf-8"),
        expected_secret.encode("utf-8"),
    )
    if not secrets_match:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="invalid_internal_secret",
        )
@router.post("/users/upsert-by-sub")
def upsert_user_by_sub(
    payload: UserUpsertBySubRequest,
    _: None = Depends(verify_internal_secret),
    db: Session = Depends(get_db),
) -> dict[str, str | bool | None]:
    """Pass the payload to ``UsersRepository.upsert_by_sub`` and echo the stored user.

    Args:
        payload: Request body carrying ``sub``, ``username``, ``email``,
            ``display_name`` and ``is_active``.
        _: Placeholder for the internal-secret check; its value is unused.
        db: Database session handed to the users repository.

    Returns:
        A dict with the keys ``id``, ``sub``, ``authentik_user_id``,
        ``username``, ``email``, ``display_name`` and ``is_active``, read from
        the object the repository returned.
    """
    record = UsersRepository(db).upsert_by_sub(
        authentik_sub=payload.sub,
        username=payload.username,
        email=payload.email,
        display_name=payload.display_name,
        is_active=payload.is_active,
    )
    # The repository's ``authentik_sub`` attribute is exposed as ``sub`` here.
    return dict(
        id=record.id,
        sub=record.authentik_sub,
        authentik_user_id=record.authentik_user_id,
        username=record.username,
        email=record.email,
        display_name=record.display_name,
        is_active=record.is_active,
    )
@router.get("/permissions/{authentik_sub}/snapshot", response_model=PermissionSnapshotResponse)
def get_permission_snapshot(
    authentik_sub: str,
    _: None = Depends(verify_internal_secret),
    db: Session = Depends(get_db),
) -> PermissionSnapshotResponse:
    """Return the permission snapshot for the user identified by ``authentik_sub``.

    Args:
        authentik_sub: Subject identifier taken from the URL path.
        _: Placeholder for the internal-secret check; its value is unused.
        db: Database session shared by both repositories.

    Returns:
        The snapshot built by ``PermissionService.build_snapshot`` from the
        user's permissions, or a snapshot with an empty permission list when
        no user with that subject is found.
    """
    user_lookup = UsersRepository(db)
    permission_lookup = PermissionsRepository(db)

    account = user_lookup.get_by_sub(authentik_sub)
    if account is not None:
        granted = permission_lookup.list_by_user(account.id, account.authentik_sub)
        return PermissionService.build_snapshot(authentik_sub=authentik_sub, permissions=granted)

    # Unknown subject: answer with an empty snapshot rather than an error.
    return PermissionSnapshotResponse(authentik_sub=authentik_sub, permissions=[])
@router.post("/authentik/users/ensure", response_model=AuthentikEnsureUserResponse)
def ensure_authentik_user(
    payload: AuthentikEnsureUserRequest,
    _: None = Depends(verify_internal_secret),
    db: Session = Depends(get_db),
) -> AuthentikEnsureUserResponse:
    """Run ``AuthentikAdminService.ensure_user`` and record the result locally.

    Args:
        payload: Request body carrying ``sub``, ``email``, ``username``,
            ``display_name`` and ``is_active``.
        _: Placeholder for the internal-secret check; its value is unused.
        db: Database session handed to the users repository.

    Returns:
        A response holding the ``user_id`` and ``action`` reported by the
        Authentik service call.

    Raises:
        HTTPException: 502 ``authentik_missing_sub`` when neither the service
            result nor the payload supplies a subject identifier.
    """
    outcome = AuthentikAdminService(settings=get_settings()).ensure_user(
        sub=payload.sub,
        email=payload.email,
        username=payload.username,
        display_name=payload.display_name,
        is_active=payload.is_active,
    )
    local_users = UsersRepository(db)

    # The subject reported by the service wins; the caller-supplied one is the fallback.
    resolved_sub = outcome.authentik_sub or payload.sub or ""
    if not resolved_sub:
        raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail="authentik_missing_sub")

    local_users.upsert_by_sub(
        authentik_sub=resolved_sub,
        username=payload.username,
        email=payload.email,
        display_name=payload.display_name,
        is_active=payload.is_active,
        authentik_user_id=outcome.user_id,
    )
    return AuthentikEnsureUserResponse(authentik_user_id=outcome.user_id, action=outcome.action)