from __future__ import annotations

from typing import Any

from app.repositories.directus.activity import DirectusActivityRepository
from app.repositories.directus.variants import VariantRepository
from app.repositories.directus.releases import ReleaseRepository

# Display text for activity actions; an action missing from this map is
# shown using its raw key.
ACTION_LABELS: dict[str, str] = {
    "create": "建立",
    "update": "更新",
    "delete": "刪除",
}

# Display text for collection names; a collection missing from this map is
# shown using its raw key.
COLLECTION_LABELS: dict[str, str] = {
    "experiments": "實驗設定",
    "variants": "變體",
    "experiment_releases": "版本",
    "variant_changes": "視覺變更",
}


def _present_entry(entry: dict[str, Any]) -> dict[str, Any]:
    """Flatten one raw activity entry into the dict returned to callers.

    The ``user`` field is accepted either as a mapping (its ``email`` and
    ``id`` keys are read) or as any other value, which is stringified and
    used as the actor id with no email.
    """
    action = str(entry.get("action", ""))
    collection = str(entry.get("collection", ""))

    actor = entry.get("user") or {}
    if isinstance(actor, dict):
        actor_email = actor.get("email")
        actor_id = actor.get("id")
    else:
        # A non-dict value reaching this branch is necessarily truthy,
        # because falsy values were replaced by ``{}`` above.
        actor_email = None
        actor_id = str(actor)

    return {
        "id": entry.get("id"),
        "action": action,
        "action_label": ACTION_LABELS.get(action, action),
        "collection": collection,
        "collection_label": COLLECTION_LABELS.get(collection, collection),
        "item": str(entry.get("item", "")),
        "timestamp": entry.get("timestamp"),
        "actor_email": actor_email,
        "actor_id": actor_id,
    }


class ActivityService:
    """Build the activity listing for an experiment and its child records."""

    def __init__(
        self,
        activity_repository: DirectusActivityRepository | None = None,
        variant_repository: VariantRepository | None = None,
        release_repository: ReleaseRepository | None = None,
    ) -> None:
        """Store the given repositories, creating defaults for omitted ones."""
        if not activity_repository:
            activity_repository = DirectusActivityRepository()
        if not variant_repository:
            variant_repository = VariantRepository()
        if not release_repository:
            release_repository = ReleaseRepository()

        self.activity_repo = activity_repository
        self.variant_repo = variant_repository
        self.release_repo = release_repository

    async def list_for_experiment(
        self,
        experiment_id: str,
        access_token: str | None = None,
    ) -> list[dict[str, Any]]:
        """Return activity entries for an experiment, its variants and releases.

        Args:
            experiment_id: Id of the experiment whose activity is requested.
            access_token: Token forwarded unchanged to every repository call.

        Returns:
            One dict per raw activity entry, in the order the activity
            repository returned them, with the keys ``id``, ``action``,
            ``action_label``, ``collection``, ``collection_label``, ``item``,
            ``timestamp``, ``actor_email`` and ``actor_id``.
        """
        # The experiment's own id comes first, followed by variant ids and
        # then release ids; the repositories are queried in that order.
        related_ids: list[str] = [experiment_id]
        for child_repo in (self.variant_repo, self.release_repo):
            rows = await child_repo.list(
                params={
                    "filter[experiment_id][_eq]": experiment_id,
                    "fields": "id",
                },
                access_token=access_token,
            )
            related_ids.extend(str(row["id"]) for row in rows)

        activity_rows = await self.activity_repo.list_for_items(
            item_ids=related_ids,
            access_token=access_token,
        )
        return [_present_entry(row) for row in activity_rows]