from sqlalchemy import and_, delete, false, func, literal, or_, select
from sqlalchemy.orm import Session

from app.models.company import Company
from app.models.module import Module
from app.models.permission_group_member import PermissionGroupMember
from app.models.permission_group_permission import PermissionGroupPermission
from app.models.site import Site
from app.models.user import User
from app.models.user_scope_permission import UserScopePermission

# Escape character used for every LIKE/ILIKE pattern built from user input.
_LIKE_ESCAPE = "\\"


class PermissionsRepository:
    """Data-access helper for direct and group-derived scope permissions.

    All reads are restricted to ``view``/``edit`` actions on ``site``-scoped
    rows. Write methods commit the session themselves.
    """

    # Actions and scope type that the read queries are limited to.
    _ACTIONS = ("view", "edit")
    _SCOPE_TYPE = "site"
    # Marker used in module keys for system-level (module-less) permissions.
    _SYSTEM_MODULE = "__system__"

    def __init__(self, db: Session) -> None:
        """Store the SQLAlchemy session used by every query in this class."""
        self.db = db

    @staticmethod
    def _escape_like(value: str) -> str:
        """Escape LIKE wildcards in *value* so it matches literally."""
        return (
            value.replace(_LIKE_ESCAPE, _LIKE_ESCAPE * 2)
            .replace("%", f"{_LIKE_ESCAPE}%")
            .replace("_", f"{_LIKE_ESCAPE}_")
        )

    def list_by_user(self, user_id: str, authentik_sub: str) -> list[tuple[str, str, str | None, str, str]]:
        """Return the de-duplicated permissions that apply to one user.

        Direct grants are looked up by ``user_id``; group grants are looked up
        by ``authentik_sub`` through group membership. Each result item is
        ``(scope_type, scope_id, system_key, module_key, action)``, where a
        missing ``scope_id`` is normalised to ``""``. Direct rows come first,
        and the first occurrence of a duplicate wins.
        """
        direct_stmt = (
            select(
                literal("direct"),
                UserScopePermission.scope_type,
                Company.company_key,
                Site.site_key,
                Module.system_key,
                Module.module_key,
                UserScopePermission.action,
            )
            .select_from(UserScopePermission)
            .join(Module, Module.id == UserScopePermission.module_id)
            .join(Company, Company.id == UserScopePermission.company_id, isouter=True)
            .join(Site, Site.id == UserScopePermission.site_id, isouter=True)
            .where(UserScopePermission.user_id == user_id)
            .where(UserScopePermission.action.in_(self._ACTIONS))
            .where(UserScopePermission.scope_type == self._SCOPE_TYPE)
        )
        group_stmt = (
            select(
                literal("group"),
                PermissionGroupPermission.scope_type,
                PermissionGroupPermission.scope_id,
                PermissionGroupPermission.system,
                PermissionGroupPermission.module,
                PermissionGroupPermission.action,
            )
            .select_from(PermissionGroupPermission)
            .join(
                PermissionGroupMember,
                PermissionGroupMember.group_id == PermissionGroupPermission.group_id,
            )
            .where(PermissionGroupMember.authentik_sub == authentik_sub)
            .where(PermissionGroupPermission.action.in_(self._ACTIONS))
            .where(PermissionGroupPermission.scope_type == self._SCOPE_TYPE)
        )

        rows = self.db.execute(direct_stmt).all() + self.db.execute(group_stmt).all()
        # dict.fromkeys keeps first-seen order while dropping duplicates.
        return list(dict.fromkeys(self._permission_key(row) for row in rows))

    @classmethod
    def _permission_key(cls, row) -> tuple[str, str, str, str, str]:
        """Normalise a direct (7-column) or group (6-column) row to a key."""
        if row[0] == "group":
            _, scope_type, scope_id, system_key, module_key, action = row
            # Group rows store a bare marker; qualify it with the system key.
            if module_key == cls._SYSTEM_MODULE:
                module_key = f"__system__{system_key}"
        else:
            _, scope_type, company_key, site_key, system_key, module_key, action = row
            scope_id = company_key if scope_type == "company" else site_key
        return (scope_type, scope_id or "", system_key, module_key, action)

    def create_if_not_exists(
        self,
        user_id: str,
        module_id: str,
        action: str,
        scope_type: str,
        company_id: str | None,
        site_id: str | None,
    ) -> UserScopePermission:
        """Return the matching direct permission, creating it if absent.

        A row matches on user, module, action, scope type and the scope
        column: ``company_id`` when ``scope_type == "company"``, otherwise
        ``site_id``. A newly created row is committed and refreshed before it
        is returned.

        NOTE: this is a check-then-insert sequence, not an atomic upsert.
        """
        if scope_type == "company":
            scope_match = UserScopePermission.company_id == company_id
        else:
            scope_match = UserScopePermission.site_id == site_id

        lookup = select(UserScopePermission).where(
            and_(
                UserScopePermission.user_id == user_id,
                UserScopePermission.module_id == module_id,
                UserScopePermission.action == action,
                UserScopePermission.scope_type == scope_type,
                scope_match,
            )
        )
        existing = self.db.scalar(lookup)
        if existing:
            return existing

        item = UserScopePermission(
            user_id=user_id,
            module_id=module_id,
            action=action,
            scope_type=scope_type,
            company_id=company_id,
            site_id=site_id,
        )
        self.db.add(item)
        self.db.commit()
        self.db.refresh(item)
        return item

    def revoke(
        self,
        user_id: str,
        module_id: str,
        action: str,
        scope_type: str,
        company_id: str | None,
        site_id: str | None,
    ) -> int:
        """Delete matching direct permissions and return the deleted count.

        Rows match on user, module, action, scope type and the scope column
        for that type (``company_id`` for ``"company"``, ``site_id`` for
        ``"site"``). Any other ``scope_type`` matches nothing. The session is
        committed in every case.
        """
        if scope_type == "company":
            scope_match = UserScopePermission.company_id == company_id
        elif scope_type == "site":
            scope_match = UserScopePermission.site_id == site_id
        else:
            # Unknown scope: still run and commit the statement, but make sure
            # it cannot match any row.
            scope_match = false()

        stmt = delete(UserScopePermission).where(
            UserScopePermission.user_id == user_id,
            UserScopePermission.module_id == module_id,
            UserScopePermission.action == action,
            UserScopePermission.scope_type == scope_type,
            scope_match,
        )
        result = self.db.execute(stmt)
        self.db.commit()
        return int(result.rowcount or 0)

    def list_direct_permissions(
        self,
        *,
        keyword: str | None = None,
        scope_type: str | None = None,
        limit: int = 200,
        offset: int = 0,
    ) -> tuple[list[dict], int]:
        """Return one page of direct permissions plus the total match count.

        Args:
            keyword: Optional case-insensitive substring matched literally
                against the user's sub/email/display name, the module key,
                the company and site keys, and the action.
            scope_type: Accepted for interface compatibility. The query is
                always limited to ``"site"`` rows, so this argument does not
                change the result.
            limit: Maximum number of items to return.
            offset: Number of matching rows to skip.

        Returns:
            ``(items, total)``: ``items`` are dicts ordered by ``created_at``
            descending, and ``total`` is the count of all matching rows
            ignoring ``limit``/``offset``.
        """
        filters = [
            UserScopePermission.action.in_(self._ACTIONS),
            UserScopePermission.scope_type == self._SCOPE_TYPE,
        ]
        if keyword:
            # Escape %, _ and the escape char so user input is not treated as
            # a LIKE wildcard.
            pattern = f"%{self._escape_like(keyword)}%"
            searchable = (
                User.authentik_sub,
                User.email,
                User.display_name,
                Module.module_key,
                Company.company_key,
                Site.site_key,
                UserScopePermission.action,
            )
            filters.append(
                or_(*(column.ilike(pattern, escape=_LIKE_ESCAPE) for column in searchable))
            )

        def scoped(statement):
            """Apply the shared joins and filters to a select statement."""
            return (
                statement.select_from(UserScopePermission)
                .join(User, User.id == UserScopePermission.user_id)
                .join(Module, Module.id == UserScopePermission.module_id)
                .join(Company, Company.id == UserScopePermission.company_id, isouter=True)
                .join(Site, Site.id == UserScopePermission.site_id, isouter=True)
                .where(*filters)
            )

        page_stmt = (
            scoped(
                select(
                    UserScopePermission.id,
                    User.authentik_sub,
                    User.email,
                    User.display_name,
                    UserScopePermission.scope_type,
                    Company.company_key,
                    Site.site_key,
                    Module.system_key,
                    Module.module_key,
                    UserScopePermission.action,
                    UserScopePermission.created_at,
                )
            )
            .order_by(UserScopePermission.created_at.desc())
            .limit(limit)
            .offset(offset)
        )
        count_stmt = scoped(select(func.count()))

        rows = self.db.execute(page_stmt).all()
        total = int(self.db.scalar(count_stmt) or 0)

        items: list[dict] = []
        for (
            permission_id,
            authentik_sub,
            email,
            display_name,
            row_scope_type,
            company_key,
            site_key,
            system_key,
            module_key,
            action,
            created_at,
        ) in rows:
            # Module keys carrying the system marker are reported as "no module".
            is_system_level = isinstance(module_key, str) and module_key.startswith(
                self._SYSTEM_MODULE
            )
            items.append(
                {
                    "permission_id": permission_id,
                    "authentik_sub": authentik_sub,
                    "email": email,
                    "display_name": display_name,
                    "scope_type": row_scope_type,
                    "scope_id": company_key if row_scope_type == "company" else site_key,
                    "system": system_key,
                    "module": None if is_system_level else module_key,
                    "action": action,
                    "created_at": created_at,
                }
            )
        return items, total

    def revoke_by_permission_id(self, permission_id: str) -> int:
        """Delete the direct permission with this id; return the deleted count."""
        stmt = delete(UserScopePermission).where(UserScopePermission.id == permission_id)
        result = self.db.execute(stmt)
        self.db.commit()
        return int(result.rowcount or 0)