from sqlalchemy import func, or_, select from sqlalchemy.orm import Session from app.models.company import Company class CompaniesRepository: def __init__(self, db: Session) -> None: self.db = db def get_by_key(self, company_key: str) -> Company | None: return self.db.scalar(select(Company).where(Company.company_key == company_key)) def get_by_id(self, company_id: str) -> Company | None: return self.db.scalar(select(Company).where(Company.id == company_id)) def list(self, keyword: str | None = None, limit: int = 100, offset: int = 0) -> tuple[list[Company], int]: stmt = select(Company) count_stmt = select(func.count()).select_from(Company) if keyword: pattern = f"%{keyword}%" cond = or_( Company.company_key.ilike(pattern), Company.display_name.ilike(pattern), Company.legal_name.ilike(pattern), ) stmt = stmt.where(cond) count_stmt = count_stmt.where(cond) stmt = stmt.order_by(Company.created_at.desc()).limit(limit).offset(offset) return list(self.db.scalars(stmt).all()), int(self.db.scalar(count_stmt) or 0) def create( self, *, company_key: str, display_name: str, legal_name: str | None, provider_group_id: str | None = None, status: str = "active", ) -> Company: item = Company( company_key=company_key, display_name=display_name, legal_name=legal_name, provider_group_id=provider_group_id, status=status, ) self.db.add(item) self.db.commit() self.db.refresh(item) return item def update( self, item: Company, *, display_name: str | None = None, legal_name: str | None = None, provider_group_id: str | None = None, status: str | None = None, ) -> Company: if display_name is not None: item.display_name = display_name if legal_name is not None: item.legal_name = legal_name if provider_group_id is not None: item.provider_group_id = provider_group_id if status is not None: item.status = status self.db.commit() self.db.refresh(item) return item def delete(self, item: Company) -> None: self.db.delete(item) self.db.commit()