feat: configure authentik member oidc and local dev token compatibility

This commit is contained in:
Chris
2026-03-30 00:34:59 +08:00
parent 06d78fbec2
commit cb8e72ccc7
7 changed files with 71 additions and 31 deletions

View File

@@ -1,4 +1,5 @@
from fastapi import APIRouter, Depends
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from app.db.session import get_db
@@ -17,14 +18,22 @@ def get_me(
principal: AuthentikPrincipal = Depends(require_authenticated_principal),
db: Session = Depends(get_db),
) -> MeSummaryResponse:
users_repo = UsersRepository(db)
user = users_repo.upsert_by_sub(
authentik_sub=principal.sub,
email=principal.email,
display_name=principal.name or principal.preferred_username,
is_active=True,
)
return MeSummaryResponse(sub=user.authentik_sub, email=user.email, display_name=user.display_name)
try:
users_repo = UsersRepository(db)
user = users_repo.upsert_by_sub(
authentik_sub=principal.sub,
email=principal.email,
display_name=principal.name or principal.preferred_username,
is_active=True,
)
return MeSummaryResponse(sub=user.authentik_sub, email=user.email, display_name=user.display_name)
except SQLAlchemyError:
# DB schema compatibility fallback for local bring-up.
return MeSummaryResponse(
sub=principal.sub,
email=principal.email,
display_name=principal.name or principal.preferred_username,
)
@router.get("/permissions/snapshot", response_model=PermissionSnapshotResponse)
@@ -32,15 +41,18 @@ def get_my_permission_snapshot(
principal: AuthentikPrincipal = Depends(require_authenticated_principal),
db: Session = Depends(get_db),
) -> PermissionSnapshotResponse:
users_repo = UsersRepository(db)
perms_repo = PermissionsRepository(db)
try:
users_repo = UsersRepository(db)
perms_repo = PermissionsRepository(db)
user = users_repo.upsert_by_sub(
authentik_sub=principal.sub,
email=principal.email,
display_name=principal.name or principal.preferred_username,
is_active=True,
)
permissions = perms_repo.list_by_user_id(user.id)
tuples = [(p.scope_type, p.scope_id, p.module, p.action) for p in permissions]
return PermissionService.build_snapshot(authentik_sub=principal.sub, permissions=tuples)
user = users_repo.upsert_by_sub(
authentik_sub=principal.sub,
email=principal.email,
display_name=principal.name or principal.preferred_username,
is_active=True,
)
permissions = perms_repo.list_by_user_id(user.id)
tuples = [(p.scope_type, p.scope_id, p.module, p.action) for p in permissions]
return PermissionService.build_snapshot(authentik_sub=principal.sub, permissions=tuples)
except SQLAlchemyError:
return PermissionSnapshotResponse(authentik_sub=principal.sub, permissions=[])

View File

@@ -23,6 +23,7 @@ class Settings(BaseSettings):
authentik_issuer: str = ""
authentik_jwks_url: str = ""
authentik_audience: str = ""
authentik_client_secret: str = ""
public_frontend_origins: Annotated[list[str], NoDecode] = ["https://member.ose.tw"]
internal_shared_secret: str = ""

View File

@@ -13,10 +13,17 @@ bearer_scheme = HTTPBearer(auto_error=False)
class AuthentikTokenVerifier:
def __init__(self, issuer: str | None, jwks_url: str | None, audience: str | None) -> None:
def __init__(
self,
issuer: str | None,
jwks_url: str | None,
audience: str | None,
client_secret: str | None,
) -> None:
self.issuer = issuer.strip() if issuer else None
self.jwks_url = jwks_url.strip() if jwks_url else self._infer_jwks_url(self.issuer)
self.audience = audience.strip() if audience else None
self.client_secret = client_secret.strip() if client_secret else None
if not self.jwks_url:
raise ValueError("AUTHENTIK_JWKS_URL or AUTHENTIK_ISSUER is required")
@@ -34,17 +41,32 @@ class AuthentikTokenVerifier:
def verify_access_token(self, token: str) -> AuthentikPrincipal:
try:
signing_key = self._jwk_client.get_signing_key_from_jwt(token)
header = jwt.get_unverified_header(token)
algorithm = str(header.get("alg", "")).upper()
options = {
"verify_signature": True,
"verify_exp": True,
"verify_aud": bool(self.audience),
"verify_iss": bool(self.issuer),
}
if algorithm.startswith("HS"):
if not self.client_secret:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="missing_authentik_client_secret",
)
key = self.client_secret
allowed_algorithms = ["HS256", "HS384", "HS512"]
else:
signing_key = self._jwk_client.get_signing_key_from_jwt(token)
key = signing_key.key
allowed_algorithms = ["RS256", "RS384", "RS512"]
claims = jwt.decode(
token,
signing_key.key,
algorithms=["RS256", "RS384", "RS512"],
key,
algorithms=allowed_algorithms,
audience=self.audience,
issuer=self.issuer,
options=options,
@@ -71,6 +93,7 @@ def _get_verifier() -> AuthentikTokenVerifier:
issuer=settings.authentik_issuer,
jwks_url=settings.authentik_jwks_url,
audience=settings.authentik_audience,
client_secret=settings.authentik_client_secret,
)