from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, Query, status from sqlalchemy.orm import Session from app.db.session import get_db from app.models.api_client import ApiClient from app.repositories.companies_repo import CompaniesRepository from app.repositories.modules_repo import ModulesRepository from app.repositories.permissions_repo import PermissionsRepository from app.repositories.sites_repo import SitesRepository from app.repositories.systems_repo import SystemsRepository from app.repositories.users_repo import UsersRepository from app.schemas.permissions import ( DirectPermissionListResponse, DirectPermissionRow, PermissionGrantRequest, PermissionRevokeRequest, ) from app.security.api_client_auth import require_api_client from app.security.admin_guard import require_admin_principal router = APIRouter( prefix="/admin", tags=["admin"], dependencies=[Depends(require_admin_principal)], ) def _resolve_module_id(db: Session, system_key: str, module_key: str | None) -> str: systems_repo = SystemsRepository(db) modules_repo = ModulesRepository(db) system = systems_repo.get_by_key(system_key) if not system: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="system_not_found") target_module_key = module_key if module_key else f"__system__{system_key}" module = modules_repo.get_by_key(target_module_key) if module and module.system_key != system_key: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="module_system_mismatch") if not module: module = modules_repo.create( module_key=target_module_key, system_key=system_key, name=target_module_key, status="active", ) return module.id def _resolve_scope_ids(db: Session, scope_type: str, scope_id: str) -> tuple[str | None, str | None]: companies_repo = CompaniesRepository(db) sites_repo = SitesRepository(db) if scope_type == "company": company = companies_repo.get_by_key(scope_id) if not company: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="company_not_found") return company.id, None if scope_type == "site": site = sites_repo.get_by_key(scope_id) if not site: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="site_not_found") return None, site.id raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="invalid_scope_type") @router.post("/permissions/grant") def grant_permission( payload: PermissionGrantRequest, _: ApiClient = Depends(require_api_client), db: Session = Depends(get_db), ) -> dict[str, str]: users_repo = UsersRepository(db) perms_repo = PermissionsRepository(db) user = users_repo.upsert_by_sub( authentik_sub=payload.authentik_sub, email=payload.email, display_name=payload.display_name, is_active=True, ) module_id = _resolve_module_id(db, payload.system, payload.module) company_id, site_id = _resolve_scope_ids(db, payload.scope_type, payload.scope_id) permission = perms_repo.create_if_not_exists( user_id=user.id, module_id=module_id, action=payload.action, scope_type=payload.scope_type, company_id=company_id, site_id=site_id, ) return {"permission_id": permission.id, "result": "granted"} @router.post("/permissions/revoke") def revoke_permission( payload: PermissionRevokeRequest, _: ApiClient = Depends(require_api_client), db: Session = Depends(get_db), ) -> dict[str, int | str]: users_repo = UsersRepository(db) perms_repo = PermissionsRepository(db) user = users_repo.get_by_sub(payload.authentik_sub) if user is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="user_not_found") module_id = _resolve_module_id(db, payload.system, payload.module) company_id, site_id = _resolve_scope_ids(db, payload.scope_type, payload.scope_id) deleted = perms_repo.revoke( user_id=user.id, module_id=module_id, action=payload.action, scope_type=payload.scope_type, company_id=company_id, site_id=site_id, ) return {"deleted": deleted, "result": "revoked"} @router.get("/permissions/direct", response_model=DirectPermissionListResponse) def list_direct_permissions( _: ApiClient = Depends(require_api_client), db: Session = Depends(get_db), keyword: str | None = Query(default=None), scope_type: str | None = Query(default=None), limit: int = Query(default=200, ge=1, le=500), offset: int = Query(default=0, ge=0), ) -> DirectPermissionListResponse: perms_repo = PermissionsRepository(db) items, total = perms_repo.list_direct_permissions( keyword=keyword, scope_type=scope_type, limit=limit, offset=offset, ) return DirectPermissionListResponse( items=[DirectPermissionRow(**item) for item in items], total=total, limit=limit, offset=offset, ) @router.delete("/permissions/direct/{permission_id}") def delete_direct_permission( permission_id: str, _: ApiClient = Depends(require_api_client), db: Session = Depends(get_db), ) -> dict[str, int | str]: try: normalized_permission_id = str(UUID(permission_id)) except ValueError: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="invalid_permission_id") perms_repo = PermissionsRepository(db) deleted = perms_repo.revoke_by_permission_id(normalized_permission_id) return {"deleted": deleted, "result": "revoked"}