"""Repository providing lookup, listing and persistence of ``System`` rows."""

from sqlalchemy import func, or_, select
from sqlalchemy.orm import Session

from app.models.system import System

# Escape character used when building LIKE/ILIKE patterns. A forward slash is
# used rather than a backslash so the pattern does not depend on how the
# database treats backslashes inside string literals.
_LIKE_ESCAPE = "/"


def _contains_pattern(keyword: str) -> str:
    """Return a ``%keyword%`` pattern with LIKE metacharacters escaped.

    The escape character itself is doubled first so that the escapes added
    for ``%`` and ``_`` are not escaped a second time.
    """
    escaped = (
        keyword.replace(_LIKE_ESCAPE, _LIKE_ESCAPE * 2)
        .replace("%", f"{_LIKE_ESCAPE}%")
        .replace("_", f"{_LIKE_ESCAPE}_")
    )
    return f"%{escaped}%"


class SystemsRepository:
    """Query and persist ``System`` entities through a SQLAlchemy session."""

    def __init__(self, db: Session) -> None:
        """Store the session that every method of this repository uses."""
        self.db = db

    def _commit(self) -> None:
        """Commit the session, rolling back before re-raising on failure.

        The rollback keeps a failed commit from leaving the session in the
        middle of a transaction for whoever uses it next.
        """
        try:
            self.db.commit()
        except Exception:
            self.db.rollback()
            raise

    def get_by_key(self, system_key: str) -> System | None:
        """Return the system whose ``system_key`` equals *system_key*, or ``None``."""
        stmt = select(System).where(System.system_key == system_key)
        return self.db.scalar(stmt)

    def get_by_id(self, system_id: str) -> System | None:
        """Return the system whose ``id`` equals *system_id*, or ``None``."""
        stmt = select(System).where(System.id == system_id)
        return self.db.scalar(stmt)

    def list(
        self,
        *,
        keyword: str | None = None,
        status: str | None = None,
        limit: int = 100,
        offset: int = 0,
    ) -> tuple[list[System], int]:
        """Return one page of systems together with the total match count.

        Args:
            keyword: When truthy, keep only systems whose ``system_key`` or
                ``name`` contains this text (case-insensitive). ``%`` and
                ``_`` in the keyword are matched literally.
            status: When truthy, keep only systems with exactly this status.
            limit: Maximum number of rows in the returned page.
            offset: Number of matching rows to skip before the page starts.

        Returns:
            ``(items, total)`` where ``items`` is the requested page, newest
            ``created_at`` first, and ``total`` counts every row matching the
            filters regardless of ``limit``/``offset``.
        """
        filters = []
        if keyword:
            pattern = _contains_pattern(keyword)
            filters.append(
                or_(
                    System.system_key.ilike(pattern, escape=_LIKE_ESCAPE),
                    System.name.ilike(pattern, escape=_LIKE_ESCAPE),
                )
            )
        if status:
            filters.append(System.status == status)

        page_stmt = select(System)
        count_stmt = select(func.count()).select_from(System)
        # Apply identical filters to both statements so the total always
        # describes the same result set the page is drawn from.
        for condition in filters:
            page_stmt = page_stmt.where(condition)
            count_stmt = count_stmt.where(condition)

        # ``id`` breaks ties between rows sharing a ``created_at`` value;
        # without it the relative order of such rows is unspecified and a row
        # could repeat or go missing across consecutive pages.
        page_stmt = (
            page_stmt.order_by(System.created_at.desc(), System.id.desc())
            .limit(limit)
            .offset(offset)
        )

        # Inside a method body ``list`` still resolves to the builtin: class
        # attributes (including this method) are not part of function scope.
        items = list(self.db.scalars(page_stmt).all())
        total = int(self.db.scalar(count_stmt) or 0)
        return items, total

    def create(self, *, system_key: str, name: str, status: str = "active") -> System:
        """Insert a new system, commit, and return the refreshed instance."""
        item = System(system_key=system_key, name=name, status=status)
        self.db.add(item)
        self._commit()
        self.db.refresh(item)
        return item

    def update(
        self,
        item: System,
        *,
        name: str | None = None,
        status: str | None = None,
    ) -> System:
        """Apply the non-``None`` fields to *item*, commit, and return it refreshed.

        A ``None`` argument leaves the corresponding attribute untouched. The
        commit and refresh run even when no field was changed.
        """
        if name is not None:
            item.name = name
        if status is not None:
            item.status = status
        self._commit()
        self.db.refresh(item)
        return item

    def delete(self, item: System) -> None:
        """Delete *item* and commit."""
        self.db.delete(item)
        self._commit()