refactor: rebuild backend around role-site authorization model

This commit is contained in:
Chris
2026-04-02 23:58:13 +08:00
parent 0bc667847d
commit 2f92b94f59
43 changed files with 1593 additions and 2257 deletions

View File

@@ -1,153 +0,0 @@
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.repositories.companies_repo import CompaniesRepository
from app.repositories.modules_repo import ModulesRepository
from app.repositories.permissions_repo import PermissionsRepository
from app.repositories.sites_repo import SitesRepository
from app.repositories.systems_repo import SystemsRepository
from app.repositories.users_repo import UsersRepository
from app.schemas.permissions import (
DirectPermissionListResponse,
DirectPermissionRow,
PermissionGrantRequest,
PermissionRevokeRequest,
)
from app.security.admin_guard import require_admin_principal
router = APIRouter(
prefix="/admin",
tags=["admin"],
dependencies=[Depends(require_admin_principal)],
)
def _resolve_module_id(db: Session, system_key: str, module_key: str | None) -> str:
systems_repo = SystemsRepository(db)
modules_repo = ModulesRepository(db)
system = systems_repo.get_by_key(system_key)
if not system:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="system_not_found")
target_module_key = module_key if module_key else f"__system__{system_key}"
module = modules_repo.get_by_key(target_module_key)
if module and module.system_key != system_key:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="module_system_mismatch")
if not module:
module = modules_repo.create(
module_key=target_module_key,
system_key=system_key,
name=target_module_key,
status="active",
)
return module.id
def _resolve_scope_ids(db: Session, scope_type: str, scope_id: str) -> tuple[str | None, str | None]:
companies_repo = CompaniesRepository(db)
sites_repo = SitesRepository(db)
if scope_type == "company":
company = companies_repo.get_by_key(scope_id)
if not company:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="company_not_found")
return company.id, None
if scope_type == "site":
site = sites_repo.get_by_key(scope_id)
if not site:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="site_not_found")
return None, site.id
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="invalid_scope_type")
@router.post("/permissions/grant")
def grant_permission(
payload: PermissionGrantRequest,
db: Session = Depends(get_db),
) -> dict[str, str]:
users_repo = UsersRepository(db)
perms_repo = PermissionsRepository(db)
user = users_repo.upsert_by_sub(
user_sub=payload.user_sub,
username=None,
email=payload.email,
display_name=payload.display_name,
is_active=True,
)
module_id = _resolve_module_id(db, payload.system, payload.module)
company_id, site_id = _resolve_scope_ids(db, payload.scope_type, payload.scope_id)
permission = perms_repo.create_if_not_exists(
user_id=user.id,
module_id=module_id,
action=payload.action,
scope_type=payload.scope_type,
company_id=company_id,
site_id=site_id,
)
return {"permission_id": permission.id, "result": "granted"}
@router.post("/permissions/revoke")
def revoke_permission(
payload: PermissionRevokeRequest,
db: Session = Depends(get_db),
) -> dict[str, int | str]:
users_repo = UsersRepository(db)
perms_repo = PermissionsRepository(db)
user = users_repo.get_by_sub(payload.user_sub)
if user is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="user_not_found")
module_id = _resolve_module_id(db, payload.system, payload.module)
company_id, site_id = _resolve_scope_ids(db, payload.scope_type, payload.scope_id)
deleted = perms_repo.revoke(
user_id=user.id,
module_id=module_id,
action=payload.action,
scope_type=payload.scope_type,
company_id=company_id,
site_id=site_id,
)
return {"deleted": deleted, "result": "revoked"}
@router.get("/permissions/direct", response_model=DirectPermissionListResponse)
def list_direct_permissions(
db: Session = Depends(get_db),
keyword: str | None = Query(default=None),
scope_type: str | None = Query(default=None),
limit: int = Query(default=200, ge=1, le=500),
offset: int = Query(default=0, ge=0),
) -> DirectPermissionListResponse:
perms_repo = PermissionsRepository(db)
items, total = perms_repo.list_direct_permissions(
keyword=keyword,
scope_type=scope_type,
limit=limit,
offset=offset,
)
return DirectPermissionListResponse(
items=[DirectPermissionRow(**item) for item in items],
total=total,
limit=limit,
offset=offset,
)
@router.delete("/permissions/direct/{permission_id}")
def delete_direct_permission(
permission_id: str,
db: Session = Depends(get_db),
) -> dict[str, int | str]:
try:
normalized_permission_id = str(UUID(permission_id))
except ValueError:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="invalid_permission_id")
perms_repo = PermissionsRepository(db)
deleted = perms_repo.revoke_by_permission_id(normalized_permission_id)
return {"deleted": deleted, "result": "revoked"}

File diff suppressed because it is too large Load Diff

View File

@@ -3,11 +3,11 @@ from sqlalchemy.orm import Session
from app.core.config import get_settings
from app.db.session import get_db
from app.repositories.permissions_repo import PermissionsRepository
from app.schemas.internal import InternalUpsertUserBySubResponse
from app.repositories.users_repo import UsersRepository
from app.repositories.user_sites_repo import UserSitesRepository
from app.schemas.idp_admin import KeycloakEnsureUserRequest, KeycloakEnsureUserResponse
from app.schemas.permissions import PermissionSnapshotResponse
from app.schemas.internal import InternalUpsertUserBySubResponse, InternalUserRoleItem, InternalUserRoleResponse
from app.schemas.permissions import RoleSnapshotResponse
from app.schemas.users import UserUpsertBySubRequest
from app.security.api_client_auth import require_api_client
from app.services.idp_admin_service import KeycloakAdminService
@@ -28,32 +28,84 @@ def upsert_user_by_sub(
email=payload.email,
display_name=payload.display_name,
is_active=payload.is_active,
status=payload.status,
)
return InternalUpsertUserBySubResponse(
id=user.id,
user_sub=user.user_sub,
idp_user_id=user.idp_user_id,
username=user.username,
email=user.email,
display_name=user.display_name,
is_active=user.is_active,
status=user.status,
)
return {
"id": user.id,
"user_sub": user.user_sub,
"idp_user_id": user.idp_user_id,
"username": user.username,
"email": user.email,
"display_name": user.display_name,
"is_active": user.is_active,
}
@router.get("/permissions/{user_sub}/snapshot", response_model=PermissionSnapshotResponse)
def get_permission_snapshot(
user_sub: str,
db: Session = Depends(get_db),
) -> PermissionSnapshotResponse:
def _build_user_role_rows(db: Session, user_sub: str) -> list[tuple[str, str, str, str, str, str, str, str, str]]:
users_repo = UsersRepository(db)
perms_repo = PermissionsRepository(db)
user_sites_repo = UserSitesRepository(db)
user = users_repo.get_by_sub(user_sub)
if user is None:
return PermissionSnapshotResponse(user_sub=user_sub, permissions=[])
return []
permissions = perms_repo.list_by_user(user.id, user.user_sub)
return PermissionService.build_snapshot(user_sub=user_sub, permissions=permissions)
rows = user_sites_repo.get_user_role_rows(user.id)
return [
(
site.site_key,
site.display_name,
company.company_key,
company.display_name,
system.system_key,
system.name,
role.role_key,
role.name,
role.idp_role_name,
)
for site, company, role, system in rows
]
@router.get("/users/{user_sub}/roles", response_model=InternalUserRoleResponse)
def get_user_roles(user_sub: str, db: Session = Depends(get_db)) -> InternalUserRoleResponse:
rows = _build_user_role_rows(db, user_sub)
return InternalUserRoleResponse(
user_sub=user_sub,
roles=[
InternalUserRoleItem(
site_key=site_key,
site_display_name=site_display_name,
company_key=company_key,
company_display_name=company_display_name,
system_key=system_key,
system_name=system_name,
role_key=role_key,
role_name=role_name,
idp_role_name=idp_role_name,
)
for (
site_key,
site_display_name,
company_key,
company_display_name,
system_key,
system_name,
role_key,
role_name,
idp_role_name,
) in rows
],
)
@router.get("/permissions/{user_sub}/snapshot", response_model=RoleSnapshotResponse)
def get_permission_snapshot(
user_sub: str,
db: Session = Depends(get_db),
) -> RoleSnapshotResponse:
rows = _build_user_role_rows(db, user_sub)
return PermissionService.build_role_snapshot(user_sub=user_sub, rows=rows)
@router.post("/idp/users/ensure", response_model=KeycloakEnsureUserResponse)
@@ -73,17 +125,17 @@ def ensure_idp_user(
)
users_repo = UsersRepository(db)
resolved_sub = payload.user_sub or ""
if sync_result.user_sub:
resolved_sub = sync_result.user_sub
resolved_sub = payload.user_sub or sync_result.user_sub or ""
if not resolved_sub:
raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail="idp_missing_sub")
users_repo.upsert_by_sub(
user_sub=resolved_sub,
username=payload.username,
email=payload.email,
display_name=payload.display_name,
is_active=payload.is_active,
status="active",
idp_user_id=sync_result.user_id,
)
return KeycloakEnsureUserResponse(idp_user_id=sync_result.user_id, action=sync_result.action)

View File

