from __future__ import annotations

from typing import Any

from app.repositories.directus.client import DirectusClient


class DirectusActivityRepository:
    """Read Directus built-in /activity for experiment-related collections."""

    # Collections whose activity entries are considered experiment-related.
    TRACKED_COLLECTIONS = {"experiments", "variants", "experiment_releases", "variant_changes"}

    def __init__(self, client: DirectusClient | None = None) -> None:
        """Store the Directus client to query.

        Args:
            client: Client used for the activity request. When omitted (or
                falsy), a new ``DirectusClient()`` is constructed.
        """
        self.client = client or DirectusClient()

    async def list_for_items(
        self,
        item_ids: list[str],
        access_token: str | None = None,
    ) -> list[dict[str, Any]]:
        """Return Directus activity entries for the given item IDs across tracked collections.

        Args:
            item_ids: IDs of the items whose activity should be fetched.
                Duplicates are ignored. An empty list short-circuits to ``[]``
                without calling the client.
            access_token: Passed through unchanged to
                ``DirectusClient.list_activity``.

        Returns:
            Whatever ``DirectusClient.list_activity`` returns for the built
            query. The request asks for at most 200 entries sorted by
            ``-timestamp`` (newest first under Directus sort syntax), so
            older entries beyond that cap are not requested.
        """
        if not item_ids:
            return []

        # Sets of strings iterate in a per-process (hash-seed dependent) order;
        # sorting keeps the generated query string stable across processes so
        # caching, logging and request assertions are reproducible.
        collections = ",".join(sorted(self.TRACKED_COLLECTIONS))

        # Drop repeated IDs while preserving caller order: duplicates only
        # lengthen the query string and do not change what `_in` matches.
        unique_item_ids = ",".join(dict.fromkeys(item_ids))

        params: dict[str, Any] = {
            "filter[collection][_in]": collections,
            "filter[item][_in]": unique_item_ids,
            "sort": "-timestamp",
            "limit": "200",
            "fields": "id,action,collection,item,timestamp,user.email,user.id",
        }
        return await self.client.list_activity(params=params, access_token=access_token)