from sqlalchemy import func, select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session

from app.models.module import Module


class ModulesRepository:
    """Data-access helper for ``Module`` rows bound to a SQLAlchemy ``Session``."""

    def __init__(self, db: Session) -> None:
        """Store the session used by every query and write in this repository."""
        self.db = db

    def get_by_key(self, module_key: str) -> Module | None:
        """Return the module whose ``module_key`` equals *module_key*, or ``None``."""
        stmt = select(Module).where(Module.module_key == module_key)
        return self.db.scalar(stmt)

    # NOTE: this method name shadows the builtin ``list`` inside the class
    # namespace. The call to ``list(...)`` in the body still reaches the
    # builtin because class scope is not searched from method bodies, but
    # avoid ``list[...]`` in annotations of methods defined after this one.
    def list(self, limit: int = 200, offset: int = 0) -> tuple[list[Module], int]:
        """Return one page of modules together with the total row count.

        Args:
            limit: Maximum number of rows in the returned page.
            offset: Number of rows to skip before the page starts.

        Returns:
            A ``(items, total)`` tuple. ``items`` is ordered by ``created_at``
            descending; ``total`` counts every ``Module`` row and is not
            affected by *limit* or *offset*.
        """
        count_stmt = select(func.count()).select_from(Module)
        page_stmt = (
            select(Module)
            .order_by(Module.created_at.desc())
            .limit(limit)
            .offset(offset)
        )
        items = list(self.db.scalars(page_stmt).all())
        # ``scalar`` may yield ``None``; normalise that to a zero count.
        total = int(self.db.scalar(count_stmt) or 0)
        return items, total

    def create(
        self,
        module_key: str,
        system_key: str,
        name: str,
        status: str = "active",
    ) -> Module:
        """Insert a new module, commit, and return the refreshed instance.

        Raises:
            SQLAlchemyError: If the commit fails. The session is rolled back
                before the error is re-raised.
        """
        item = Module(
            module_key=module_key,
            system_key=system_key,
            name=name,
            status=status,
        )
        self.db.add(item)
        return self._commit_and_refresh(item)

    def update(
        self,
        item: Module,
        *,
        name: str | None = None,
        status: str | None = None,
    ) -> Module:
        """Apply the provided fields to *item*, commit, and return it refreshed.

        Arguments left as ``None`` are not written to *item*.

        Raises:
            SQLAlchemyError: If the commit fails. The session is rolled back
                before the error is re-raised.
        """
        if name is not None:
            item.name = name
        if status is not None:
            item.status = status
        return self._commit_and_refresh(item)

    def _commit_and_refresh(self, item: Module) -> Module:
        """Commit the session and reload *item*, rolling back if the commit fails."""
        try:
            self.db.commit()
        except SQLAlchemyError:
            # A failed commit/flush leaves the session in a failed transaction;
            # roll back so the same session remains usable, then surface the
            # original error unchanged to the caller.
            self.db.rollback()
            raise
        self.db.refresh(item)
        return item