from functools import lru_cache
from typing import Annotated
from urllib.parse import quote

from pydantic import field_validator
from pydantic_settings import BaseSettings, NoDecode, SettingsConfigDict


def _split_csv(raw: str) -> list[str]:
    """Split a comma-separated string into stripped, non-empty items."""
    return [item.strip() for item in raw.split(",") if item.strip()]


class Settings(BaseSettings):
    """Application configuration.

    Values are loaded by pydantic-settings, using ``.env`` as the env file;
    unknown keys are ignored (``extra="ignore"``).

    Two identity providers are described: Authentik and Keycloak. The
    ``idp_*`` properties expose a provider-neutral view and select Keycloak
    whenever ``use_keycloak`` is true, otherwise Authentik.
    """

    model_config = SettingsConfigDict(env_file=".env", extra="ignore")

    app_env: str = "development"
    port: int = 8000

    # Database connection parts; combined by ``database_url``.
    db_host: str = "127.0.0.1"
    db_port: int = 54321
    db_name: str = "member_center"
    db_user: str = "member_ose"
    db_password: str = ""

    # Authentik (used when Keycloak is not configured)
    authentik_base_url: str = ""
    authentik_admin_token: str = ""
    # NOTE(review): TLS verification defaults to off here, whereas
    # ``keycloak_verify_tls`` defaults to on. Confirm this is intentional.
    authentik_verify_tls: bool = False
    authentik_issuer: str = ""
    authentik_jwks_url: str = ""
    authentik_audience: str = ""
    authentik_client_id: str = ""
    authentik_client_secret: str = ""
    authentik_token_endpoint: str = ""
    authentik_userinfo_endpoint: str = ""

    # Keycloak (preferred when configured)
    keycloak_base_url: str = ""
    keycloak_realm: str = ""
    keycloak_verify_tls: bool = True
    keycloak_issuer: str = ""
    keycloak_jwks_url: str = ""
    keycloak_audience: str = ""
    keycloak_client_id: str = ""
    keycloak_client_secret: str = ""
    keycloak_token_endpoint: str = ""
    keycloak_userinfo_endpoint: str = ""
    keycloak_admin_client_id: str = ""
    keycloak_admin_client_secret: str = ""
    keycloak_admin_realm: str = ""

    # ``NoDecode`` keeps pydantic-settings from JSON-decoding these values,
    # so the "before" validators below receive the raw comma-separated text.
    public_frontend_origins: Annotated[list[str], NoDecode] = ["https://member.ose.tw"]
    internal_shared_secret: str = ""
    admin_required_groups: Annotated[list[str], NoDecode] = []

    @field_validator("public_frontend_origins", mode="before")
    @classmethod
    def parse_origins(cls, value: str | list[str]) -> list[str]:
        """Accept a list unchanged, or split a comma-separated string.

        An empty or falsy value yields an empty list.
        """
        if isinstance(value, list):
            return value
        if not value:
            return []
        return _split_csv(value)

    @field_validator("admin_required_groups", mode="before")
    @classmethod
    def parse_csv(cls, value: str | list[str]) -> list[str]:
        """Normalise group names from a list or a comma-separated string.

        List items are stringified and stripped, and blank items are dropped.
        An empty or falsy value yields an empty list.
        """
        if isinstance(value, list):
            return [str(v).strip() for v in value if str(v).strip()]
        if not value:
            return []
        return _split_csv(value)

    @property
    def database_url(self) -> str:
        """Return the ``postgresql+psycopg`` connection URL.

        The user name and password are percent-encoded so that characters
        with a meaning in URLs (``@``, ``:``, ``/``, ``#``, ``?``, ``%``)
        cannot corrupt the URL. ``db_user`` and ``db_password`` are therefore
        expected to hold the raw, un-encoded credentials.
        """
        # safe="" also encodes "/", which quote() leaves alone by default.
        user = quote(self.db_user, safe="")
        password = quote(self.db_password, safe="")
        return (
            "postgresql+psycopg://"
            f"{user}:{password}@{self.db_host}:{self.db_port}/{self.db_name}"
        )

    @property
    def use_keycloak(self) -> bool:
        """True when both the Keycloak base URL and realm are set."""
        return bool(self.keycloak_base_url and self.keycloak_realm)

    @property
    def idp_base_url(self) -> str:
        """Base URL of the active provider, without trailing slashes."""
        if self.use_keycloak:
            return self.keycloak_base_url.rstrip("/")
        return self.authentik_base_url.rstrip("/")

    @property
    def idp_verify_tls(self) -> bool:
        """TLS-verification flag of the active provider."""
        if self.use_keycloak:
            return self.keycloak_verify_tls
        return self.authentik_verify_tls

    @property
    def idp_issuer(self) -> str:
        """Issuer of the active provider, without trailing slashes.

        For Keycloak an explicit ``keycloak_issuer`` wins; otherwise the
        issuer is built as ``<base>/realms/<realm>``.
        """
        if self.use_keycloak:
            if self.keycloak_issuer:
                return self.keycloak_issuer.rstrip("/")
            return f"{self.idp_base_url}/realms/{self.keycloak_realm}"
        return self.authentik_issuer.rstrip("/")

    @property
    def idp_jwks_url(self) -> str:
        """JWKS URL of the active provider.

        For Keycloak this falls back to a URL derived from the issuer; for
        Authentik the configured value is returned as-is (possibly empty).
        """
        if self.use_keycloak:
            if self.keycloak_jwks_url:
                return self.keycloak_jwks_url
            return f"{self.idp_issuer}/protocol/openid-connect/certs"
        return self.authentik_jwks_url

    @property
    def idp_audience(self) -> str:
        """Audience of the active provider.

        Authentik falls back to its client id when no audience is set;
        Keycloak has no such fallback.
        """
        if self.use_keycloak:
            return self.keycloak_audience
        return self.authentik_audience or self.authentik_client_id

    @property
    def idp_client_id(self) -> str:
        """Client id of the active provider."""
        if self.use_keycloak:
            return self.keycloak_client_id
        return self.authentik_client_id

    @property
    def idp_client_secret(self) -> str:
        """Client secret of the active provider."""
        if self.use_keycloak:
            return self.keycloak_client_secret
        return self.authentik_client_secret

    @property
    def idp_token_endpoint(self) -> str:
        """Token endpoint of the active provider.

        An explicitly configured endpoint wins. Otherwise it is derived from
        the issuer (Keycloak) or the base URL (Authentik); the Authentik
        result is empty when no base URL is configured.
        """
        if self.use_keycloak:
            if self.keycloak_token_endpoint:
                return self.keycloak_token_endpoint
            return f"{self.idp_issuer}/protocol/openid-connect/token"
        return self.authentik_token_endpoint or (
            f"{self.idp_base_url}/application/o/token/" if self.idp_base_url else ""
        )

    @property
    def idp_userinfo_endpoint(self) -> str:
        """Userinfo endpoint of the active provider.

        Resolved the same way as ``idp_token_endpoint``: explicit setting
        first, then a derived URL, or an empty string for Authentik when no
        base URL is configured.
        """
        if self.use_keycloak:
            if self.keycloak_userinfo_endpoint:
                return self.keycloak_userinfo_endpoint
            return f"{self.idp_issuer}/protocol/openid-connect/userinfo"
        return self.authentik_userinfo_endpoint or (
            f"{self.idp_base_url}/application/o/userinfo/" if self.idp_base_url else ""
        )

    @property
    def idp_authorize_endpoint(self) -> str:
        """Authorization endpoint of the active provider.

        Always derived (there is no explicit setting for it); the Authentik
        result is empty when no base URL is configured.
        """
        if self.use_keycloak:
            return f"{self.idp_issuer}/protocol/openid-connect/auth"
        return f"{self.idp_base_url}/application/o/authorize/" if self.idp_base_url else ""


@lru_cache
def get_settings() -> Settings:
    """Return the ``Settings`` instance, built once and then cached."""
    return Settings()