from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.models.api_client import ApiClient
from app.repositories.permissions_repo import PermissionsRepository
from app.repositories.users_repo import UsersRepository
from app.schemas.permissions import PermissionGrantRequest, PermissionRevokeRequest
from app.security.api_client_auth import require_api_client
# Shared router for the admin permission endpoints: every route declared in
# this module is mounted under /admin and grouped under the "admin" OpenAPI tag.
router = APIRouter(
    prefix="/admin",
    tags=["admin"],
)
@router.post("/permissions/grant")
def grant_permission(
    payload: PermissionGrantRequest,
    _: ApiClient = Depends(require_api_client),
    db: Session = Depends(get_db),
) -> dict[str, str]:
    """Grant one permission to the user identified by ``payload.authentik_sub``.

    The user row is upserted from the payload first (the permission row needs a
    ``user.id`` to reference), then the permission row is created via
    ``PermissionsRepository.create_if_not_exists``.

    Args:
        payload: The subject (``authentik_sub``), profile fields (``email``,
            ``display_name``) and the ``scope_type``/``scope_id``/``module``/
            ``action`` tuple to grant.
        _: The authenticated API client. Only its presence is required here;
            the resolved object is not used by the handler.
        db: Request-scoped SQLAlchemy session supplied by ``get_db``.

    Returns:
        ``{"permission_id": <id of the permission row>, "result": "granted"}``.

    Security notes (review):
        - ``require_api_client`` is the only gate on this endpoint and the
          handler does nothing further with the client, so any caller that
          passes that dependency can grant any permission to any subject.
          NOTE(review): confirm the dependency enforces an admin-level scope
          for API clients; if it does not, that check belongs here.
        - The upsert always passes ``is_active=True`` and overwrites ``email``
          and ``display_name`` from the payload, so a grant request can
          re-enable a deactivated account and change its profile fields.
          NOTE(review): confirm this is intended for the provisioning flow.
        - Nothing here writes an audit record of who granted what; consider
          logging the caller and the grant if one is not produced elsewhere.
    """
    # Provision/refresh the user row before touching permissions.
    target_user = UsersRepository(db).upsert_by_sub(
        authentik_sub=payload.authentik_sub,
        email=payload.email,
        display_name=payload.display_name,
        is_active=True,
    )

    # Presumably returns the existing row on a repeated grant rather than
    # creating a duplicate — TODO confirm against the repository.
    granted = PermissionsRepository(db).create_if_not_exists(
        user_id=target_user.id,
        scope_type=payload.scope_type,
        scope_id=payload.scope_id,
        module=payload.module,
        action=payload.action,
    )

    # Key order is preserved so the serialized response is unchanged.
    return {"permission_id": granted.id, "result": "granted"}
@router.post("/permissions/revoke")
def revoke_permission(
    payload: PermissionRevokeRequest,
    _: ApiClient = Depends(require_api_client),
    db: Session = Depends(get_db),
) -> dict[str, int | str]:
    """Revoke one permission from the user identified by ``payload.authentik_sub``.

    Unlike ``grant_permission`` this never creates a user: an unknown subject
    is rejected with 404 instead of being upserted.

    Args:
        payload: The subject (``authentik_sub``) and the ``scope_type``/
            ``scope_id``/``module``/``action`` tuple to revoke.
        _: The authenticated API client. Only its presence is required here;
            the resolved object is not used by the handler.
        db: Request-scoped SQLAlchemy session supplied by ``get_db``.

    Returns:
        ``{"deleted": <value returned by PermissionsRepository.revoke>,
        "result": "revoked"}``. ``deleted`` is typed as ``int`` here and is
        presumably a row count — confirm against the repository.

    Raises:
        HTTPException: 404 with ``detail="user_not_found"`` when no user row
            exists for ``payload.authentik_sub``.

    Security notes (review):
        - As with the grant endpoint, ``require_api_client`` is the only gate;
          NOTE(review): confirm it enforces an admin-level scope, otherwise
          any valid API client can strip permissions from any subject.
        - The 404 lets an authenticated client probe which subjects are known
          to this service; acceptable only if every API client is trusted.
    """
    # No upsert on the revoke path: an unknown subject is an error.
    if (subject := UsersRepository(db).get_by_sub(payload.authentik_sub)) is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="user_not_found")

    removed = PermissionsRepository(db).revoke(
        user_id=subject.id,
        scope_type=payload.scope_type,
        scope_id=payload.scope_id,
        module=payload.module,
        action=payload.action,
    )

    # Key order is preserved so the serialized response is unchanged.
    return {"deleted": removed, "result": "revoked"}