Files
member-platform/backend/app/api/admin.py

61 lines
1.9 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.models.api_client import ApiClient
from app.repositories.permissions_repo import PermissionsRepository
from app.repositories.users_repo import UsersRepository
from app.schemas.permissions import PermissionGrantRequest, PermissionRevokeRequest
from app.security.api_client_auth import require_api_client
router = APIRouter(prefix="/admin", tags=["admin"])
@router.post("/permissions/grant")
def grant_permission(
payload: PermissionGrantRequest,
_: ApiClient = Depends(require_api_client),
db: Session = Depends(get_db),
) -> dict[str, str]:
users_repo = UsersRepository(db)
perms_repo = PermissionsRepository(db)
user = users_repo.upsert_by_sub(
authentik_sub=payload.authentik_sub,
email=payload.email,
display_name=payload.display_name,
is_active=True,
)
permission = perms_repo.create_if_not_exists(
user_id=user.id,
scope_type=payload.scope_type,
scope_id=payload.scope_id,
module=payload.module,
action=payload.action,
)
return {"permission_id": permission.id, "result": "granted"}
@router.post("/permissions/revoke")
def revoke_permission(
payload: PermissionRevokeRequest,
_: ApiClient = Depends(require_api_client),
db: Session = Depends(get_db),
) -> dict[str, int | str]:
users_repo = UsersRepository(db)
perms_repo = PermissionsRepository(db)
user = users_repo.get_by_sub(payload.authentik_sub)
if user is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="user_not_found")
deleted = perms_repo.revoke(
user_id=user.id,
scope_type=payload.scope_type,
scope_id=payload.scope_id,
module=payload.module,
action=payload.action,
)
return {"deleted": deleted, "result": "revoked"}