@@ -3,14 +3,15 @@ from sqlalchemy.orm import Session
from app.db.session import get_db
from app.repositories.companies_repo import CompaniesRepository
from app.repositories.modules_repo import ModulesRepository
from app.repositories.roles_repo import RolesRepository
from app.repositories.sites_repo import SitesRepository
from app.repositories.systems_repo import SystemsRepository
from app.repositories.users_repo import UsersRepository
from app.schemas.internal import (
InternalCompanyListResponse,
InternalMemberListResponse,
InternalModuleListResponse,
InternalRoleItem,
InternalRoleListResponse,
InternalSiteListResponse,
InternalSystemListResponse,
)
@@ -27,24 +28,13 @@ def internal_list_systems(
) -> InternalSystemListResponse:
repo = SystemsRepository(db)
items, total = repo.list(limit=limit, offset=offset)
return {"items": [{"id": i.id, "system_key": i.system_key, "name": i.name, "status": i.status} for i in items], "total": total, "limit": limit, "offset": offset}
@router.get("/modules", response_model=InternalModuleListResponse)
def internal_list_modules(
db: Session = Depends(get_db),
limit: int = Query(default=500, ge=1, le=2000),
offset: int = Query(default=0, ge=0),
) -> InternalModuleListResponse:
modules_repo = ModulesRepository(db)
items, total = modules_repo.list(limit=limit, offset=offset)
return {
"items": [
{
"id": i.id,
"module_key": i.module_key,
"system_key": i.system_key,
"name": i.name,
"idp_client_id": i.idp_client_id,
"status": i.status,
}
for i in items
@@ -55,6 +45,43 @@ def internal_list_modules(
}
@router.get("/roles", response_model=InternalRoleListResponse)
def internal_list_roles(
db: Session = Depends(get_db),
system_key: str | None = Query(default=None),
limit: int = Query(default=500, ge=1, le=2000),
offset: int = Query(default=0, ge=0),
) -> InternalRoleListResponse:
systems_repo = SystemsRepository(db)
roles_repo = RolesRepository(db)
system_id = None
systems, _ = systems_repo.list(limit=5000, offset=0)
system_map = {s.id: s for s in systems}
if system_key:
system = systems_repo.get_by_key(system_key)
if not system:
return InternalRoleListResponse(items=[], total=0, limit=limit, offset=offset)
system_id = system.id
items, total = roles_repo.list(system_id=system_id, limit=limit, offset=offset)
rows = [
InternalRoleItem(
id=i.id,
role_key=i.role_key,
system_key=system_map[i.system_id].system_key,
system_name=system_map[i.system_id].name,
name=i.name,
idp_role_name=i.idp_role_name,
description=i.description,
status=i.status,
)
for i in items
if i.system_id in system_map
]
return InternalRoleListResponse(items=rows, total=total, limit=limit, offset=offset)
@router.get("/companies", response_model=InternalCompanyListResponse)
def internal_list_companies(
db: Session = Depends(get_db),
@@ -64,7 +91,21 @@ def internal_list_companies(
) -> InternalCompanyListResponse:
repo = CompaniesRepository(db)
items, total = repo.list(keyword=keyword, limit=limit, offset=offset)
return {"items": [{"id": i.id, "company_key": i.company_key, "name": i.name, "status": i.status} for i in items], "total": total, "limit": limit, "offset": offset}
return {
"items": [
{
"id": i.id,
"company_key": i.company_key,
"display_name": i.display_name,
"legal_name": i.legal_name,
"status": i.status,
}
for i in items
],
"total": total,
"limit": limit,
"offset": offset,
}
@router.get("/sites", response_model=InternalSiteListResponse)
@@ -81,10 +122,27 @@ def internal_list_sites(
company = companies_repo.get_by_key(company_key)
if company:
company_id = company.id
companies, _ = companies_repo.list(limit=2000, offset=0)
mapping = {c.id: c.company_key for c in companies}
companies, _ = companies_repo.list(limit=5000, offset=0)
mapping = {c.id: c for c in companies}
items, total = sites_repo.list(company_id=company_id, limit=limit, offset=offset)
return {"items": [{"id": i.id, "site_key": i.site_key, "company_key": mapping.get(i.company_id), "name": i.name, "status": i.status} for i in items], "total": total, "limit": limit, "offset": offset}
return {
"items": [
{
"id": i.id,
"site_key": i.site_key,
"company_key": mapping[i.company_id].company_key,
"company_display_name": mapping[i.company_id].display_name,
"display_name": i.display_name,
"domain": i.domain,
"status": i.status,
}
for i in items
if i.company_id in mapping
],
"total": total,
"limit": limit,
"offset": offset,
}
@router.get("/members", response_model=InternalMemberListResponse)
@@ -105,6 +163,7 @@ def internal_list_members(
"email": i.email,
"display_name": i.display_name,
"is_active": i.is_active,
"status": i.status,
}
for i in items
],

View File

@@ -3,10 +3,10 @@ from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.repositories.permissions_repo import PermissionsRepository
from app.repositories.users_repo import UsersRepository
from app.repositories.user_sites_repo import UserSitesRepository
from app.schemas.auth import KeycloakPrincipal, MeSummaryResponse
from app.schemas.permissions import PermissionSnapshotResponse
from app.schemas.permissions import RoleSnapshotResponse
from app.security.idp_jwt import require_authenticated_principal
from app.services.permission_service import PermissionService
@@ -26,10 +26,10 @@ def get_me(
email=principal.email,
display_name=principal.name or principal.preferred_username,
is_active=True,
status="active",
)
return MeSummaryResponse(sub=user.user_sub, email=user.email, display_name=user.display_name)
except SQLAlchemyError:
# DB schema compatibility fallback for local bring-up.
return MeSummaryResponse(
sub=principal.sub,
email=principal.email,
@@ -37,14 +37,14 @@ def get_me(
)
@router.get("/permissions/snapshot", response_model=PermissionSnapshotResponse)
@router.get("/permissions/snapshot", response_model=RoleSnapshotResponse)
def get_my_permission_snapshot(
principal: KeycloakPrincipal = Depends(require_authenticated_principal),
db: Session = Depends(get_db),
) -> PermissionSnapshotResponse:
) -> RoleSnapshotResponse:
try:
users_repo = UsersRepository(db)
perms_repo = PermissionsRepository(db)
user_sites_repo = UserSitesRepository(db)
user = users_repo.upsert_by_sub(
user_sub=principal.sub,
@@ -52,8 +52,23 @@ def get_my_permission_snapshot(
email=principal.email,
display_name=principal.name or principal.preferred_username,
is_active=True,
status="active",
)
permissions = perms_repo.list_by_user(user.id, user.user_sub)
return PermissionService.build_snapshot(user_sub=principal.sub, permissions=permissions)
rows = user_sites_repo.get_user_role_rows(user.id)
serialized = [
(
site.site_key,
site.display_name,
company.company_key,
company.display_name,
system.system_key,
system.name,
role.role_key,
role.name,
role.idp_role_name,
)
for site, company, role, system in rows
]
return PermissionService.build_role_snapshot(user_sub=principal.sub, rows=serialized)
except SQLAlchemyError:
return PermissionSnapshotResponse(user_sub=principal.sub, permissions=[])
return RoleSnapshotResponse(user_sub=principal.sub, roles=[])

View File

@@ -1,7 +1,6 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.admin import router as admin_router
from app.api.admin_catalog import router as admin_catalog_router
from app.api.auth import router as auth_router
from app.api.internal_catalog import router as internal_catalog_router
@@ -28,7 +27,6 @@ def healthz() -> dict[str, str]:
app.include_router(internal_router)
app.include_router(internal_catalog_router)
app.include_router(admin_router)
app.include_router(admin_catalog_router)
app.include_router(me_router)
app.include_router(auth_router)

View File

@@ -1,25 +1,21 @@
from app.models.api_client import ApiClient
from app.models.auth_sync_state import AuthSyncState
from app.models.company import Company
from app.models.module import Module
from app.models.permission import Permission
from app.models.permission_group import PermissionGroup
from app.models.permission_group_member import PermissionGroupMember
from app.models.permission_group_permission import PermissionGroupPermission
from app.models.role import Role
from app.models.site import Site
from app.models.site_role import SiteRole
from app.models.system import System
from app.models.user import User
from app.models.user_scope_permission import UserScopePermission
from app.models.user_site import UserSite
__all__ = [
"ApiClient",
"AuthSyncState",
"Company",
"Module",
"Permission",
"PermissionGroup",
"PermissionGroupMember",
"PermissionGroupPermission",
"Role",
"Site",
"SiteRole",
"System",
"User",
"UserScopePermission",
"UserSite",
]

View File

@@ -0,0 +1,21 @@
from datetime import datetime
from uuid import uuid4
from sqlalchemy import DateTime, String, UniqueConstraint, func
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column
from app.db.base import Base
class AuthSyncState(Base):
__tablename__ = "auth_sync_state"
__table_args__ = (UniqueConstraint("entity_type", "entity_id", name="uq_auth_sync_state_entity"),)
id: Mapped[str] = mapped_column(UUID(as_uuid=False), primary_key=True, default=lambda: str(uuid4()))
entity_type: Mapped[str] = mapped_column(String(32), nullable=False)
entity_id: Mapped[str] = mapped_column(UUID(as_uuid=False), nullable=False)
last_synced_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
source_version: Mapped[str | None] = mapped_column(String(255))
last_error: Mapped[str | None] = mapped_column(String)
updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), nullable=False)

View File

@@ -13,7 +13,9 @@ class Company(Base):
id: Mapped[str] = mapped_column(UUID(as_uuid=False), primary_key=True, default=lambda: str(uuid4()))
company_key: Mapped[str] = mapped_column(String(128), unique=True, nullable=False, index=True)
name: Mapped[str] = mapped_column(String(255), nullable=False)
display_name: Mapped[str] = mapped_column(String(255), nullable=False)
legal_name: Mapped[str | None] = mapped_column(String(255))
idp_group_id: Mapped[str | None] = mapped_column(String(128))
status: Mapped[str] = mapped_column(String(16), nullable=False, default="active")
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), nullable=False)
updated_at: Mapped[datetime] = mapped_column(

View File

@@ -1,31 +0,0 @@
from datetime import datetime
from uuid import uuid4
from sqlalchemy import DateTime, ForeignKey, String, UniqueConstraint, func
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column
from app.db.base import Base
class Permission(Base):
__tablename__ = "permissions"
__table_args__ = (
UniqueConstraint(
"user_id",
"scope_type",
"scope_id",
"module",
"action",
name="uq_permissions_user_scope_module_action",
),
)
id: Mapped[str] = mapped_column(UUID(as_uuid=False), primary_key=True, default=lambda: str(uuid4()))
user_id: Mapped[str] = mapped_column(UUID(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
scope_type: Mapped[str] = mapped_column(String(32), nullable=False)
scope_id: Mapped[str] = mapped_column(String(128), nullable=False)
module: Mapped[str] = mapped_column(String(128), nullable=False)
action: Mapped[str] = mapped_column(String(32), nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), nullable=False)

View File

@@ -1,20 +0,0 @@
from datetime import datetime
from uuid import uuid4
from sqlalchemy import DateTime, ForeignKey, String, UniqueConstraint, func
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column
from app.db.base import Base
class PermissionGroupMember(Base):
__tablename__ = "permission_group_members"
__table_args__ = (UniqueConstraint("group_id", "user_sub", name="uq_permission_group_members_group_sub"),)
id: Mapped[str] = mapped_column(UUID(as_uuid=False), primary_key=True, default=lambda: str(uuid4()))
group_id: Mapped[str] = mapped_column(
UUID(as_uuid=False), ForeignKey("permission_groups.id", ondelete="CASCADE"), nullable=False
)
user_sub: Mapped[str] = mapped_column(String(255), nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), nullable=False)

View File

@@ -1,23 +0,0 @@
from datetime import datetime
from uuid import uuid4
from sqlalchemy import DateTime, ForeignKey, String, func
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column
from app.db.base import Base
class PermissionGroupPermission(Base):
__tablename__ = "permission_group_permissions"
id: Mapped[str] = mapped_column(UUID(as_uuid=False), primary_key=True, default=lambda: str(uuid4()))
group_id: Mapped[str] = mapped_column(
UUID(as_uuid=False), ForeignKey("permission_groups.id", ondelete="CASCADE"), nullable=False
)
system: Mapped[str] = mapped_column(String(64), nullable=False)
module: Mapped[str] = mapped_column(String(128), nullable=False)
action: Mapped[str] = mapped_column(String(32), nullable=False)
scope_type: Mapped[str] = mapped_column(String(16), nullable=False)
scope_id: Mapped[str] = mapped_column(String(128), nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), nullable=False)

