feat(admin): implement group-centric relations and system/module/company linkage views
This commit is contained in:
@@ -1,11 +1,14 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections import defaultdict
|
||||
|
||||
from sqlalchemy import delete, func, select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.models.permission_group import PermissionGroup
|
||||
from app.models.permission_group_member import PermissionGroupMember
|
||||
from app.models.permission_group_permission import PermissionGroupPermission
|
||||
from app.models.user import User
|
||||
|
||||
|
||||
class PermissionGroupsRepository:
|
||||
@@ -125,6 +128,128 @@ class PermissionGroupsRepository:
|
||||
)
|
||||
return list(self.db.scalars(stmt).all())
|
||||
|
||||
def replace_group_bindings(
|
||||
self,
|
||||
*,
|
||||
group_id: str,
|
||||
site_keys: list[str],
|
||||
system_keys: list[str],
|
||||
module_keys: list[str],
|
||||
member_subs: list[str],
|
||||
actions: list[str],
|
||||
) -> None:
|
||||
normalized_sites = list(dict.fromkeys([s for s in site_keys if s]))
|
||||
normalized_actions = [a for a in list(dict.fromkeys(actions)) if a in {"view", "edit"}]
|
||||
normalized_member_subs = list(dict.fromkeys([s for s in member_subs if s]))
|
||||
|
||||
modules_by_system: dict[str, list[str]] = defaultdict(list)
|
||||
for full_module_key in list(dict.fromkeys([m for m in module_keys if m])):
|
||||
if "." not in full_module_key:
|
||||
continue
|
||||
system_key, module_name = full_module_key.split(".", 1)
|
||||
if module_name == "__system__":
|
||||
continue
|
||||
modules_by_system[system_key].append(module_name)
|
||||
|
||||
normalized_systems = set([s for s in system_keys if s])
|
||||
normalized_systems.update(modules_by_system.keys())
|
||||
|
||||
self.db.execute(delete(PermissionGroupPermission).where(PermissionGroupPermission.group_id == group_id))
|
||||
self.db.execute(delete(PermissionGroupMember).where(PermissionGroupMember.group_id == group_id))
|
||||
|
||||
for sub in normalized_member_subs:
|
||||
self.db.add(PermissionGroupMember(group_id=group_id, authentik_sub=sub))
|
||||
|
||||
for site_key in normalized_sites:
|
||||
for action in normalized_actions:
|
||||
for system_key in sorted(normalized_systems):
|
||||
module_names = modules_by_system.get(system_key) or ["__system__"]
|
||||
for module_name in module_names:
|
||||
self.db.add(
|
||||
PermissionGroupPermission(
|
||||
group_id=group_id,
|
||||
system=system_key,
|
||||
module=module_name,
|
||||
action=action,
|
||||
scope_type="site",
|
||||
scope_id=site_key,
|
||||
)
|
||||
)
|
||||
|
||||
self.db.commit()
|
||||
|
||||
def get_group_binding_snapshot(self, group_id: str, group_key: str) -> dict:
|
||||
permissions = self.list_group_permissions(group_id)
|
||||
site_keys = sorted({p.scope_id for p in permissions if p.scope_type == "site"})
|
||||
system_keys = sorted({p.system for p in permissions})
|
||||
actions = sorted({p.action for p in permissions if p.action in {"view", "edit"}})
|
||||
module_keys = sorted(
|
||||
{
|
||||
p.module if "." in p.module else f"{p.system}.{p.module}"
|
||||
for p in permissions
|
||||
if p.module and p.module != "__system__"
|
||||
}
|
||||
)
|
||||
member_subs = sorted(self.list_group_member_subs(group_id))
|
||||
return {
|
||||
"group_key": group_key,
|
||||
"site_keys": site_keys,
|
||||
"system_keys": system_keys,
|
||||
"module_keys": module_keys,
|
||||
"member_subs": member_subs,
|
||||
"actions": actions,
|
||||
}
|
||||
|
||||
def list_group_member_subs(self, group_id: str) -> list[str]:
|
||||
stmt = (
|
||||
select(PermissionGroupMember.authentik_sub)
|
||||
.where(PermissionGroupMember.group_id == group_id)
|
||||
.order_by(PermissionGroupMember.authentik_sub.asc())
|
||||
)
|
||||
return [row[0] for row in self.db.execute(stmt).all()]
|
||||
|
||||
def list_system_groups(self, system_key: str) -> list[PermissionGroup]:
|
||||
stmt = (
|
||||
select(PermissionGroup)
|
||||
.join(PermissionGroupPermission, PermissionGroupPermission.group_id == PermissionGroup.id)
|
||||
.where(PermissionGroupPermission.system == system_key)
|
||||
.order_by(PermissionGroup.name.asc())
|
||||
.distinct()
|
||||
)
|
||||
return list(self.db.scalars(stmt).all())
|
||||
|
||||
def list_system_members(self, system_key: str) -> list[User]:
|
||||
stmt = (
|
||||
select(User)
|
||||
.join(PermissionGroupMember, PermissionGroupMember.authentik_sub == User.authentik_sub)
|
||||
.join(PermissionGroupPermission, PermissionGroupPermission.group_id == PermissionGroupMember.group_id)
|
||||
.where(PermissionGroupPermission.system == system_key)
|
||||
.order_by(User.email.asc(), User.authentik_sub.asc())
|
||||
.distinct()
|
||||
)
|
||||
return list(self.db.scalars(stmt).all())
|
||||
|
||||
def list_module_groups(self, system_key: str, module_name: str) -> list[PermissionGroup]:
|
||||
stmt = (
|
||||
select(PermissionGroup)
|
||||
.join(PermissionGroupPermission, PermissionGroupPermission.group_id == PermissionGroup.id)
|
||||
.where(PermissionGroupPermission.system == system_key, PermissionGroupPermission.module == module_name)
|
||||
.order_by(PermissionGroup.name.asc())
|
||||
.distinct()
|
||||
)
|
||||
return list(self.db.scalars(stmt).all())
|
||||
|
||||
def list_module_members(self, system_key: str, module_name: str) -> list[User]:
|
||||
stmt = (
|
||||
select(User)
|
||||
.join(PermissionGroupMember, PermissionGroupMember.authentik_sub == User.authentik_sub)
|
||||
.join(PermissionGroupPermission, PermissionGroupPermission.group_id == PermissionGroupMember.group_id)
|
||||
.where(PermissionGroupPermission.system == system_key, PermissionGroupPermission.module == module_name)
|
||||
.order_by(User.email.asc(), User.authentik_sub.asc())
|
||||
.distinct()
|
||||
)
|
||||
return list(self.db.scalars(stmt).all())
|
||||
|
||||
def revoke_group_permission(
|
||||
self,
|
||||
group_id: str,
|
||||
|
||||
@@ -29,6 +29,8 @@ class PermissionsRepository:
|
||||
.join(Company, Company.id == UserScopePermission.company_id, isouter=True)
|
||||
.join(Site, Site.id == UserScopePermission.site_id, isouter=True)
|
||||
.where(UserScopePermission.user_id == user_id)
|
||||
.where(UserScopePermission.action.in_(["view", "edit"]))
|
||||
.where(UserScopePermission.scope_type == "site")
|
||||
)
|
||||
group_stmt = (
|
||||
select(
|
||||
@@ -42,6 +44,8 @@ class PermissionsRepository:
|
||||
.select_from(PermissionGroupPermission)
|
||||
.join(PermissionGroupMember, PermissionGroupMember.group_id == PermissionGroupPermission.group_id)
|
||||
.where(PermissionGroupMember.authentik_sub == authentik_sub)
|
||||
.where(PermissionGroupPermission.action.in_(["view", "edit"]))
|
||||
.where(PermissionGroupPermission.scope_type == "site")
|
||||
)
|
||||
rows = self.db.execute(direct_stmt).all() + self.db.execute(group_stmt).all()
|
||||
result: list[tuple[str, str, str | None, str, str]] = []
|
||||
@@ -50,6 +54,10 @@ class PermissionsRepository:
|
||||
source = row[0]
|
||||
if source == "group":
|
||||
_, scope_type, scope_id, system_key, module_key, action = row
|
||||
if module_key == "__system__":
|
||||
module_key = f"{system_key}.__system__"
|
||||
elif module_key and "." not in module_key:
|
||||
module_key = f"{system_key}.{module_key}"
|
||||
else:
|
||||
_, scope_type, company_key, site_key, module_key, action = row
|
||||
scope_id = company_key if scope_type == "company" else site_key
|
||||
@@ -147,6 +155,8 @@ class PermissionsRepository:
|
||||
.join(Module, Module.id == UserScopePermission.module_id)
|
||||
.join(Company, Company.id == UserScopePermission.company_id, isouter=True)
|
||||
.join(Site, Site.id == UserScopePermission.site_id, isouter=True)
|
||||
.where(UserScopePermission.action.in_(["view", "edit"]))
|
||||
.where(UserScopePermission.scope_type == "site")
|
||||
)
|
||||
count_stmt = (
|
||||
select(func.count())
|
||||
@@ -155,9 +165,11 @@ class PermissionsRepository:
|
||||
.join(Module, Module.id == UserScopePermission.module_id)
|
||||
.join(Company, Company.id == UserScopePermission.company_id, isouter=True)
|
||||
.join(Site, Site.id == UserScopePermission.site_id, isouter=True)
|
||||
.where(UserScopePermission.action.in_(["view", "edit"]))
|
||||
.where(UserScopePermission.scope_type == "site")
|
||||
)
|
||||
|
||||
if scope_type in {"company", "site"}:
|
||||
if scope_type == "site":
|
||||
stmt = stmt.where(UserScopePermission.scope_type == scope_type)
|
||||
count_stmt = count_stmt.where(UserScopePermission.scope_type == scope_type)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user