Add in-memory read cache with CUD-based invalidation

This commit is contained in:
Chris
2026-04-03 02:32:38 +08:00
parent fa624127c8
commit ed413ce39d
8 changed files with 196 additions and 20 deletions

View File

@@ -53,6 +53,7 @@ from app.security.admin_guard import require_admin_principal
from app.security.api_client_auth import hash_api_key
from app.services.idp_admin_service import ProviderAdminService
from app.services.idp_catalog_sync import sync_from_provider
from app.services.runtime_cache import runtime_cache
from app.core.config import get_settings
router = APIRouter(
@@ -161,9 +162,16 @@ def list_companies(
limit: int = Query(default=100, ge=1, le=500),
offset: int = Query(default=0, ge=0),
) -> ListResponse:
cache_key = f"admin:companies:{keyword or ''}:{limit}:{offset}"
cached = runtime_cache.get(cache_key)
if isinstance(cached, ListResponse):
return cached
repo = CompaniesRepository(db)
items, total = repo.list(keyword=keyword, limit=limit, offset=offset)
return ListResponse(items=[_company_item(i) for i in items], total=total, limit=limit, offset=offset)
result = ListResponse(items=[_company_item(i) for i in items], total=total, limit=limit, offset=offset)
runtime_cache.set(cache_key, result, ttl_seconds=20)
return result
@router.post("/companies", response_model=CompanyItem)
@@ -251,6 +259,11 @@ def list_sites(
limit: int = Query(default=100, ge=1, le=500),
offset: int = Query(default=0, ge=0),
) -> ListResponse:
cache_key = f"admin:sites:{keyword or ''}:{company_key or ''}:{limit}:{offset}"
cached = runtime_cache.get(cache_key)
if isinstance(cached, ListResponse):
return cached
companies_repo = CompaniesRepository(db)
sites_repo = SitesRepository(db)
company_id = None
@@ -264,7 +277,9 @@ def list_sites(
company_map = {c.id: c for c in companies}
items, total = sites_repo.list(keyword=keyword, company_id=company_id, limit=limit, offset=offset)
response_items = [_site_item(i, company_map[i.company_id]) for i in items if i.company_id in company_map]
return ListResponse(items=response_items, total=total, limit=limit, offset=offset)
result = ListResponse(items=response_items, total=total, limit=limit, offset=offset)
runtime_cache.set(cache_key, result, ttl_seconds=20)
return result
@router.post("/sites", response_model=SiteItem)
@@ -375,9 +390,16 @@ def list_systems(
limit: int = Query(default=100, ge=1, le=500),
offset: int = Query(default=0, ge=0),
) -> ListResponse:
cache_key = f"admin:systems:{keyword or ''}:{status_filter or ''}:{limit}:{offset}"
cached = runtime_cache.get(cache_key)
if isinstance(cached, ListResponse):
return cached
repo = SystemsRepository(db)
items, total = repo.list(keyword=keyword, status=status_filter, limit=limit, offset=offset)
return ListResponse(items=[_system_item(i) for i in items], total=total, limit=limit, offset=offset)
result = ListResponse(items=[_system_item(i) for i in items], total=total, limit=limit, offset=offset)
runtime_cache.set(cache_key, result, ttl_seconds=20)
return result
@router.post("/systems", response_model=SystemItem)
@@ -404,6 +426,11 @@ def list_roles(
limit: int = Query(default=100, ge=1, le=500),
offset: int = Query(default=0, ge=0),
) -> ListResponse:
cache_key = f"admin:roles:{keyword or ''}:{system_key or ''}:{status_filter or ''}:{limit}:{offset}"
cached = runtime_cache.get(cache_key)
if isinstance(cached, ListResponse):
return cached
systems_repo = SystemsRepository(db)
roles_repo = RolesRepository(db)
@@ -433,7 +460,9 @@ def list_roles(
for row in rows
if row.system_id in system_map
]
return ListResponse(items=items, total=total, limit=limit, offset=offset)
result = ListResponse(items=items, total=total, limit=limit, offset=offset)
runtime_cache.set(cache_key, result, ttl_seconds=20)
return result
@router.post("/roles", response_model=RoleItem)
@@ -751,9 +780,17 @@ def list_members(
limit: int = Query(default=100, ge=1, le=500),
offset: int = Query(default=0, ge=0),
) -> ListResponse:
is_active_key = "" if is_active is None else ("1" if is_active else "0")
cache_key = f"admin:members:{keyword or ''}:{is_active_key}:{limit}:{offset}"
cached = runtime_cache.get(cache_key)
if isinstance(cached, ListResponse):
return cached
repo = UsersRepository(db)
rows, total = repo.list(keyword=keyword, is_active=is_active, limit=limit, offset=offset)
return ListResponse(items=[_member_item(r) for r in rows], total=total, limit=limit, offset=offset)
result = ListResponse(items=[_member_item(r) for r in rows], total=total, limit=limit, offset=offset)
runtime_cache.set(cache_key, result, ttl_seconds=20)
return result
@router.post("/members", response_model=MemberItem)
@@ -950,14 +987,21 @@ def list_api_clients(
limit: int = Query(default=100, ge=1, le=500),
offset: int = Query(default=0, ge=0),
) -> ListResponse:
cache_key = f"admin:api_clients:{keyword or ''}:{status_filter or ''}:{limit}:{offset}"
cached = runtime_cache.get(cache_key)
if isinstance(cached, ListResponse):
return cached
repo = ApiClientsRepository(db)
items, total = repo.list(keyword=keyword, status=status_filter, limit=limit, offset=offset)
return ListResponse(
result = ListResponse(
items=[ApiClientItem.model_validate(i, from_attributes=True) for i in items],
total=total,
limit=limit,
offset=offset,
)
runtime_cache.set(cache_key, result, ttl_seconds=20)
return result
@router.post("/sync/from-provider")

View File

@@ -12,6 +12,7 @@ from app.schemas.users import UserUpsertBySubRequest
from app.security.api_client_auth import require_api_client
from app.services.idp_admin_service import ProviderAdminService
from app.services.permission_service import PermissionService
from app.services.runtime_cache import runtime_cache
router = APIRouter(prefix="/internal", tags=["internal"], dependencies=[Depends(require_api_client)])
@@ -68,8 +69,13 @@ def _build_user_role_rows(db: Session, user_sub: str) -> list[tuple[str, str, st
@router.get("/users/{user_sub}/roles", response_model=InternalUserRoleResponse)
def get_user_roles(user_sub: str, db: Session = Depends(get_db)) -> InternalUserRoleResponse:
cache_key = f"internal:user_roles:{user_sub}"
cached = runtime_cache.get(cache_key)
if isinstance(cached, InternalUserRoleResponse):
return cached
rows = _build_user_role_rows(db, user_sub)
return InternalUserRoleResponse(
result = InternalUserRoleResponse(
user_sub=user_sub,
roles=[
InternalUserRoleItem(
@@ -94,6 +100,8 @@ def get_user_roles(user_sub: str, db: Session = Depends(get_db)) -> InternalUser
) in rows
],
)
runtime_cache.set(cache_key, result, ttl_seconds=30)
return result
@router.get("/permissions/{user_sub}/snapshot", response_model=RoleSnapshotResponse)
@@ -101,8 +109,15 @@ def get_permission_snapshot(
user_sub: str,
db: Session = Depends(get_db),
) -> RoleSnapshotResponse:
cache_key = f"internal:permissions_snapshot:{user_sub}"
cached = runtime_cache.get(cache_key)
if isinstance(cached, RoleSnapshotResponse):
return cached
rows = _build_user_role_rows(db, user_sub)
return PermissionService.build_role_snapshot(user_sub=user_sub, rows=rows)
result = PermissionService.build_role_snapshot(user_sub=user_sub, rows=rows)
runtime_cache.set(cache_key, result, ttl_seconds=30)
return result
@router.post("/provider/users/ensure", response_model=ProviderEnsureUserResponse)

View File

@@ -9,6 +9,7 @@ from app.schemas.auth import ProviderPrincipal, MeSummaryResponse
from app.schemas.permissions import RoleSnapshotResponse
from app.security.idp_jwt import require_authenticated_principal
from app.services.permission_service import PermissionService
from app.services.runtime_cache import runtime_cache
router = APIRouter(prefix="/me", tags=["me"])
@@ -18,6 +19,10 @@ def get_me(
principal: ProviderPrincipal = Depends(require_authenticated_principal),
db: Session = Depends(get_db),
) -> MeSummaryResponse:
cache_key = f"me:{principal.sub}"
cached = runtime_cache.get(cache_key)
if isinstance(cached, MeSummaryResponse):
return cached
try:
users_repo = UsersRepository(db)
user = users_repo.upsert_by_sub(
@@ -28,13 +33,17 @@ def get_me(
is_active=True,
status="active",
)
return MeSummaryResponse(sub=user.user_sub, email=user.email, display_name=user.display_name)
result = MeSummaryResponse(sub=user.user_sub, email=user.email, display_name=user.display_name)
runtime_cache.set(cache_key, result, ttl_seconds=30)
return result
except SQLAlchemyError:
return MeSummaryResponse(
result = MeSummaryResponse(
sub=principal.sub,
email=principal.email,
display_name=principal.name or principal.preferred_username,
)
runtime_cache.set(cache_key, result, ttl_seconds=15)
return result
@router.get("/permissions/snapshot", response_model=RoleSnapshotResponse)
@@ -42,6 +51,10 @@ def get_my_permission_snapshot(
principal: ProviderPrincipal = Depends(require_authenticated_principal),
db: Session = Depends(get_db),
) -> RoleSnapshotResponse:
cache_key = f"me:permissions_snapshot:{principal.sub}"
cached = runtime_cache.get(cache_key)
if isinstance(cached, RoleSnapshotResponse):
return cached
try:
users_repo = UsersRepository(db)
user_sites_repo = UserSitesRepository(db)
@@ -68,6 +81,10 @@ def get_my_permission_snapshot(
)
for site, company, role, system in rows
]
return PermissionService.build_role_snapshot(user_sub=principal.sub, rows=serialized)
result = PermissionService.build_role_snapshot(user_sub=principal.sub, rows=serialized)
runtime_cache.set(cache_key, result, ttl_seconds=30)
return result
except SQLAlchemyError:
return RoleSnapshotResponse(user_sub=principal.sub, roles=[])
result = RoleSnapshotResponse(user_sub=principal.sub, roles=[])
runtime_cache.set(cache_key, result, ttl_seconds=10)
return result

View File

@@ -7,6 +7,7 @@ from app.api.internal_catalog import router as internal_catalog_router
from app.api.internal import router as internal_router
from app.api.me import router as me_router
from app.core.config import get_settings
from app.services.runtime_cache import runtime_cache
app = FastAPI(title="memberapi.ose.tw", version="0.1.0")
@@ -20,6 +21,18 @@ app.add_middleware(
)
@app.middleware("http")
async def invalidate_runtime_cache_on_cud(request, call_next):
response = await call_next(request)
if (
request.method in {"POST", "PUT", "PATCH", "DELETE"}
and request.url.path.startswith(("/admin", "/internal"))
and response.status_code < 400
):
runtime_cache.bump_revision()
return response
@app.get("/healthz", tags=["health"])
def healthz() -> dict[str, str]:
return {"status": "ok"}

View File

@@ -57,6 +57,7 @@ class UsersRepository:
provider_user_id: str | None = None,
) -> User:
user = self.get_by_sub(user_sub)
changed = False
if user is None:
user = User(
user_sub=user_sub,
@@ -68,15 +69,28 @@ class UsersRepository:
status=status,
)
self.db.add(user)
changed = True
else:
if provider_user_id is not None:
if provider_user_id is not None and user.provider_user_id != provider_user_id:
user.provider_user_id = provider_user_id
changed = True
if user.username != username:
user.username = username
changed = True
if user.email != email:
user.email = email
changed = True
if user.display_name != display_name:
user.display_name = display_name
changed = True
if user.is_active != is_active:
user.is_active = is_active
changed = True
if user.status != status:
user.status = status
changed = True
if changed:
self.db.commit()
self.db.refresh(user)
return user

View File

@@ -0,0 +1,71 @@
from __future__ import annotations
from dataclasses import dataclass
from threading import RLock
import time
from typing import Callable, TypeVar
T = TypeVar("T")
@dataclass
class _CacheEntry:
value: object
expires_at: float
revision: int
class RuntimeCache:
"""Simple in-memory cache for local/prototype use.
Cache is globally invalidated by `bump_revision()` which we call after CUD.
"""
def __init__(self) -> None:
self._lock = RLock()
self._revision = 0
self._entries: dict[str, _CacheEntry] = {}
def get(self, key: str) -> object | None:
now = time.time()
with self._lock:
entry = self._entries.get(key)
if not entry:
return None
if entry.expires_at <= now or entry.revision != self._revision:
self._entries.pop(key, None)
return None
return entry.value
def set(self, key: str, value: object, ttl_seconds: int = 30) -> object:
now = time.time()
with self._lock:
self._entries[key] = _CacheEntry(
value=value,
expires_at=now + max(ttl_seconds, 1),
revision=self._revision,
)
if len(self._entries) > 2000:
self._entries.clear()
return value
def get_or_set(self, key: str, factory: Callable[[], T], ttl_seconds: int = 30) -> T:
cached = self.get(key)
if cached is not None:
return cached # type: ignore[return-value]
return self.set(key, factory(), ttl_seconds=ttl_seconds) # type: ignore[return-value]
def bump_revision(self) -> int:
with self._lock:
self._revision += 1
if self._revision > 1_000_000_000:
self._revision = 1
self._entries.clear()
return self._revision
def revision(self) -> int:
with self._lock:
return self._revision
runtime_cache = RuntimeCache()

View File

@@ -30,6 +30,7 @@
- 補齊策略:僅在手動同步按鈕(`POST /admin/sync/from-provider`)或 CUD 流程時同步。
- 站台角色指派(`PUT /admin/sites/{site_key}/roles``PUT /admin/roles/{role_key}/sites`)會即時同步到 Keycloak Group Role Mapping。
- 使用者加入 Site 時,透過同步邏輯使其在 IdP 端取得對應角色能力。
- 讀取效能:後端採用 memory cache後續可換 Redis`GET` 先讀快取;`POST/PUT/PATCH/DELETE` 成功後自動失效快取。
## 後台安全線
- `/admin/*` 必須 Bearer token。

View File

@@ -23,3 +23,4 @@
- [x] 系統改為 Keycloak 唯一來源(後台停用 system CRUD
- [x] Role CRUD 同步 Provider Client Role新增/修改/刪除會同步到 Provider
- [x] Site/Role 關聯指派同步 Keycloak Group Role Mapping雙向指派入口皆同步
- [x] 後端讀取快取memory與 CUD 自動失效機制(可後續切 Redis