View File

@@ -1,22 +1,23 @@
from datetime import datetime
from uuid import uuid4
from sqlalchemy import DateTime, ForeignKey, String, func
from sqlalchemy import DateTime, ForeignKey, String, UniqueConstraint, func
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column
from app.db.base import Base
class Module(Base):
__tablename__ = "modules"
class Role(Base):
__tablename__ = "roles"
__table_args__ = (UniqueConstraint("system_id", "idp_role_name", name="uq_roles_system_idp_role_name"),)
id: Mapped[str] = mapped_column(UUID(as_uuid=False), primary_key=True, default=lambda: str(uuid4()))
system_key: Mapped[str] = mapped_column(
String(128), ForeignKey("systems.system_key", ondelete="CASCADE"), nullable=False, index=True
)
module_key: Mapped[str] = mapped_column(String(128), unique=True, nullable=False, index=True)
role_key: Mapped[str] = mapped_column(String(128), unique=True, nullable=False, index=True)
system_id: Mapped[str] = mapped_column(UUID(as_uuid=False), ForeignKey("systems.id", ondelete="CASCADE"), nullable=False)
name: Mapped[str] = mapped_column(String(255), nullable=False)
description: Mapped[str | None] = mapped_column(String(1024))
idp_role_name: Mapped[str] = mapped_column(String(255), nullable=False)
status: Mapped[str] = mapped_column(String(16), nullable=False, default="active")
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), nullable=False)
updated_at: Mapped[datetime] = mapped_column(

View File

@@ -14,7 +14,9 @@ class Site(Base):
id: Mapped[str] = mapped_column(UUID(as_uuid=False), primary_key=True, default=lambda: str(uuid4()))
site_key: Mapped[str] = mapped_column(String(128), unique=True, nullable=False, index=True)
company_id: Mapped[str] = mapped_column(UUID(as_uuid=False), ForeignKey("companies.id", ondelete="CASCADE"), nullable=False)
name: Mapped[str] = mapped_column(String(255), nullable=False)
display_name: Mapped[str] = mapped_column(String(255), nullable=False)
domain: Mapped[str | None] = mapped_column(String(255))
idp_group_id: Mapped[str | None] = mapped_column(String(128))
status: Mapped[str] = mapped_column(String(16), nullable=False, default="active")
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), nullable=False)
updated_at: Mapped[datetime] = mapped_column(

18
app/models/site_role.py Normal file
View File

@@ -0,0 +1,18 @@
from datetime import datetime
from uuid import uuid4
from sqlalchemy import DateTime, ForeignKey, UniqueConstraint, func
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column
from app.db.base import Base
class SiteRole(Base):
__tablename__ = "site_roles"
__table_args__ = (UniqueConstraint("site_id", "role_id", name="uq_site_roles_site_role"),)
id: Mapped[str] = mapped_column(UUID(as_uuid=False), primary_key=True, default=lambda: str(uuid4()))
site_id: Mapped[str] = mapped_column(UUID(as_uuid=False), ForeignKey("sites.id", ondelete="CASCADE"), nullable=False)
role_id: Mapped[str] = mapped_column(UUID(as_uuid=False), ForeignKey("roles.id", ondelete="CASCADE"), nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), nullable=False)

View File

@@ -14,6 +14,7 @@ class System(Base):
id: Mapped[str] = mapped_column(UUID(as_uuid=False), primary_key=True, default=lambda: str(uuid4()))
system_key: Mapped[str] = mapped_column(String(64), unique=True, nullable=False, index=True)
name: Mapped[str] = mapped_column(String(255), nullable=False)
idp_client_id: Mapped[str] = mapped_column(String(128), unique=True, nullable=False)
status: Mapped[str] = mapped_column(String(16), nullable=False, default="active")
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), nullable=False)
updated_at: Mapped[datetime] = mapped_column(

View File

@@ -13,12 +13,12 @@ class User(Base):
id: Mapped[str] = mapped_column(UUID(as_uuid=False), primary_key=True, default=lambda: str(uuid4()))
user_sub: Mapped[str] = mapped_column(String(255), unique=True, nullable=False, index=True)
idp_user_id: Mapped[str | None] = mapped_column(String(128))
idp_user_id: Mapped[str | None] = mapped_column(String(128), unique=True)
username: Mapped[str | None] = mapped_column(String(255), unique=True)
email: Mapped[str | None] = mapped_column(String(320))
email: Mapped[str | None] = mapped_column(String(320), unique=True)
display_name: Mapped[str | None] = mapped_column(String(255))
status: Mapped[str] = mapped_column(String(16), nullable=False, default="active")
is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), nullable=False)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False

View File

@@ -1,24 +0,0 @@
from datetime import datetime
from uuid import uuid4
from sqlalchemy import DateTime, ForeignKey, String, func
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column
from app.db.base import Base
class UserScopePermission(Base):
__tablename__ = "user_scope_permissions"
id: Mapped[str] = mapped_column(UUID(as_uuid=False), primary_key=True, default=lambda: str(uuid4()))
user_id: Mapped[str] = mapped_column(UUID(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
module_id: Mapped[str] = mapped_column(UUID(as_uuid=False), ForeignKey("modules.id", ondelete="CASCADE"), nullable=False)
action: Mapped[str] = mapped_column(String(32), nullable=False)
scope_type: Mapped[str] = mapped_column(String(16), nullable=False)
company_id: Mapped[str | None] = mapped_column(UUID(as_uuid=False), ForeignKey("companies.id", ondelete="CASCADE"))
site_id: Mapped[str | None] = mapped_column(UUID(as_uuid=False), ForeignKey("sites.id", ondelete="CASCADE"))
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), nullable=False)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False
)

View File

