# Source: member-backend/app/services/idp_catalog_sync.py
# (file-viewer header preserved as a comment: 405 lines, 15 KiB, Python)

from __future__ import annotations
import threading
import time
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.core.config import get_settings
from app.core.keygen import generate_key
from app.models.company import Company
from app.models.role import Role
from app.models.site import Site
from app.models.system import System
from app.repositories.companies_repo import CompaniesRepository
from app.repositories.roles_repo import RolesRepository
from app.repositories.sites_repo import SitesRepository
from app.repositories.systems_repo import SystemsRepository
from app.repositories.users_repo import UsersRepository
from app.services.idp_admin_service import ProviderAdminService
# Client IDs that the client sync skips instead of mirroring them as systems.
# NOTE(review): these look like the identity provider's stock clients
# (presumably Keycloak defaults) - confirm before extending the list.
BUILTIN_CLIENT_IDS = {
    "account",
    "account-console",
    "admin-cli",
    "broker",
    "master-realm",
    "realm-management",
    "security-admin-console",
}

# One lock shared by every sync entry point in this module; it is acquired
# non-blocking, so a caller that finds it held skips its run.
_sync_lock = threading.Lock()

# time.time() of the last completed full sync / systems-only sync (0.0 = never).
_last_synced_at = 0.0
_last_systems_synced_at = 0.0

# Minimum number of seconds between two non-forced runs of the same sync.
_min_sync_interval_sec = 30.0
def _generate_unique_key(prefix: str, exists_check) -> str:
    """Return the first generated key for ``prefix`` that is not already taken.

    Candidates are produced by ``generate_key(prefix, salt)`` for salts
    0..4999, tried in ascending order.

    Args:
        prefix: Key prefix passed through to ``generate_key``.
        exists_check: Callable invoked with each candidate key; a truthy
            result means the key is already in use and the next salt is tried.

    Returns:
        The first candidate for which ``exists_check`` returned a falsy value.

    Raises:
        RuntimeError: If all 5000 candidates are reported as existing.
    """
    salt = 0
    while salt < 5000:
        candidate = generate_key(prefix, salt)
        if not exists_check(candidate):
            return candidate
        salt += 1
    raise RuntimeError(f"failed_generate_{prefix.lower()}_key")
def _first_attr(attrs: dict | None, key: str) -> str | None:
    """Return the first usable string value stored under ``key`` in ``attrs``.

    Attribute values may be a list (only the first element is considered) or
    a plain string. The value is stripped of surrounding whitespace.

    Args:
        attrs: Attribute mapping of a group node; anything that is not a
            ``dict`` is treated as "no attributes".
        key: Attribute name to read.

    Returns:
        The stripped value, or ``None`` when the attribute is absent, empty,
        blank, null, or of an unsupported type.
    """
    if not isinstance(attrs, dict):
        return None
    raw = attrs.get(key)
    if isinstance(raw, list):
        first = raw[0] if raw else None
        # A null first element must count as "missing"; str(None) would
        # otherwise produce the literal key "None".
        if first is None:
            return None
        text = str(first).strip()
    elif isinstance(raw, str):
        text = raw.strip()
    else:
        return None
    return text or None
