# Listing metadata: 232 lines, 8.8 KiB, Python.
from sqlalchemy import and_, delete, func, literal, or_, select
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.models.company import Company
|
|
from app.models.module import Module
|
|
from app.models.permission_group_member import PermissionGroupMember
|
|
from app.models.permission_group_permission import PermissionGroupPermission
|
|
from app.models.site import Site
|
|
from app.models.user import User
|
|
from app.models.user_scope_permission import UserScopePermission
|
|
|
|
|
|
class PermissionsRepository:
    """Database access for per-user ("direct") and group-derived scope permissions.

    All queries run through the SQLAlchemy ``Session`` given at construction.
    The mutating methods (``create_if_not_exists``, ``revoke`` and
    ``revoke_by_permission_id``) commit the session before returning.
    """

    # Actions the listing queries are restricted to.
    _LISTED_ACTIONS = ("view", "edit")
    # Module-key marker: ``list_by_user`` expands it with the system key for
    # group rows, ``list_direct_permissions`` hides module keys starting with it.
    _SYSTEM_MODULE_MARKER = "__system__"
    # Escape character used for LIKE patterns built from search keywords.
    _LIKE_ESCAPE = "/"

    def __init__(self, db: Session) -> None:
        """Store the session used by every query of this repository."""
        self.db = db

    # ------------------------------------------------------------------
    # Pure helpers (no database access)
    # ------------------------------------------------------------------

    @classmethod
    def _permission_key(cls, row) -> tuple[str, str, str | None, str, str]:
        """Normalise one ``list_by_user`` row to ``(scope_type, scope_id, system, module, action)``.

        ``row[0]`` is the source literal selected by the query: ``"group"``
        rows carry six columns, every other row is treated as a direct row
        with seven columns.
        """
        if row[0] == "group":
            _, scope_type, scope_id, system_key, module_key, action = row
            if module_key == cls._SYSTEM_MODULE_MARKER:
                module_key = f"{cls._SYSTEM_MODULE_MARKER}{system_key}"
        else:
            _, scope_type, company_key, site_key, system_key, module_key, action = row
            scope_id = company_key if scope_type == "company" else site_key
        # A missing scope id is reported as an empty string.
        return (scope_type, scope_id or "", system_key, module_key, action)

    @classmethod
    def _unique_permission_keys(cls, rows) -> list[tuple[str, str, str | None, str, str]]:
        """Normalise *rows* and drop duplicates, keeping first-seen order."""
        return list(dict.fromkeys(cls._permission_key(row) for row in rows))

    @staticmethod
    def _contains_pattern(keyword: str, escape: str = "/") -> str:
        """Build a ``%keyword%`` LIKE pattern with wildcards in *keyword* escaped.

        The escape character itself is doubled first so that the escapes
        added for ``%`` and ``_`` are not escaped a second time.
        """
        escaped = keyword.replace(escape, escape * 2)
        for wildcard in ("%", "_"):
            escaped = escaped.replace(wildcard, escape + wildcard)
        return f"%{escaped}%"

    @classmethod
    def _direct_row_to_item(cls, row) -> dict:
        """Convert one ``list_direct_permissions`` row into its result dict."""
        (
            permission_id,
            user_sub,
            email,
            display_name,
            row_scope_type,
            company_key,
            site_key,
            system_key,
            module_key,
            action,
            created_at,
        ) = row
        scope_id = company_key if row_scope_type == "company" else site_key
        # Module keys carrying the system marker are reported as "no module".
        is_system_module = isinstance(module_key, str) and module_key.startswith(
            cls._SYSTEM_MODULE_MARKER
        )
        return {
            "permission_id": permission_id,
            "user_sub": user_sub,
            "email": email,
            "display_name": display_name,
            "scope_type": row_scope_type,
            "scope_id": scope_id,
            "system": system_key,
            "module": None if is_system_module else module_key,
            "action": action,
            "created_at": created_at,
        }

    @staticmethod
    def _filtered_direct(stmt, conditions):
        """Attach the joins and WHERE conditions shared by the page and count queries."""
        return (
            stmt.select_from(UserScopePermission)
            .join(User, User.id == UserScopePermission.user_id)
            .join(Module, Module.id == UserScopePermission.module_id)
            .join(Company, Company.id == UserScopePermission.company_id, isouter=True)
            .join(Site, Site.id == UserScopePermission.site_id, isouter=True)
            .where(and_(*conditions))
        )

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    def list_by_user(self, user_id: str, user_sub: str) -> list[tuple[str, str, str | None, str, str]]:
        """Return the distinct site-scoped view/edit permissions of one user.

        Two sources are combined:

        * direct grants: ``UserScopePermission`` rows for ``user_id``;
        * group grants: ``PermissionGroupPermission`` rows of every group that
          has a ``PermissionGroupMember`` row for ``user_sub``.

        Both queries are limited to the actions ``view``/``edit`` and to
        ``scope_type == "site"``.

        Returns:
            ``(scope_type, scope_id, system, module, action)`` tuples, direct
            grants first, with duplicates removed. A group row whose module is
            ``"__system__"`` is reported as ``"__system__<system>"``.
        """
        direct_stmt = (
            select(
                literal("direct"),
                UserScopePermission.scope_type,
                Company.company_key,
                Site.site_key,
                Module.system_key,
                Module.module_key,
                UserScopePermission.action,
            )
            .select_from(UserScopePermission)
            .join(Module, Module.id == UserScopePermission.module_id)
            .join(Company, Company.id == UserScopePermission.company_id, isouter=True)
            .join(Site, Site.id == UserScopePermission.site_id, isouter=True)
            .where(UserScopePermission.user_id == user_id)
            .where(UserScopePermission.action.in_(self._LISTED_ACTIONS))
            .where(UserScopePermission.scope_type == "site")
        )
        group_stmt = (
            select(
                literal("group"),
                PermissionGroupPermission.scope_type,
                PermissionGroupPermission.scope_id,
                PermissionGroupPermission.system,
                PermissionGroupPermission.module,
                PermissionGroupPermission.action,
            )
            .select_from(PermissionGroupPermission)
            .join(PermissionGroupMember, PermissionGroupMember.group_id == PermissionGroupPermission.group_id)
            .where(PermissionGroupMember.user_sub == user_sub)
            .where(PermissionGroupPermission.action.in_(self._LISTED_ACTIONS))
            .where(PermissionGroupPermission.scope_type == "site")
        )
        rows = self.db.execute(direct_stmt).all() + self.db.execute(group_stmt).all()
        return self._unique_permission_keys(rows)

    def create_if_not_exists(
        self,
        user_id: str,
        module_id: str,
        action: str,
        scope_type: str,
        company_id: str | None,
        site_id: str | None,
    ) -> UserScopePermission:
        """Return the matching direct permission, inserting it when absent.

        A row matches on user, module, action and scope type, plus
        ``company_id`` when ``scope_type`` is ``"company"`` and ``site_id``
        for every other scope type. When nothing matches, a new row is added,
        committed and refreshed before being returned.

        NOTE(review): the lookup and the insert are separate statements;
        whether a unique constraint guards against concurrent inserts is not
        visible from this file.
        """
        if scope_type == "company":
            scope_match = UserScopePermission.company_id == company_id
        else:
            scope_match = UserScopePermission.site_id == site_id

        lookup = select(UserScopePermission).where(
            and_(
                UserScopePermission.user_id == user_id,
                UserScopePermission.module_id == module_id,
                UserScopePermission.action == action,
                UserScopePermission.scope_type == scope_type,
                scope_match,
            )
        )
        existing = self.db.scalar(lookup)
        if existing:
            return existing

        item = UserScopePermission(
            user_id=user_id,
            module_id=module_id,
            action=action,
            scope_type=scope_type,
            company_id=company_id,
            site_id=site_id,
        )
        self.db.add(item)
        self.db.commit()
        self.db.refresh(item)
        return item

    def revoke(
        self,
        user_id: str,
        module_id: str,
        action: str,
        scope_type: str,
        company_id: str | None,
        site_id: str | None,
    ) -> int:
        """Delete the direct permission(s) matching the given grant and commit.

        Returns:
            The number of deleted rows as reported by the database
            (``0`` when the driver reports no row count).
        """
        stmt = delete(UserScopePermission).where(
            UserScopePermission.user_id == user_id,
            UserScopePermission.module_id == module_id,
            UserScopePermission.action == action,
            UserScopePermission.scope_type == scope_type,
            # Each branch pairs a Python-side bool (is this the requested
            # scope type?) with the id column that scope type is keyed on.
            # NOTE(review): this relies on SQLAlchemy coercing plain bools to
            # SQL true/false constants - confirm for the pinned version.
            or_(
                and_(scope_type == "company", UserScopePermission.company_id == company_id),
                and_(scope_type == "site", UserScopePermission.site_id == site_id),
            ),
        )
        result = self.db.execute(stmt)
        self.db.commit()
        return int(result.rowcount or 0)

    def list_direct_permissions(
        self,
        *,
        keyword: str | None = None,
        scope_type: str | None = None,
        limit: int = 200,
        offset: int = 0,
    ) -> tuple[list[dict], int]:
        """Return one page of direct site-scoped view/edit permissions.

        Args:
            keyword: Optional case-insensitive substring matched against the
                user's sub, email and display name, the module key, the
                company key, the site key and the action. ``%`` and ``_`` in
                the keyword are matched literally.
            scope_type: Accepted for interface compatibility. The query is
                always restricted to ``"site"`` rows, so this value does not
                change the result.
            limit: Maximum number of rows in the page.
            offset: Number of rows to skip.

        Returns:
            ``(items, total)`` where ``items`` are dicts with the keys
            ``permission_id``, ``user_sub``, ``email``, ``display_name``,
            ``scope_type``, ``scope_id``, ``system``, ``module``, ``action``
            and ``created_at``, newest first, and ``total`` is the number of
            matching rows ignoring ``limit``/``offset``.
        """
        conditions = [
            UserScopePermission.action.in_(self._LISTED_ACTIONS),
            UserScopePermission.scope_type == "site",
        ]
        if keyword:
            pattern = self._contains_pattern(keyword, self._LIKE_ESCAPE)
            searchable = (
                User.user_sub,
                User.email,
                User.display_name,
                Module.module_key,
                Company.company_key,
                Site.site_key,
                UserScopePermission.action,
            )
            conditions.append(
                or_(*(column.ilike(pattern, escape=self._LIKE_ESCAPE) for column in searchable))
            )

        page_stmt = (
            self._filtered_direct(
                select(
                    UserScopePermission.id,
                    User.user_sub,
                    User.email,
                    User.display_name,
                    UserScopePermission.scope_type,
                    Company.company_key,
                    Site.site_key,
                    Module.system_key,
                    Module.module_key,
                    UserScopePermission.action,
                    UserScopePermission.created_at,
                ),
                conditions,
            )
            # The id tie-breaker keeps pages stable when timestamps are equal.
            .order_by(UserScopePermission.created_at.desc(), UserScopePermission.id.desc())
            .limit(limit)
            .offset(offset)
        )
        count_stmt = self._filtered_direct(select(func.count()), conditions)

        rows = self.db.execute(page_stmt).all()
        total = int(self.db.scalar(count_stmt) or 0)
        return [self._direct_row_to_item(row) for row in rows], total

    def revoke_by_permission_id(self, permission_id: str) -> int:
        """Delete the direct permission with the given id and commit.

        Returns:
            The number of deleted rows as reported by the database
            (``0`` when the driver reports no row count).
        """
        stmt = delete(UserScopePermission).where(UserScopePermission.id == permission_id)
        result = self.db.execute(stmt)
        self.db.commit()
        return int(result.rowcount or 0)