@@ -1,20 +1,20 @@
from datetime import datetime
from uuid import uuid4
from sqlalchemy import DateTime, String, func
from sqlalchemy import DateTime, ForeignKey, UniqueConstraint, func
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column
from app.db.base import Base
class PermissionGroup(Base):
__tablename__ = "permission_groups"
class UserSite(Base):
__tablename__ = "user_sites"
__table_args__ = (UniqueConstraint("user_id", "site_id", name="uq_user_sites_user_site"),)
id: Mapped[str] = mapped_column(UUID(as_uuid=False), primary_key=True, default=lambda: str(uuid4()))
group_key: Mapped[str] = mapped_column(String(128), unique=True, nullable=False, index=True)
name: Mapped[str] = mapped_column(String(255), nullable=False)
status: Mapped[str] = mapped_column(String(16), nullable=False, default="active")
user_id: Mapped[str] = mapped_column(UUID(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
site_id: Mapped[str] = mapped_column(UUID(as_uuid=False), ForeignKey("sites.id", ondelete="CASCADE"), nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), nullable=False)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False

View File

@@ -0,0 +1,94 @@
from sqlalchemy import func, or_, select
from sqlalchemy.orm import Session
from app.models.api_client import ApiClient
class ApiClientsRepository:
def __init__(self, db: Session) -> None:
self.db = db
def get_by_key(self, client_key: str) -> ApiClient | None:
return self.db.scalar(select(ApiClient).where(ApiClient.client_key == client_key))
def list(self, *, keyword: str | None = None, status: str | None = None, limit: int = 100, offset: int = 0) -> tuple[list[ApiClient], int]:
stmt = select(ApiClient)
count_stmt = select(func.count()).select_from(ApiClient)
if keyword:
pattern = f"%{keyword}%"
cond = or_(ApiClient.client_key.ilike(pattern), ApiClient.name.ilike(pattern))
stmt = stmt.where(cond)
count_stmt = count_stmt.where(cond)
if status:
stmt = stmt.where(ApiClient.status == status)
count_stmt = count_stmt.where(ApiClient.status == status)
stmt = stmt.order_by(ApiClient.created_at.desc()).limit(limit).offset(offset)
return list(self.db.scalars(stmt).all()), int(self.db.scalar(count_stmt) or 0)
def create(
self,
*,
client_key: str,
name: str,
status: str,
api_key_hash: str,
allowed_origins: list[str],
allowed_ips: list[str],
allowed_paths: list[str],
rate_limit_per_min: int | None,
expires_at,
) -> ApiClient:
item = ApiClient(
client_key=client_key,
name=name,
status=status,
api_key_hash=api_key_hash,
allowed_origins=allowed_origins,
allowed_ips=allowed_ips,
allowed_paths=allowed_paths,
rate_limit_per_min=rate_limit_per_min,
expires_at=expires_at,
)
self.db.add(item)
self.db.commit()
self.db.refresh(item)
return item
def update(
self,
item: ApiClient,
*,
name: str | None = None,
status: str | None = None,
api_key_hash: str | None = None,
allowed_origins: list[str] | None = None,
allowed_ips: list[str] | None = None,
allowed_paths: list[str] | None = None,
rate_limit_per_min: int | None = None,
expires_at=None,
) -> ApiClient:
if name is not None:
item.name = name
if status is not None:
item.status = status
if api_key_hash is not None:
item.api_key_hash = api_key_hash
if allowed_origins is not None:
item.allowed_origins = allowed_origins
if allowed_ips is not None:
item.allowed_ips = allowed_ips
if allowed_paths is not None:
item.allowed_paths = allowed_paths
if rate_limit_per_min is not None:
item.rate_limit_per_min = rate_limit_per_min
if expires_at is not None:
item.expires_at = expires_at
self.db.commit()
self.db.refresh(item)
return item
def delete(self, item: ApiClient) -> None:
self.db.delete(item)
self.db.commit()

View File

@@ -9,36 +9,55 @@ class CompaniesRepository:
self.db = db
def get_by_key(self, company_key: str) -> Company | None:
stmt = select(Company).where(Company.company_key == company_key)
return self.db.scalar(stmt)
return self.db.scalar(select(Company).where(Company.company_key == company_key))
def get_by_id(self, company_id: str) -> Company | None:
stmt = select(Company).where(Company.id == company_id)
return self.db.scalar(stmt)
return self.db.scalar(select(Company).where(Company.id == company_id))
def list(self, keyword: str | None = None, limit: int = 100, offset: int = 0) -> tuple[list[Company], int]:
stmt = select(Company)
count_stmt = select(func.count()).select_from(Company)
if keyword:
pattern = f"%{keyword}%"
cond = or_(Company.company_key.ilike(pattern), Company.name.ilike(pattern))
cond = or_(
Company.company_key.ilike(pattern),
Company.display_name.ilike(pattern),
Company.legal_name.ilike(pattern),
)
stmt = stmt.where(cond)
count_stmt = count_stmt.where(cond)
stmt = stmt.order_by(Company.created_at.desc()).limit(limit).offset(offset)
return list(self.db.scalars(stmt).all()), int(self.db.scalar(count_stmt) or 0)
def create(self, company_key: str, name: str, status: str = "active") -> Company:
item = Company(company_key=company_key, name=name, status=status)
def create(self, *, company_key: str, display_name: str, legal_name: str | None, status: str = "active") -> Company:
item = Company(company_key=company_key, display_name=display_name, legal_name=legal_name, status=status)
self.db.add(item)
self.db.commit()
self.db.refresh(item)
return item
def update(self, item: Company, *, name: str | None = None, status: str | None = None) -> Company:
if name is not None:
item.name = name
def update(
self,
item: Company,
*,
display_name: str | None = None,
legal_name: str | None = None,
idp_group_id: str | None = None,
status: str | None = None,
) -> Company:
if display_name is not None:
item.display_name = display_name
if legal_name is not None:
item.legal_name = legal_name
if idp_group_id is not None:
item.idp_group_id = idp_group_id
if status is not None:
item.status = status
self.db.commit()
self.db.refresh(item)
return item
def delete(self, item: Company) -> None:
self.db.delete(item)
self.db.commit()

View File

@@ -1,35 +0,0 @@
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from app.models.module import Module
class ModulesRepository:
def __init__(self, db: Session) -> None:
self.db = db
def get_by_key(self, module_key: str) -> Module | None:
stmt = select(Module).where(Module.module_key == module_key)
return self.db.scalar(stmt)
def list(self, limit: int = 200, offset: int = 0) -> tuple[list[Module], int]:
stmt = select(Module)
count_stmt = select(func.count()).select_from(Module)
stmt = stmt.order_by(Module.created_at.desc()).limit(limit).offset(offset)
return list(self.db.scalars(stmt).all()), int(self.db.scalar(count_stmt) or 0)
def create(self, module_key: str, system_key: str, name: str, status: str = "active") -> Module:
item = Module(module_key=module_key, system_key=system_key, name=name, status=status)
self.db.add(item)
self.db.commit()
self.db.refresh(item)
return item
def update(self, item: Module, *, name: str | None = None, status: str | None = None) -> Module:
if name is not None:
item.name = name
if status is not None:
item.status = status
self.db.commit()
self.db.refresh(item)
return item

View File

@@ -1,269 +0,0 @@
from __future__ import annotations
from sqlalchemy import delete, func, select
from sqlalchemy.orm import Session
from app.models.permission_group import PermissionGroup
from app.models.permission_group_member import PermissionGroupMember
from app.models.permission_group_permission import PermissionGroupPermission
from app.models.user import User
class PermissionGroupsRepository:
def __init__(self, db: Session) -> None:
self.db = db
def get_by_key(self, group_key: str) -> PermissionGroup | None:
return self.db.scalar(select(PermissionGroup).where(PermissionGroup.group_key == group_key))
def get_by_id(self, group_id: str) -> PermissionGroup | None:
return self.db.scalar(select(PermissionGroup).where(PermissionGroup.id == group_id))
def get_by_keys(self, group_keys: list[str]) -> list[PermissionGroup]:
if not group_keys:
return []
stmt = select(PermissionGroup).where(PermissionGroup.group_key.in_(group_keys))
return list(self.db.scalars(stmt).all())
def list(self, limit: int = 100, offset: int = 0) -> tuple[list[PermissionGroup], int]:
stmt = select(PermissionGroup).order_by(PermissionGroup.created_at.desc()).limit(limit).offset(offset)
count_stmt = select(func.count()).select_from(PermissionGroup)
return list(self.db.scalars(stmt).all()), int(self.db.scalar(count_stmt) or 0)
def create(self, group_key: str, name: str, status: str = "active") -> PermissionGroup:
item = PermissionGroup(group_key=group_key, name=name, status=status)
self.db.add(item)
self.db.commit()
self.db.refresh(item)
return item
def update(self, item: PermissionGroup, *, name: str | None = None, status: str | None = None) -> PermissionGroup:
if name is not None:
item.name = name
if status is not None:
item.status = status
self.db.commit()
self.db.refresh(item)
return item
def add_member_if_not_exists(self, group_id: str, user_sub: str) -> PermissionGroupMember:
existing = self.db.scalar(
select(PermissionGroupMember).where(
PermissionGroupMember.group_id == group_id, PermissionGroupMember.user_sub == user_sub
)
)
if existing:
return existing
row = PermissionGroupMember(group_id=group_id, user_sub=user_sub)
self.db.add(row)
self.db.commit()
self.db.refresh(row)
return row
def remove_member(self, group_id: str, user_sub: str) -> int:
result = self.db.execute(
delete(PermissionGroupMember).where(
PermissionGroupMember.group_id == group_id, PermissionGroupMember.user_sub == user_sub
)
)
self.db.commit()
return int(result.rowcount or 0)
def list_group_keys_by_member_sub(self, user_sub: str) -> list[str]:
stmt = (
select(PermissionGroup.group_key)
.select_from(PermissionGroupMember)
.join(PermissionGroup, PermissionGroup.id == PermissionGroupMember.group_id)
.where(PermissionGroupMember.user_sub == user_sub)
.order_by(PermissionGroup.group_key.asc())
)
return [row[0] for row in self.db.execute(stmt).all()]
def replace_member_groups(self, user_sub: str, group_ids: list[str]) -> None:
self.db.execute(delete(PermissionGroupMember).where(PermissionGroupMember.user_sub == user_sub))
for group_id in group_ids:
self.db.add(PermissionGroupMember(group_id=group_id, user_sub=user_sub))
self.db.commit()
def grant_group_permission(
self,
group_id: str,
system: str,
module: str,
action: str,
scope_type: str,
scope_id: str,
) -> PermissionGroupPermission:
where = [
PermissionGroupPermission.group_id == group_id,
PermissionGroupPermission.system == system,
PermissionGroupPermission.module == module,
PermissionGroupPermission.action == action,
PermissionGroupPermission.scope_type == scope_type,
PermissionGroupPermission.scope_id == scope_id,
]
existing = self.db.scalar(select(PermissionGroupPermission).where(*where))
if existing:
return existing
row = PermissionGroupPermission(
group_id=group_id,
system=system,
module=module,
action=action,
scope_type=scope_type,
scope_id=scope_id,
)
self.db.add(row)
self.db.commit()
self.db.refresh(row)
return row
def list_group_permissions(self, group_id: str) -> list[PermissionGroupPermission]:
stmt = (
select(PermissionGroupPermission)
.where(PermissionGroupPermission.group_id == group_id)
.order_by(PermissionGroupPermission.scope_type.asc(), PermissionGroupPermission.scope_id.asc(), PermissionGroupPermission.system.asc(), PermissionGroupPermission.module.asc(), PermissionGroupPermission.action.asc())
)
return list(self.db.scalars(stmt).all())
def replace_group_bindings(
self,
*,
group_id: str,
site_keys: list[str],
system_keys: list[str],
module_keys: list[str],
member_subs: list[str],
actions: list[str],
) -> None:
normalized_sites = list(dict.fromkeys([s for s in site_keys if s]))
normalized_actions = [a for a in list(dict.fromkeys(actions)) if a in {"view", "edit"}]
normalized_member_subs = list(dict.fromkeys([s for s in member_subs if s]))
normalized_systems = set([s for s in system_keys if s])
module_pairs = []
for pair in module_keys:
if "|" not in pair:
continue
system_key, module_key = pair.split("|", 1)
if not system_key or not module_key:
continue
module_pairs.append((system_key, module_key))
normalized_systems.add(system_key)
self.db.execute(delete(PermissionGroupPermission).where(PermissionGroupPermission.group_id == group_id))
self.db.execute(delete(PermissionGroupMember).where(PermissionGroupMember.group_id == group_id))
for sub in normalized_member_subs:
self.db.add(PermissionGroupMember(group_id=group_id, user_sub=sub))
for site_key in normalized_sites:
for action in normalized_actions:
for system_key in sorted(normalized_systems):
module_names = [m for s, m in module_pairs if s == system_key] or ["__system__"]
for module_name in module_names:
self.db.add(
PermissionGroupPermission(
group_id=group_id,
system=system_key,
module=module_name,
action=action,
scope_type="site",
scope_id=site_key,
)
)
self.db.commit()
def get_group_binding_snapshot(self, group_id: str, group_key: str) -> dict:
permissions = self.list_group_permissions(group_id)
site_keys = sorted({p.scope_id for p in permissions if p.scope_type == "site"})
system_keys = sorted({p.system for p in permissions})
actions = sorted({p.action for p in permissions if p.action in {"view", "edit"}})
module_keys = sorted(
{
f"{p.system}|{p.module}"
for p in permissions
if p.module and p.module != "__system__"
}
)
member_subs = sorted(self.list_group_member_subs(group_id))
return {
"group_key": group_key,
"site_keys": site_keys,
"system_keys": system_keys,
"module_keys": module_keys,
"member_subs": member_subs,
"actions": actions,
}
def list_group_member_subs(self, group_id: str) -> list[str]:
stmt = (
select(PermissionGroupMember.user_sub)
.where(PermissionGroupMember.group_id == group_id)
.order_by(PermissionGroupMember.user_sub.asc())
)
return [row[0] for row in self.db.execute(stmt).all()]
def list_system_groups(self, system_key: str) -> list[PermissionGroup]:
stmt = (
select(PermissionGroup)
.join(PermissionGroupPermission, PermissionGroupPermission.group_id == PermissionGroup.id)
.where(PermissionGroupPermission.system == system_key)
.order_by(PermissionGroup.name.asc())
.distinct()
)
return list(self.db.scalars(stmt).all())
def list_system_members(self, system_key: str) -> list[User]:
stmt = (
select(User)
.join(PermissionGroupMember, PermissionGroupMember.user_sub == User.user_sub)
.join(PermissionGroupPermission, PermissionGroupPermission.group_id == PermissionGroupMember.group_id)
.where(PermissionGroupPermission.system == system_key)
.order_by(User.email.asc(), User.user_sub.asc())
.distinct()
)
return list(self.db.scalars(stmt).all())
def list_module_groups(self, system_key: str, module_name: str) -> list[PermissionGroup]:
stmt = (
select(PermissionGroup)
.join(PermissionGroupPermission, PermissionGroupPermission.group_id == PermissionGroup.id)
.where(PermissionGroupPermission.system == system_key, PermissionGroupPermission.module == module_name)
.order_by(PermissionGroup.name.asc())
.distinct()
)
return list(self.db.scalars(stmt).all())
def list_module_members(self, system_key: str, module_name: str) -> list[User]:
stmt = (
select(User)
.join(PermissionGroupMember, PermissionGroupMember.user_sub == User.user_sub)
.join(PermissionGroupPermission, PermissionGroupPermission.group_id == PermissionGroupMember.group_id)
.where(PermissionGroupPermission.system == system_key, PermissionGroupPermission.module == module_name)
.order_by(User.email.asc(), User.user_sub.asc())
.distinct()
)
return list(self.db.scalars(stmt).all())
def revoke_group_permission(
self,
group_id: str,
system: str,
module: str,
action: str,
scope_type: str,
scope_id: str,
) -> int:
stmt = delete(PermissionGroupPermission).where(
PermissionGroupPermission.group_id == group_id,
PermissionGroupPermission.system == system,
PermissionGroupPermission.module == module,
PermissionGroupPermission.action == action,
PermissionGroupPermission.scope_type == scope_type,
PermissionGroupPermission.scope_id == scope_id,
)
result = self.db.execute(stmt)
self.db.commit()
return int(result.rowcount or 0)

View File

@@ -1,231 +0,0 @@
from sqlalchemy import and_, delete, func, literal, or_, select
from sqlalchemy.orm import Session
from app.models.company import Company
from app.models.module import Module
from app.models.permission_group_member import PermissionGroupMember
from app.models.permission_group_permission import PermissionGroupPermission
from app.models.site import Site
from app.models.user import User
from app.models.user_scope_permission import UserScopePermission
class PermissionsRepository:
def __init__(self, db: Session) -> None:
self.db = db
def list_by_user(self, user_id: str, user_sub: str) -> list[tuple[str, str, str | None, str, str]]:
direct_stmt = (
select(
literal("direct"),
UserScopePermission.scope_type,
Company.company_key,
Site.site_key,
Module.system_key,
Module.module_key,
UserScopePermission.action,
)
.select_from(UserScopePermission)
.join(Module, Module.id == UserScopePermission.module_id)
.join(Company, Company.id == UserScopePermission.company_id, isouter=True)
.join(Site, Site.id == UserScopePermission.site_id, isouter=True)
.where(UserScopePermission.user_id == user_id)
.where(UserScopePermission.action.in_(["view", "edit"]))
.where(UserScopePermission.scope_type == "site")
)
group_stmt = (
select(
literal("group"),
PermissionGroupPermission.scope_type,
PermissionGroupPermission.scope_id,
PermissionGroupPermission.system,
PermissionGroupPermission.module,
PermissionGroupPermission.action,
)
.select_from(PermissionGroupPermission)
.join(PermissionGroupMember, PermissionGroupMember.group_id == PermissionGroupPermission.group_id)
.where(PermissionGroupMember.user_sub == user_sub)
.where(PermissionGroupPermission.action.in_(["view", "edit"]))
.where(PermissionGroupPermission.scope_type == "site")
)
rows = self.db.execute(direct_stmt).all() + self.db.execute(group_stmt).all()
result: list[tuple[str, str, str | None, str, str]] = []
dedup = set()
for row in rows:
source = row[0]
if source == "group":
_, scope_type, scope_id, system_key, module_key, action = row
if module_key == "__system__":
module_key = f"__system__{system_key}"
else:
_, scope_type, company_key, site_key, system_key, module_key, action = row
scope_id = company_key if scope_type == "company" else site_key
key = (scope_type, scope_id or "", system_key, module_key, action)
if key in dedup:
continue
dedup.add(key)
result.append(key)
return result
def create_if_not_exists(
self,
user_id: str,
module_id: str,
action: str,
scope_type: str,
company_id: str | None,
site_id: str | None,
) -> UserScopePermission:
where_expr = [
UserScopePermission.user_id == user_id,
UserScopePermission.module_id == module_id,
UserScopePermission.action == action,
UserScopePermission.scope_type == scope_type,
]
if scope_type == "company":
where_expr.append(UserScopePermission.company_id == company_id)
else:
where_expr.append(UserScopePermission.site_id == site_id)
existing = self.db.scalar(select(UserScopePermission).where(and_(*where_expr)))
if existing:
return existing
item = UserScopePermission(
user_id=user_id,
module_id=module_id,
action=action,
scope_type=scope_type,
company_id=company_id,
site_id=site_id,
)
self.db.add(item)
self.db.commit()
self.db.refresh(item)
return item
def revoke(
self,
user_id: str,
module_id: str,
action: str,
scope_type: str,
company_id: str | None,
site_id: str | None,
) -> int:
stmt = delete(UserScopePermission).where(
UserScopePermission.user_id == user_id,
UserScopePermission.module_id == module_id,
UserScopePermission.action == action,
UserScopePermission.scope_type == scope_type,
or_(
and_(scope_type == "company", UserScopePermission.company_id == company_id),
and_(scope_type == "site", UserScopePermission.site_id == site_id),
),
)
result = self.db.execute(stmt)
self.db.commit()
return int(result.rowcount or 0)
def list_direct_permissions(
self,
*,
keyword: str | None = None,
scope_type: str | None = None,
limit: int = 200,
offset: int = 0,
) -> tuple[list[dict], int]:
stmt = (
select(
UserScopePermission.id,
User.user_sub,
User.email,
User.display_name,
UserScopePermission.scope_type,
Company.company_key,
Site.site_key,
Module.system_key,
Module.module_key,
UserScopePermission.action,
UserScopePermission.created_at,
)
.select_from(UserScopePermission)
.join(User, User.id == UserScopePermission.user_id)
.join(Module, Module.id == UserScopePermission.module_id)
.join(Company, Company.id == UserScopePermission.company_id, isouter=True)
.join(Site, Site.id == UserScopePermission.site_id, isouter=True)
.where(UserScopePermission.action.in_(["view", "edit"]))
.where(UserScopePermission.scope_type == "site")
)
count_stmt = (
select(func.count())
.select_from(UserScopePermission)
.join(User, User.id == UserScopePermission.user_id)
.join(Module, Module.id == UserScopePermission.module_id)
.join(Company, Company.id == UserScopePermission.company_id, isouter=True)
.join(Site, Site.id == UserScopePermission.site_id, isouter=True)
.where(UserScopePermission.action.in_(["view", "edit"]))
.where(UserScopePermission.scope_type == "site")
)
if scope_type == "site":
stmt = stmt.where(UserScopePermission.scope_type == scope_type)
count_stmt = count_stmt.where(UserScopePermission.scope_type == scope_type)
if keyword:
pattern = f"%{keyword}%"
cond = or_(
User.user_sub.ilike(pattern),
User.email.ilike(pattern),
User.display_name.ilike(pattern),
Module.module_key.ilike(pattern),
Company.company_key.ilike(pattern),
Site.site_key.ilike(pattern),
UserScopePermission.action.ilike(pattern),
)
stmt = stmt.where(cond)
count_stmt = count_stmt.where(cond)
stmt = stmt.order_by(UserScopePermission.created_at.desc()).limit(limit).offset(offset)
rows = self.db.execute(stmt).all()
total = int(self.db.scalar(count_stmt) or 0)
items: list[dict] = []
for row in rows:
(
permission_id,
user_sub,
email,
display_name,
row_scope_type,
company_key,
site_key,
system_key,
module_key,
action,
created_at,
) = row
scope_id = company_key if row_scope_type == "company" else site_key
module_name = module_key
if isinstance(module_name, str) and module_name.startswith("__system__"):
module_name = None
items.append(
{
"permission_id": permission_id,
"user_sub": user_sub,
"email": email,
"display_name": display_name,
"scope_type": row_scope_type,
"scope_id": scope_id,
"system": system_key,
"module": module_name,
"action": action,
"created_at": created_at,
}
)
return items, total
def revoke_by_permission_id(self, permission_id: str) -> int:
stmt = delete(UserScopePermission).where(UserScopePermission.id == permission_id)
result = self.db.execute(stmt)
self.db.commit()
return int(result.rowcount or 0)

View File

@@ -0,0 +1,97 @@
from sqlalchemy import func, or_, select
from sqlalchemy.orm import Session
from app.models.role import Role
class RolesRepository:
def __init__(self, db: Session) -> None:
self.db = db
def get_by_key(self, role_key: str) -> Role | None:
return self.db.scalar(select(Role).where(Role.role_key == role_key))
def get_by_id(self, role_id: str) -> Role | None:
return self.db.scalar(select(Role).where(Role.id == role_id))
def list(
self,
*,
keyword: str | None = None,
system_id: str | None = None,
status: str | None = None,
limit: int = 100,
offset: int = 0,
) -> tuple[list[Role], int]:
stmt = select(Role)
count_stmt = select(func.count()).select_from(Role)
if keyword:
pattern = f"%{keyword}%"
cond = or_(
Role.role_key.ilike(pattern),
Role.name.ilike(pattern),
Role.idp_role_name.ilike(pattern),
Role.description.ilike(pattern),
)
stmt = stmt.where(cond)
count_stmt = count_stmt.where(cond)
if system_id:
stmt = stmt.where(Role.system_id == system_id)
count_stmt = count_stmt.where(Role.system_id == system_id)
if status:
stmt = stmt.where(Role.status == status)
count_stmt = count_stmt.where(Role.status == status)
stmt = stmt.order_by(Role.created_at.desc()).limit(limit).offset(offset)
return list(self.db.scalars(stmt).all()), int(self.db.scalar(count_stmt) or 0)
def create(
self,
*,
role_key: str,
system_id: str,
name: str,
description: str | None,
idp_role_name: str,
status: str = "active",
) -> Role:
item = Role(
role_key=role_key,
system_id=system_id,
name=name,
description=description,
idp_role_name=idp_role_name,
status=status,
)
self.db.add(item)
self.db.commit()
self.db.refresh(item)
return item
def update(
self,
item: Role,
*,
system_id: str | None = None,
name: str | None = None,
description: str | None = None,
idp_role_name: str | None = None,
status: str | None = None,
) -> Role:
if system_id is not None:
item.system_id = system_id
if name is not None:
item.name = name
if description is not None:
item.description = description
if idp_role_name is not None:
item.idp_role_name = idp_role_name
if status is not None:
item.status = status
self.db.commit()
self.db.refresh(item)
return item
def delete(self, item: Role) -> None:
self.db.delete(item)
self.db.commit()

View File

@@ -0,0 +1,37 @@
from sqlalchemy import delete, select
from sqlalchemy.orm import Session
from app.models.role import Role
from app.models.site import Site
from app.models.site_role import SiteRole
from app.models.system import System
class SiteRolesRepository:
def __init__(self, db: Session) -> None:
self.db = db
def list_site_role_rows(self, site_id: str) -> list[tuple[SiteRole, Role, System]]:
stmt = (
select(SiteRole, Role, System)
.join(Role, Role.id == SiteRole.role_id)
.join(System, System.id == Role.system_id)
.where(SiteRole.site_id == site_id)
.order_by(System.name.asc(), Role.name.asc())
)
return list(self.db.execute(stmt).all())
def list_role_site_rows(self, role_id: str) -> list[tuple[SiteRole, Site]]:
stmt = (
select(SiteRole, Site)
.join(Site, Site.id == SiteRole.site_id)
.where(SiteRole.role_id == role_id)
.order_by(Site.display_name.asc())
)
return list(self.db.execute(stmt).all())
def set_site_roles(self, *, site_id: str, role_ids: list[str]) -> None:
self.db.execute(delete(SiteRole).where(SiteRole.site_id == site_id))
for role_id in role_ids:
self.db.add(SiteRole(site_id=site_id, role_id=role_id))
self.db.commit()

View File

@@ -9,11 +9,14 @@ class SitesRepository:
self.db = db
def get_by_key(self, site_key: str) -> Site | None:
stmt = select(Site).where(Site.site_key == site_key)
return self.db.scalar(stmt)
return self.db.scalar(select(Site).where(Site.site_key == site_key))
def get_by_id(self, site_id: str) -> Site | None:
return self.db.scalar(select(Site).where(Site.id == site_id))
def list(
self,
*,
keyword: str | None = None,
company_id: str | None = None,
limit: int = 100,
@@ -21,19 +24,30 @@ class SitesRepository:
) -> tuple[list[Site], int]:
stmt = select(Site)
count_stmt = select(func.count()).select_from(Site)
if keyword:
pattern = f"%{keyword}%"
cond = or_(Site.site_key.ilike(pattern), Site.name.ilike(pattern))
cond = or_(Site.site_key.ilike(pattern), Site.display_name.ilike(pattern), Site.domain.ilike(pattern))
stmt = stmt.where(cond)
count_stmt = count_stmt.where(cond)
if company_id:
stmt = stmt.where(Site.company_id == company_id)
count_stmt = count_stmt.where(Site.company_id == company_id)
stmt = stmt.order_by(Site.created_at.desc()).limit(limit).offset(offset)
return list(self.db.scalars(stmt).all()), int(self.db.scalar(count_stmt) or 0)
def create(self, site_key: str, company_id: str, name: str, status: str = "active") -> Site:
item = Site(site_key=site_key, company_id=company_id, name=name, status=status)
def create(
self,
*,
site_key: str,
company_id: str,
display_name: str,
domain: str | None,
status: str = "active",
) -> Site:
item = Site(site_key=site_key, company_id=company_id, display_name=display_name, domain=domain, status=status)
self.db.add(item)
self.db.commit()
self.db.refresh(item)
@@ -44,15 +58,25 @@ class SitesRepository:
item: Site,
*,
company_id: str | None = None,
name: str | None = None,
display_name: str | None = None,
domain: str | None = None,
idp_group_id: str | None = None,
status: str | None = None,
) -> Site:
if company_id is not None:
item.company_id = company_id
if name is not None:
item.name = name
if display_name is not None:
item.display_name = display_name
if domain is not None:
item.domain = domain
if idp_group_id is not None:
item.idp_group_id = idp_group_id
if status is not None:
item.status = status
self.db.commit()
self.db.refresh(item)
return item
def delete(self, item: Site) -> None:
self.db.delete(item)
self.db.commit()