def _flatten_groups(nodes: list[dict], inherited_company_key: str | None = None) -> tuple[dict[str, dict], dict[str, dict]]:
    """Flatten a provider group tree into company and site records.

    Each node is inspected for a company identity and a site identity:

    * company key: the ``company_key`` attribute, else the group name when it
      starts with ``"CP"``, else the group name when the
      ``member_entity_type`` attribute is ``"company"``;
    * site key: the ``site_key`` attribute, else the group name when it
      starts with ``"ST"``, else the group name when ``member_entity_type``
      is ``"site"``.

    A site belongs to the company key found on the node itself or, failing
    that, the nearest ancestor's. Sub-groups (``subGroups``) are processed
    recursively and their records override same-key records of ancestors.

    Args:
        nodes: Group nodes at the current tree level; non-dict entries are
            ignored and ``None`` is treated as an empty level.
        inherited_company_key: Company key established by an ancestor node.

    Returns:
        ``(companies, sites)``, each keyed by company/site key. Company rows
        hold ``company_key``, ``name``, ``status``, ``provider_group_id``;
        site rows hold ``site_key``, ``company_key``, ``display_name``,
        ``domain``, ``status``, ``provider_group_id``.
    """
    companies: dict[str, dict] = {}
    sites: dict[str, dict] = {}
    for node in nodes or []:
        if not isinstance(node, dict):
            continue
        attrs = node.get("attributes")
        raw_id = node.get("id")
        raw_name = node.get("name")
        # An explicit null must not become the literal string "None".
        group_id = ("" if raw_id is None else str(raw_id).strip()) or None
        name = "" if raw_name is None else str(raw_name).strip()
        sub_groups = node.get("subGroups")
        children = sub_groups if isinstance(sub_groups, list) else []
        entity_type = _first_attr(attrs, "member_entity_type")

        company_key = _first_attr(attrs, "company_key")
        if not company_key and name.startswith("CP"):
            company_key = name
        if entity_type == "company" and not company_key:
            company_key = name or None

        site_key = _first_attr(attrs, "site_key")
        if not site_key and name.startswith("ST"):
            site_key = name
        if entity_type == "site" and not site_key:
            site_key = name or None

        current_company_key = company_key or inherited_company_key

        # A site group may carry ``company_key`` purely as a reference to its
        # owner. Registering such a node as a company would overwrite the
        # real company's name and provider group id with the site's, so only
        # nodes that are companies in their own right produce a company row.
        is_own_company = entity_type == "company" or name.startswith("CP") or not site_key
        if company_key and is_own_company:
            companies[company_key] = {
                "company_key": company_key,
                "name": _first_attr(attrs, "name") or _first_attr(attrs, "display_name") or name or company_key,
                "status": _first_attr(attrs, "status") or "active",
                "provider_group_id": group_id,
            }

        if site_key:
            sites[site_key] = {
                "site_key": site_key,
                # Own ``company_key`` attribute when present, otherwise the
                # ancestor's key (both are folded into current_company_key).
                "company_key": current_company_key,
                "display_name": _first_attr(attrs, "display_name") or name or site_key,
                "domain": _first_attr(attrs, "domain"),
                "status": _first_attr(attrs, "status") or "active",
                "provider_group_id": group_id,
            }

        child_companies, child_sites = _flatten_groups(children, current_company_key)
        companies.update(child_companies)
        sites.update(child_sites)
    return companies, sites
def _clean_text(value: object) -> str:
    """Return ``value`` as a stripped string, treating ``None`` as empty.

    ``str(None)`` is the literal ``"None"``; payload fields that are present
    but null must not leak that text into names, descriptions or identifiers.
    """
    return "" if value is None else str(value).strip()


def _is_sync_throttled(last_synced_at: float, force: bool) -> bool:
    """Return True when a non-forced sync finished less than the minimum interval ago."""
    return not force and time.time() - last_synced_at < _min_sync_interval_sec


def _sync_companies(
    companies_repo: CompaniesRepository,
    company_records: dict[str, dict],
) -> tuple[dict[str, str], int, int]:
    """Create or update one company per record.

    Returns:
        ``(company_id_map, created, updated)`` where ``company_id_map`` maps
        each company key to the id of the stored company.
    """
    company_id_map: dict[str, str] = {}
    created = 0
    updated = 0
    for company_key, row in company_records.items():
        company = companies_repo.get_by_key(company_key)
        if company is None:
            company = companies_repo.create(
                company_key=company_key,
                name=row["name"],
                provider_group_id=row["provider_group_id"],
                status=row["status"],
            )
            created += 1
        else:
            company = companies_repo.update(
                company,
                name=row["name"],
                provider_group_id=row["provider_group_id"],
                status=row["status"],
            )
            updated += 1
        company_id_map[company_key] = company.id
    return company_id_map, created, updated


