298 lines
12 KiB
Python
298 lines
12 KiB
Python
from __future__ import annotations
|
|
|
|
import re
|
|
from typing import Any
|
|
|
|
from app.application.observability.service import ObservabilityService
|
|
from app.domain.mappers import to_experiment, to_release, to_site, to_variant
|
|
from app.domain.observability import SystemEvent
|
|
from app.domain.runtime import RuntimeExperimentContext, RuntimeVariantCandidate, choose_variant
|
|
from app.repositories.directus.experiments import ExperimentRepository
|
|
from app.repositories.directus.releases import ReleaseRepository
|
|
from app.repositories.directus.sites import SiteRepository
|
|
from app.repositories.directus.variants import VariantRepository
|
|
from app.schemas.runtime import (
|
|
RuntimeAssignRequest,
|
|
RuntimeAssignResponse,
|
|
RuntimeBootstrapRequest,
|
|
RuntimeBootstrapResponse,
|
|
RuntimeExperimentInput,
|
|
RuntimeEventRequest,
|
|
RuntimeEventResponse,
|
|
RuntimePayloadRequest,
|
|
RuntimePayloadResponse,
|
|
RuntimeVariantCandidateInput,
|
|
)
|
|
|
|
|
|
class RuntimeService:
|
|
"""First-pass runtime orchestration service.
|
|
|
|
The current version focuses on:
|
|
- deterministic assignment rules
|
|
- stable API contracts for the SDK
|
|
- keeping runtime logic out of the frontend
|
|
|
|
Later we can swap the input source from request payloads to repositories
|
|
without changing the external response shapes.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
observability: ObservabilityService | None = None,
|
|
site_repository: SiteRepository | None = None,
|
|
experiment_repository: ExperimentRepository | None = None,
|
|
variant_repository: VariantRepository | None = None,
|
|
release_repository: ReleaseRepository | None = None,
|
|
) -> None:
|
|
self.observability = observability or ObservabilityService()
|
|
self.site_repository = site_repository or SiteRepository()
|
|
self.experiment_repository = experiment_repository or ExperimentRepository()
|
|
self.variant_repository = variant_repository or VariantRepository()
|
|
self.release_repository = release_repository or ReleaseRepository()
|
|
|
|
async def bootstrap(self, request: RuntimeBootstrapRequest) -> RuntimeBootstrapResponse:
|
|
site = await self._resolve_site(request.site_id, request.site_key)
|
|
if not site:
|
|
return RuntimeBootstrapResponse(
|
|
site_id=request.site_id,
|
|
site_key=request.site_key,
|
|
url=request.url,
|
|
visitor_id=request.visitor_id,
|
|
candidate_experiments=[],
|
|
)
|
|
|
|
experiments = await self.experiment_repository.list(
|
|
params={
|
|
"filter[site_id][_eq]": site.id,
|
|
"filter[status][_eq]": "running",
|
|
}
|
|
)
|
|
|
|
candidate_experiments = []
|
|
for raw_experiment in experiments:
|
|
experiment = to_experiment(raw_experiment)
|
|
if not self._matches_targeting(request.url, request.user_agent, experiment.targeting_config):
|
|
continue
|
|
|
|
raw_releases = await self.release_repository.list(
|
|
params={
|
|
"filter[experiment_id][_eq]": experiment.id,
|
|
"filter[status][_eq]": "published",
|
|
"sort[]": "-version_no",
|
|
"limit": 1,
|
|
}
|
|
)
|
|
release = to_release(raw_releases[0]) if raw_releases else None
|
|
|
|
# Build a traffic weight index from the release snapshot when available.
|
|
# This ensures assignment weights are stable and match what was published,
|
|
# even if variant weights have since been edited.
|
|
snapshot_weight: dict[str, int] = {}
|
|
snapshot_key: dict[str, str] = {}
|
|
if release and isinstance(release.runtime_payload, dict):
|
|
for sv in release.runtime_payload.get("variants", []):
|
|
vid = str(sv.get("variant_id", ""))
|
|
if vid:
|
|
snapshot_weight[vid] = int(sv.get("traffic_weight", 0))
|
|
snapshot_key[vid] = str(sv.get("variant_key", ""))
|
|
|
|
raw_variants = await self.variant_repository.list(
|
|
params={"filter[experiment_id][_eq]": experiment.id}
|
|
)
|
|
variants = [to_variant(item) for item in raw_variants]
|
|
if not variants:
|
|
continue
|
|
|
|
candidate_experiments.append(
|
|
RuntimeExperimentInput(
|
|
experiment_id=experiment.id,
|
|
experiment_key=experiment.experiment_key,
|
|
status=experiment.status,
|
|
site_key=site.site_key,
|
|
release_id=release.id if release else None,
|
|
release_version=release.version_no if release else None,
|
|
payload=release.runtime_payload if release else None,
|
|
variants=[
|
|
RuntimeVariantCandidateInput(
|
|
id=variant.id,
|
|
variant_key=snapshot_key.get(variant.id, variant.variant_key),
|
|
traffic_weight=snapshot_weight.get(variant.id, variant.traffic_weight),
|
|
content_config=variant.content_config,
|
|
)
|
|
for variant in variants
|
|
],
|
|
)
|
|
)
|
|
|
|
return RuntimeBootstrapResponse(
|
|
site_id=site.id,
|
|
site_key=site.site_key,
|
|
url=request.url,
|
|
visitor_id=request.visitor_id,
|
|
candidate_experiments=candidate_experiments,
|
|
)
|
|
|
|
async def assign(self, request: RuntimeAssignRequest) -> RuntimeAssignResponse | None:
|
|
experiment = RuntimeExperimentContext(
|
|
experiment_id=request.experiment.experiment_id,
|
|
experiment_key=request.experiment.experiment_key,
|
|
status=request.experiment.status,
|
|
site_key=request.experiment.site_key,
|
|
assignment_salt=request.experiment.assignment_salt,
|
|
release_id=request.experiment.release_id,
|
|
release_version=request.experiment.release_version,
|
|
payload=request.experiment.payload,
|
|
variants=[
|
|
RuntimeVariantCandidate(
|
|
id=variant.id,
|
|
variant_key=variant.variant_key,
|
|
traffic_weight=variant.traffic_weight,
|
|
content_config=variant.content_config,
|
|
)
|
|
for variant in request.experiment.variants
|
|
],
|
|
)
|
|
|
|
decision = choose_variant(
|
|
experiment_id=experiment.experiment_id,
|
|
experiment_key=experiment.experiment_key,
|
|
visitor_id=request.visitor_id,
|
|
site_key=experiment.site_key,
|
|
assignment_salt=experiment.assignment_salt,
|
|
variants=experiment.variants,
|
|
)
|
|
if not decision:
|
|
return None
|
|
|
|
await self.observability.record_system_event(
|
|
SystemEvent(
|
|
category="runtime_assignment",
|
|
event_name="assignment_decided",
|
|
experiment_id=decision.experiment_id,
|
|
experiment_key=decision.experiment_key,
|
|
variant_id=decision.variant_id,
|
|
variant_key=decision.variant_key,
|
|
visitor_id=request.visitor_id,
|
|
payload={"bucket": decision.bucket, "reason": decision.reason},
|
|
)
|
|
)
|
|
|
|
return RuntimeAssignResponse(
|
|
experiment_id=decision.experiment_id,
|
|
experiment_key=decision.experiment_key,
|
|
variant_id=decision.variant_id,
|
|
variant_key=decision.variant_key,
|
|
bucket=decision.bucket,
|
|
reason=decision.reason,
|
|
)
|
|
|
|
async def payload(self, request: RuntimePayloadRequest) -> RuntimePayloadResponse:
|
|
assignment = await self.assign(
|
|
RuntimeAssignRequest(
|
|
visitor_id=request.visitor_id,
|
|
experiment=request.experiment,
|
|
)
|
|
)
|
|
|
|
return RuntimePayloadResponse(
|
|
experiment_id=request.experiment.experiment_id,
|
|
experiment_key=request.experiment.experiment_key,
|
|
release_id=request.experiment.release_id,
|
|
release_version=request.experiment.release_version,
|
|
assigned_variant_id=assignment.variant_id if assignment else None,
|
|
assigned_variant_key=assignment.variant_key if assignment else None,
|
|
payload=request.experiment.payload,
|
|
)
|
|
|
|
async def ingest_event(self, request: RuntimeEventRequest) -> RuntimeEventResponse:
|
|
# Event forwarding/queueing will be introduced when GA4/GTM ingest and
|
|
# audit storage are wired in. We preserve the external contract now.
|
|
await self.observability.record_system_event(
|
|
SystemEvent(
|
|
category="runtime_event",
|
|
event_name=request.event_name,
|
|
site_id=request.site_id,
|
|
site_key=request.site_key,
|
|
experiment_id=request.experiment_id,
|
|
experiment_key=request.experiment_key,
|
|
variant_id=request.variant_id,
|
|
variant_key=request.variant_key,
|
|
visitor_id=request.visitor_id,
|
|
payload=request.payload,
|
|
)
|
|
)
|
|
return RuntimeEventResponse(
|
|
accepted=True,
|
|
event_name=request.event_name,
|
|
)
|
|
|
|
async def _resolve_site(self, site_id: str | None, site_key: str | None):
|
|
if site_id:
|
|
item = await self.site_repository.get(site_id)
|
|
if item:
|
|
return to_site(item)
|
|
|
|
if site_key:
|
|
items = await self.site_repository.list(
|
|
params={"filter[site_key][_eq]": site_key}
|
|
)
|
|
if items:
|
|
return to_site(items[0])
|
|
|
|
return None
|
|
|
|
def _matches_targeting(
|
|
self,
|
|
request_url: str,
|
|
user_agent: str | None,
|
|
targeting_config: dict[str, Any] | list[Any] | None,
|
|
) -> bool:
|
|
if not isinstance(targeting_config, dict):
|
|
return True
|
|
|
|
# Device targeting — empty list = all devices
|
|
device_targets = targeting_config.get("device_targets")
|
|
if device_targets:
|
|
detected = self._detect_device(user_agent or "")
|
|
if detected not in device_targets:
|
|
return False
|
|
|
|
# URL rules (all must match — AND logic)
|
|
url_rules = targeting_config.get("url_rules")
|
|
if url_rules:
|
|
return all(self._matches_url_rule(request_url, rule) for rule in url_rules)
|
|
|
|
# Fallback: base_url prefix check
|
|
base_url = targeting_config.get("base_url")
|
|
if base_url:
|
|
return str(request_url).startswith(str(base_url))
|
|
|
|
return True
|
|
|
|
def _detect_device(self, user_agent: str) -> str:
|
|
ua = user_agent.lower()
|
|
if "ipad" in ua or ("android" in ua and "mobile" not in ua):
|
|
return "tablet"
|
|
if "mobile" in ua or "iphone" in ua or "android" in ua:
|
|
return "mobile"
|
|
return "desktop"
|
|
|
|
def _matches_url_rule(self, url: str, rule: dict[str, Any]) -> bool:
|
|
operator = rule.get("operator", "contains")
|
|
value = str(rule.get("value", ""))
|
|
if not value:
|
|
return True
|
|
if operator == "contains":
|
|
return value in url
|
|
if operator == "equals":
|
|
return url == value
|
|
if operator == "starts_with":
|
|
return url.startswith(value)
|
|
if operator == "regex":
|
|
try:
|
|
return bool(re.search(value, url))
|
|
except re.error:
|
|
return False
|
|
return True
|