View File

@@ -1,4 +1,4 @@
from sqlalchemy import func, select
from sqlalchemy import func, or_, select
from sqlalchemy.orm import Session
from app.models.system import System
@@ -9,34 +9,51 @@ class SystemsRepository:
self.db = db
def get_by_key(self, system_key: str) -> System | None:
stmt = select(System).where(System.system_key == system_key)
return self.db.scalar(stmt)
return self.db.scalar(select(System).where(System.system_key == system_key))
def get_by_id(self, system_id: str) -> System | None:
stmt = select(System).where(System.id == system_id)
return self.db.scalar(stmt)
return self.db.scalar(select(System).where(System.id == system_id))
def list(self, status: str | None = None, limit: int = 100, offset: int = 0) -> tuple[list[System], int]:
def list(self, *, keyword: str | None = None, status: str | None = None, limit: int = 100, offset: int = 0) -> tuple[list[System], int]:
stmt = select(System)
count_stmt = select(func.count()).select_from(System)
if keyword:
pattern = f"%{keyword}%"
cond = or_(System.system_key.ilike(pattern), System.name.ilike(pattern), System.idp_client_id.ilike(pattern))
stmt = stmt.where(cond)
count_stmt = count_stmt.where(cond)
if status:
stmt = stmt.where(System.status == status)
count_stmt = count_stmt.where(System.status == status)
stmt = stmt.order_by(System.created_at.desc()).limit(limit).offset(offset)
return list(self.db.scalars(stmt).all()), int(self.db.scalar(count_stmt) or 0)
def create(self, system_key: str, name: str, status: str = "active") -> System:
item = System(system_key=system_key, name=name, status=status)
def create(self, *, system_key: str, name: str, idp_client_id: str, status: str = "active") -> System:
item = System(system_key=system_key, name=name, idp_client_id=idp_client_id, status=status)
self.db.add(item)
self.db.commit()
self.db.refresh(item)
return item
def update(self, item: System, *, name: str | None = None, status: str | None = None) -> System:
def update(
self,
item: System,
*,
name: str | None = None,
idp_client_id: str | None = None,
status: str | None = None,
) -> System:
if name is not None:
item.name = name
if idp_client_id is not None:
item.idp_client_id = idp_client_id
if status is not None:
item.status = status
self.db.commit()
self.db.refresh(item)
return item
def delete(self, item: System) -> None:
self.db.delete(item)
self.db.commit()

