from __future__ import annotations from typing import Any import httpx from app.core.config import settings class DirectusClient: """Small wrapper around Directus HTTP APIs used by FastAPI repositories. We keep this adapter thin on purpose: - business rules stay in services/use-cases - repository classes decide which collection to access - this client only knows how to talk to Directus """ def __init__( self, base_url: str | None = None, token: str | None = None, timeout: float = 15.0, ) -> None: self.base_url = (base_url or settings.directus_base_url).rstrip("/") self.token = token or settings.directus_admin_token self.timeout = timeout or settings.directus_timeout def _headers(self) -> dict[str, str]: headers = {"Content-Type": "application/json"} if self.token: headers["Authorization"] = f"Bearer {self.token}" return headers def _headers_for_token(self, token: str | None = None) -> dict[str, str]: headers = {"Content-Type": "application/json"} if token: headers["Authorization"] = f"Bearer {token}" elif self.token: headers["Authorization"] = f"Bearer {self.token}" return headers async def list_items( self, collection: str, params: dict[str, Any] | None = None, access_token: str | None = None, ) -> list[dict[str, Any]]: """Fetch a collection list from Directus.""" async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get( f"{self.base_url}/items/{collection}", headers=self._headers_for_token(access_token), params=params, ) response.raise_for_status() payload = response.json() return payload.get("data", []) async def get_item( self, collection: str, item_id: str, params: dict[str, Any] | None = None, access_token: str | None = None, ) -> dict[str, Any] | None: """Fetch a single item from Directus.""" async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get( f"{self.base_url}/items/{collection}/{item_id}", headers=self._headers_for_token(access_token), params=params, ) if response.status_code == 404: return None response.raise_for_status() payload = response.json() return payload.get("data") async def create_item( self, collection: str, data: dict[str, Any], access_token: str | None = None, ) -> dict[str, Any]: """Create a Directus item so activity/revision can still be preserved.""" async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.post( f"{self.base_url}/items/{collection}", headers=self._headers_for_token(access_token), json=data, ) response.raise_for_status() payload = response.json() return payload["data"] async def update_item( self, collection: str, item_id: str, data: dict[str, Any], access_token: str | None = None, ) -> dict[str, Any]: """Update a Directus item after FastAPI finishes validation and orchestration.""" async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.patch( f"{self.base_url}/items/{collection}/{item_id}", headers=self._headers_for_token(access_token), json=data, ) response.raise_for_status() payload = response.json() return payload["data"] async def delete_item( self, collection: str, item_id: str, access_token: str | None = None, ) -> None: """Delete a Directus item.""" async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.delete( f"{self.base_url}/items/{collection}/{item_id}", headers=self._headers_for_token(access_token), ) response.raise_for_status() async def list_activity( self, params: dict[str, Any] | None = None, access_token: str | None = None, ) -> list[dict[str, Any]]: """Query Directus /activity — the built-in audit trail for all collections.""" async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get( f"{self.base_url}/activity", headers=self._headers_for_token(access_token), params=params, ) response.raise_for_status() payload = response.json() return payload.get("data", []) async def get_current_user( self, access_token: str, fields: list[str] | None = None, ) -> dict[str, Any] | None: """Resolve Directus /users/me with a bearer token issued by Directus.""" url = f"{self.base_url}/users/me" if fields: # Directus accepts comma-delimited `fields=...` query strings, but in # this environment it falls back to returning only `{id}` when the # commas are percent-encoded by the HTTP client. Build the query # string explicitly so auth/role bootstrap remains deterministic. url = f"{url}?fields={','.join(fields)}" async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get( url, headers=self._headers_for_token(access_token), ) if response.status_code == 401: return None response.raise_for_status() payload = response.json() return payload.get("data")