def _sync_sites(
    companies_repo: CompaniesRepository,
    sites_repo: SitesRepository,
    site_records: dict[str, dict],
    company_id_map: dict[str, str],
) -> tuple[int, int, int]:
    """Create or update one site per record, resolving its owning company.

    Records without a company key are skipped. When a record names a company
    key that is not in ``company_id_map``, the company is looked up and, if
    it does not exist, created as a placeholder named after its key.
    ``company_id_map`` is extended in place with every company resolved here.

    Returns:
        ``(sites_created, sites_updated, placeholder_companies_created)``.
    """
    sites_created = 0
    sites_updated = 0
    placeholders_created = 0
    for site_key, row in site_records.items():
        company_key = row.get("company_key")
        if not company_key:
            continue
        company_id = company_id_map.get(company_key)
        if not company_id:
            placeholder = companies_repo.get_by_key(company_key)
            if placeholder is None:
                placeholder = companies_repo.create(
                    company_key=company_key,
                    name=company_key,
                    provider_group_id=None,
                    status="active",
                )
                placeholders_created += 1
            company_id = placeholder.id
            company_id_map[company_key] = company_id
        site_fields = {
            "company_id": company_id,
            "display_name": row["display_name"],
            "domain": row["domain"],
            "provider_group_id": row["provider_group_id"],
            "status": row["status"],
        }
        site = sites_repo.get_by_key(site_key)
        if site is None:
            sites_repo.create(site_key=site_key, **site_fields)
            sites_created += 1
        else:
            sites_repo.update(site, **site_fields)
            sites_updated += 1
    return sites_created, sites_updated, placeholders_created


def _sync_systems_and_roles(
    db: Session,
    idp: ProviderAdminService,
    systems_repo: SystemsRepository,
    roles_repo: RolesRepository,
) -> dict[str, int]:
    """Mirror provider clients as systems and their client roles as roles.

    Clients without an id or clientId, and clients listed in
    ``BUILTIN_CLIENT_IDS``, are skipped. Roles are matched per system by name.

    Returns:
        Counters keyed ``systems_created``, ``systems_updated``,
        ``roles_created`` and ``roles_updated`` (in that order).
    """
    counts = {
        "systems_created": 0,
        "systems_updated": 0,
        "roles_created": 0,
        "roles_updated": 0,
    }
    for client in idp.list_clients():
        client_uuid = _clean_text(client.get("id"))
        client_id = _clean_text(client.get("clientId"))
        if not client_uuid or not client_id:
            continue
        if client_id in BUILTIN_CLIENT_IDS:
            continue
        system_name = _clean_text(client.get("name")) or client_id
        system_status = "active" if client.get("enabled", True) else "inactive"
        # Systems are stored under ``system_name`` (the client's display name,
        # falling back to its clientId). Matching on the clientId alone never
        # finds a system whose client has a distinct display name, so every
        # run would create a duplicate; match on either value instead.
        # NOTE(review): a client renamed in the provider is still treated as
        # new because no stable provider identifier is visible on System here.
        system = db.scalar(
            select(System).where(System.name.in_([client_id, system_name]))
        )
        if system is None:
            system_key = _generate_unique_key(
                "SY", lambda key: systems_repo.get_by_key(key) is not None
            )
            system = systems_repo.create(
                system_key=system_key,
                name=system_name,
                status=system_status,
            )
            counts["systems_created"] += 1
        else:
            system = systems_repo.update(
                system,
                name=system_name,
                status=system_status,
            )
            counts["systems_updated"] += 1

        for role_row in idp.list_client_roles(client_uuid):
            if not isinstance(role_row, dict):
                continue
            role_name = _clean_text(role_row.get("name"))
            if not role_name:
                continue
            role_desc = _clean_text(role_row.get("description")) or None
            # Composite and non-composite roles were both mapped to "active"
            # by the previous conditional; keep that result without the
            # dead branch.
            role_status = "active"
            role = db.scalar(
                select(Role).where(
                    Role.system_id == system.id,
                    Role.name == role_name,
                )
            )
            if role is None:
                role_key = _generate_unique_key(
                    "RL", lambda key: roles_repo.get_by_key(key) is not None
                )
                roles_repo.create(
                    role_key=role_key,
                    role_code=role_name,
                    system_id=system.id,
                    name=role_name,
                    description=role_desc,
                    status=role_status,
                )
                counts["roles_created"] += 1
            else:
                roles_repo.update(
                    role,
                    # Keep an existing role_code; only fill it when empty.
                    role_code=role.role_code or role_name,
                    name=role_name,
                    description=role_desc,
                    status=role_status,
                )
                counts["roles_updated"] += 1
    return counts