View File

@@ -0,0 +1,54 @@
from sqlalchemy import delete, select
from sqlalchemy.orm import Session
from app.models.company import Company
from app.models.role import Role
from app.models.site import Site
from app.models.site_role import SiteRole
from app.models.system import System
from app.models.user import User
from app.models.user_site import UserSite
class UserSitesRepository:
def __init__(self, db: Session) -> None:
self.db = db
def list_user_site_rows(self, user_id: str) -> list[tuple[UserSite, Site, Company]]:
stmt = (
select(UserSite, Site, Company)
.join(Site, Site.id == UserSite.site_id)
.join(Company, Company.id == Site.company_id)
.where(UserSite.user_id == user_id)
.order_by(Company.display_name.asc(), Site.display_name.asc())
)
return list(self.db.execute(stmt).all())
def list_site_member_rows(self, site_id: str) -> list[tuple[UserSite, User]]:
stmt = (
select(UserSite, User)
.join(User, User.id == UserSite.user_id)
.where(UserSite.site_id == site_id)
.order_by(User.display_name.asc().nulls_last(), User.username.asc().nulls_last(), User.user_sub.asc())
)
return list(self.db.execute(stmt).all())
def set_user_sites(self, *, user_id: str, site_ids: list[str]) -> None:
self.db.execute(delete(UserSite).where(UserSite.user_id == user_id))
for site_id in site_ids:
self.db.add(UserSite(user_id=user_id, site_id=site_id))
self.db.commit()
def get_user_role_rows(self, user_id: str) -> list[tuple[Site, Company, Role, System]]:
stmt = (
select(Site, Company, Role, System)
.select_from(UserSite)
.join(Site, Site.id == UserSite.site_id)
.join(Company, Company.id == Site.company_id)
.join(SiteRole, SiteRole.site_id == Site.id)
.join(Role, Role.id == SiteRole.role_id)
.join(System, System.id == Role.system_id)
.where(UserSite.user_id == user_id)
.order_by(Company.display_name.asc(), Site.display_name.asc(), System.name.asc(), Role.name.asc())
)
return list(self.db.execute(stmt).all())

View File

@@ -9,15 +9,14 @@ class UsersRepository:
self.db = db
def get_by_sub(self, user_sub: str) -> User | None:
stmt = select(User).where(User.user_sub == user_sub)
return self.db.scalar(stmt)
return self.db.scalar(select(User).where(User.user_sub == user_sub))
def get_by_id(self, user_id: str) -> User | None:
stmt = select(User).where(User.id == user_id)
return self.db.scalar(stmt)
return self.db.scalar(select(User).where(User.id == user_id))
def list(
self,
*,
keyword: str | None = None,
is_active: bool | None = None,
limit: int = 50,
@@ -48,11 +47,13 @@ class UsersRepository:
def upsert_by_sub(
self,
*,
user_sub: str,
username: str | None,
email: str | None,
display_name: str | None,
is_active: bool,
status: str = "active",
idp_user_id: str | None = None,
) -> User:
user = self.get_by_sub(user_sub)
@@ -64,6 +65,7 @@ class UsersRepository:
email=email,
display_name=display_name,
is_active=is_active,
status=status,
)
self.db.add(user)
else:
@@ -73,6 +75,7 @@ class UsersRepository:
user.email = email
user.display_name = display_name
user.is_active = is_active
user.status = status
self.db.commit()
self.db.refresh(user)
@@ -86,6 +89,7 @@ class UsersRepository:
email: str | None = None,
display_name: str | None = None,
is_active: bool | None = None,
status: str | None = None,
) -> User:
if username is not None:
user.username = username
@@ -95,6 +99,13 @@ class UsersRepository:
user.display_name = display_name
if is_active is not None:
user.is_active = is_active
if status is not None:
user.status = status
self.db.commit()
self.db.refresh(user)
return user
def delete(self, user: User) -> None:
self.db.delete(user)
self.db.commit()

View File