def _sync_users(idp: ProviderAdminService, users_repo: UsersRepository) -> int:
    """Upsert every provider user that has an id; return how many were upserted."""
    upserted = 0
    for user in idp.list_users():
        user_id = _clean_text(user.get("id"))
        if not user_id:
            continue
        username = _clean_text(user.get("username"))
        email = _clean_text(user.get("email"))
        display_name = _clean_text(user.get("firstName")) or username or email or user_id
        enabled = user.get("enabled", True)
        users_repo.upsert_by_sub(
            user_sub=user_id,
            provider_user_id=user_id,
            username=username or None,
            email=email or None,
            display_name=display_name,
            is_active=bool(enabled),
            status="active" if enabled else "inactive",
        )
        upserted += 1
    return upserted


def sync_from_provider(db: Session, *, force: bool = False) -> dict[str, int]:
    """Synchronise companies, sites, systems, roles and users from the provider.

    The run is skipped (returning ``{"synced": 0}``) when a previous full sync
    finished less than ``_min_sync_interval_sec`` seconds ago and ``force`` is
    false, or when ``_sync_lock`` is currently held by another sync in this
    module (the lock is shared with ``sync_systems_from_provider``).

    Args:
        db: SQLAlchemy session handed to the repositories and used for the
            system/role lookups.
        force: Bypass the minimum-interval throttle (not the lock).

    Returns:
        ``{"synced": 0}`` when skipped; otherwise ``"synced": 1`` plus the
        ``companies_*``, ``sites_*``, ``systems_*``, ``roles_*`` created /
        updated counters and ``users_upserted``.
    """
    global _last_synced_at
    if _is_sync_throttled(_last_synced_at, force):
        return {"synced": 0}
    if not _sync_lock.acquire(blocking=False):
        return {"synced": 0}
    try:
        # Re-check under the lock: another run may have completed between the
        # first check and the acquisition.
        if _is_sync_throttled(_last_synced_at, force):
            return {"synced": 0}

        idp = ProviderAdminService(get_settings())
        companies_repo = CompaniesRepository(db)
        sites_repo = SitesRepository(db)
        systems_repo = SystemsRepository(db)
        roles_repo = RolesRepository(db)
        users_repo = UsersRepository(db)

        company_records, site_records = _flatten_groups(idp.list_groups_tree())
        company_id_map, companies_created, companies_updated = _sync_companies(
            companies_repo, company_records
        )
        sites_created, sites_updated, placeholders_created = _sync_sites(
            companies_repo, sites_repo, site_records, company_id_map
        )
        # Placeholder companies created for sites count as created companies.
        companies_created += placeholders_created

        system_counts = _sync_systems_and_roles(db, idp, systems_repo, roles_repo)
        users_upserted = _sync_users(idp, users_repo)

        _last_synced_at = time.time()
        return {
            "synced": 1,
            "companies_created": companies_created,
            "companies_updated": companies_updated,
            "sites_created": sites_created,
            "sites_updated": sites_updated,
            **system_counts,
            "users_upserted": users_upserted,
        }
    finally:
        _sync_lock.release()


def sync_systems_from_provider(db: Session, *, force: bool = False) -> dict[str, int]:
    """Synchronise only systems (provider clients) and their roles.

    Uses its own throttle timestamp (``_last_systems_synced_at``) but the same
    ``_sync_lock`` as ``sync_from_provider``; the run is skipped with
    ``{"synced": 0}`` when throttled (and not forced) or when the lock is held.

    Args:
        db: SQLAlchemy session handed to the repositories and used for the
            system/role lookups.
        force: Bypass the minimum-interval throttle (not the lock).

    Returns:
        ``{"synced": 0}`` when skipped; otherwise ``"synced": 1`` plus
        ``systems_created``, ``systems_updated``, ``roles_created`` and
        ``roles_updated``.
    """
    global _last_systems_synced_at
    if _is_sync_throttled(_last_systems_synced_at, force):
        return {"synced": 0}
    if not _sync_lock.acquire(blocking=False):
        return {"synced": 0}
    try:
        # Re-check under the lock: another run may have completed between the
        # first check and the acquisition.
        if _is_sync_throttled(_last_systems_synced_at, force):
            return {"synced": 0}

        idp = ProviderAdminService(get_settings())
        systems_repo = SystemsRepository(db)
        roles_repo = RolesRepository(db)

        system_counts = _sync_systems_and_roles(db, idp, systems_repo, roles_repo)

        _last_systems_synced_at = time.time()
        return {"synced": 1, **system_counts}
    finally:
        _sync_lock.release()