@@ -1,113 +1,6 @@
from pydantic import BaseModel
from typing import Literal
from datetime import datetime
class SystemCreateRequest(BaseModel):
name: str
status: str = "active"
class SystemUpdateRequest(BaseModel):
name: str | None = None
status: str | None = None
class SystemItem(BaseModel):
id: str
system_key: str
name: str
status: str
class ModuleCreateRequest(BaseModel):
system_key: str
name: str
status: str = "active"
class ModuleUpdateRequest(BaseModel):
name: str | None = None
status: str | None = None
class ModuleItem(BaseModel):
id: str
system_key: str | None = None
module_key: str
name: str
status: str
class CompanyCreateRequest(BaseModel):
name: str
status: str = "active"
class CompanyUpdateRequest(BaseModel):
name: str | None = None
status: str | None = None
class CompanyItem(BaseModel):
id: str
company_key: str
name: str
status: str
class SiteCreateRequest(BaseModel):
company_key: str
name: str
status: str = "active"
class SiteUpdateRequest(BaseModel):
company_key: str | None = None
name: str | None = None
status: str | None = None
class SiteItem(BaseModel):
id: str
site_key: str
company_key: str
name: str
status: str
class MemberItem(BaseModel):
id: str
user_sub: str
username: str | None = None
email: str | None = None
display_name: str | None = None
is_active: bool
class MemberUpsertRequest(BaseModel):
user_sub: str | None = None
username: str | None = None
email: str | None = None
display_name: str | None = None
is_active: bool = True
sync_to_idp: bool = True
class MemberUpdateRequest(BaseModel):
username: str | None = None
email: str | None = None
display_name: str | None = None
is_active: bool | None = None
sync_to_idp: bool = True
class MemberPasswordResetResponse(BaseModel):
user_sub: str
temporary_password: str
class MemberPermissionGroupsUpdateRequest(BaseModel):
group_keys: list[str]
from pydantic import BaseModel, Field
class ListResponse(BaseModel):
@@ -117,62 +10,217 @@ class ListResponse(BaseModel):
offset: int
class PermissionGroupCreateRequest(BaseModel):
name: str
class CompanyCreateRequest(BaseModel):
display_name: str
legal_name: str | None = None
status: str = "active"
class PermissionGroupUpdateRequest(BaseModel):
name: str | None = None
class CompanyUpdateRequest(BaseModel):
display_name: str | None = None
legal_name: str | None = None
idp_group_id: str | None = None
status: str | None = None
class PermissionGroupItem(BaseModel):
class CompanyItem(BaseModel):
id: str
group_key: str
company_key: str
display_name: str
legal_name: str | None = None
idp_group_id: str | None = None
status: str
class SiteCreateRequest(BaseModel):
company_key: str
display_name: str
domain: str | None = None
status: str = "active"
class SiteUpdateRequest(BaseModel):
company_key: str | None = None
display_name: str | None = None
domain: str | None = None
idp_group_id: str | None = None
status: str | None = None
class SiteItem(BaseModel):
id: str
site_key: str
company_key: str
company_display_name: str
display_name: str
domain: str | None = None
idp_group_id: str | None = None
status: str
class SystemCreateRequest(BaseModel):
name: str
status: str
idp_client_id: str
status: str = "active"
class PermissionGroupPermissionItem(BaseModel):
class SystemUpdateRequest(BaseModel):
name: str | None = None
idp_client_id: str | None = None
status: str | None = None
class SystemItem(BaseModel):
id: str
system: str
module: str
action: Literal["view", "edit"]
scope_type: Literal["site"]
scope_id: str
class MemberPermissionGroupsResponse(BaseModel):
user_sub: str
group_keys: list[str]
class GroupBindingUpdateRequest(BaseModel):
site_keys: list[str]
system_keys: list[str]
module_keys: list[str]
member_subs: list[str]
actions: list[Literal["view", "edit"]]
class GroupBindingSnapshot(BaseModel):
group_key: str
site_keys: list[str]
system_keys: list[str]
module_keys: list[str]
member_subs: list[str]
actions: list[Literal["view", "edit"]]
class GroupRelationItem(BaseModel):
group_key: str
group_name: str
system_key: str
name: str
idp_client_id: str
status: str
class MemberRelationItem(BaseModel):
class RoleCreateRequest(BaseModel):
system_key: str
name: str
idp_role_name: str
description: str | None = None
status: str = "active"
class RoleUpdateRequest(BaseModel):
system_key: str | None = None
name: str | None = None
idp_role_name: str | None = None
description: str | None = None
status: str | None = None
class RoleItem(BaseModel):
id: str
role_key: str
system_key: str
system_name: str
name: str
idp_role_name: str
description: str | None = None
status: str
class MemberItem(BaseModel):
id: str
user_sub: str
idp_user_id: str | None = None
username: str | None = None
email: str | None = None
display_name: str | None = None
is_active: bool
status: str
class MemberUpsertRequest(BaseModel):
user_sub: str | None = None
username: str | None = None
email: str | None = None
display_name: str | None = None
is_active: bool = True
status: str = "active"
sync_to_idp: bool = True
class MemberUpdateRequest(BaseModel):
username: str | None = None
email: str | None = None
display_name: str | None = None
is_active: bool | None = None
status: str | None = None
sync_to_idp: bool = True
class MemberPasswordResetResponse(BaseModel):
user_sub: str
temporary_password: str
class SiteRoleAssignRequest(BaseModel):
role_keys: list[str] = Field(default_factory=list)
class SiteRoleItem(BaseModel):
id: str
role_key: str
role_name: str
system_key: str
system_name: str
class UserSiteAssignRequest(BaseModel):
site_keys: list[str] = Field(default_factory=list)
class UserSiteItem(BaseModel):
id: str
site_key: str
site_display_name: str
company_key: str
company_display_name: str
class UserEffectiveRoleItem(BaseModel):
site_key: str
site_display_name: str
company_key: str
company_display_name: str
system_key: str
system_name: str
role_key: str
role_name: str
idp_role_name: str
class UserEffectiveRolesResponse(BaseModel):
user_sub: str
roles: list[UserEffectiveRoleItem]
class SiteMembersResponse(BaseModel):
site_key: str
members: list[MemberItem]
class SiteRolesResponse(BaseModel):
site_key: str
roles: list[SiteRoleItem]
class UserSitesResponse(BaseModel):
user_sub: str
sites: list[UserSiteItem]
class CompanySitesResponse(BaseModel):
company_key: str
sites: list[SiteItem]
class SystemRolesResponse(BaseModel):
system_key: str
roles: list[RoleItem]
class RoleSitesResponse(BaseModel):
role_key: str
sites: list[UserSiteItem]
class ApiClientItem(BaseModel):
id: str
client_key: str
name: str
status: str
allowed_origins: list[str] = Field(default_factory=list)
allowed_ips: list[str] = Field(default_factory=list)
allowed_paths: list[str] = Field(default_factory=list)
rate_limit_per_min: int | None = None
expires_at: datetime | None = None
last_used_at: datetime | None = None
created_at: datetime
updated_at: datetime

View File

@@ -5,6 +5,7 @@ class InternalSystemItem(BaseModel):
id: str
system_key: str
name: str
idp_client_id: str
status: str
@@ -15,16 +16,19 @@ class InternalSystemListResponse(BaseModel):
offset: int
class InternalModuleItem(BaseModel):
class InternalRoleItem(BaseModel):
id: str
module_key: str
role_key: str
system_key: str
system_name: str
name: str
idp_role_name: str
description: str | None = None
status: str
class InternalModuleListResponse(BaseModel):
items: list[InternalModuleItem]
class InternalRoleListResponse(BaseModel):
items: list[InternalRoleItem]
total: int
limit: int
offset: int
@@ -33,7 +37,8 @@ class InternalModuleListResponse(BaseModel):
class InternalCompanyItem(BaseModel):
id: str
company_key: str
name: str
display_name: str
legal_name: str | None = None
status: str
@@ -47,8 +52,10 @@ class InternalCompanyListResponse(BaseModel):
class InternalSiteItem(BaseModel):
id: str
site_key: str
company_key: str | None = None
name: str
company_key: str
company_display_name: str
display_name: str
domain: str | None = None
status: str
@@ -66,6 +73,7 @@ class InternalMemberItem(BaseModel):
email: str | None = None
display_name: str | None = None
is_active: bool
status: str
class InternalMemberListResponse(BaseModel):
@@ -83,3 +91,21 @@ class InternalUpsertUserBySubResponse(BaseModel):
email: str | None = None
display_name: str | None = None
is_active: bool
status: str
class InternalUserRoleItem(BaseModel):
site_key: str
site_display_name: str
company_key: str
company_display_name: str
system_key: str
system_name: str
role_key: str
role_name: str
idp_role_name: str
class InternalUserRoleResponse(BaseModel):
user_sub: str
roles: list[InternalUserRoleItem]

View File

@@ -1,60 +1,18 @@
from datetime import datetime
from typing import Literal
from pydantic import BaseModel
ActionType = Literal["view", "edit"]
ScopeType = Literal["site"]
class RoleSnapshotItem(BaseModel):
site_key: str
site_display_name: str
company_key: str
company_display_name: str
system_key: str
system_name: str
role_key: str
role_name: str
idp_role_name: str
class PermissionGrantRequest(BaseModel):
class RoleSnapshotResponse(BaseModel):
user_sub: str
email: str | None = None
display_name: str | None = None
scope_type: ScopeType
scope_id: str
system: str
module: str | None = None
action: ActionType
class PermissionRevokeRequest(BaseModel):
user_sub: str
scope_type: ScopeType
scope_id: str
system: str
module: str | None = None
action: ActionType
class PermissionItem(BaseModel):
scope_type: ScopeType
scope_id: str
system: str | None = None
module: str
action: ActionType
class PermissionSnapshotResponse(BaseModel):
user_sub: str
permissions: list[PermissionItem]
class DirectPermissionRow(BaseModel):
permission_id: str
user_sub: str
email: str | None = None
display_name: str | None = None
scope_type: ScopeType
scope_id: str
system: str | None = None
module: str | None = None
action: ActionType
created_at: datetime
class DirectPermissionListResponse(BaseModel):
items: list[DirectPermissionRow]
total: int
limit: int
offset: int
roles: list[RoleSnapshotItem]

View File

@@ -7,3 +7,4 @@ class UserUpsertBySubRequest(BaseModel):
email: str | None = None
display_name: str | None = None
is_active: bool = True
status: str = "active"

View File

@@ -1,13 +1,33 @@
from app.schemas.permissions import PermissionItem, PermissionSnapshotResponse
from app.schemas.permissions import RoleSnapshotItem, RoleSnapshotResponse
class PermissionService:
@staticmethod
def build_snapshot(user_sub: str, permissions: list[tuple[str, str, str | None, str, str]]) -> PermissionSnapshotResponse:
return PermissionSnapshotResponse(
def build_role_snapshot(user_sub: str, rows: list[tuple[str, str, str, str, str, str, str, str, str]]) -> RoleSnapshotResponse:
return RoleSnapshotResponse(
user_sub=user_sub,
permissions=[
PermissionItem(scope_type=s_type, scope_id=s_id, system=system, module=module, action=action)
for s_type, s_id, system, module, action in permissions
roles=[
RoleSnapshotItem(
site_key=site_key,
site_display_name=site_display_name,
company_key=company_key,
company_display_name=company_display_name,
system_key=system_key,
system_name=system_name,
role_key=role_key,
role_name=role_name,
idp_role_name=idp_role_name,
)
for (
site_key,
site_display_name,
company_key,
company_display_name,
system_key,
system_name,
role_key,
role_name,
idp_role_name,
) in rows
],
)