78 lines
2.4 KiB
Python
78 lines
2.4 KiB
Python
from sqlalchemy import func, or_, select
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.models.company import Company
|
|
|
|
|
|
class CompaniesRepository:
    """Data-access helper for ``Company`` rows on top of a SQLAlchemy session.

    The mutating methods (``create``, ``update``, ``delete``) commit on the
    session passed to the constructor; if that commit fails the session is
    rolled back and the original exception is re-raised.

    NOTE: the public ``list`` method shadows the builtin ``list`` inside the
    class namespace. Method bodies still resolve ``list`` to the builtin, but
    an annotation such as ``list[Company]`` on any method defined *after*
    ``list`` in this class would pick up the method instead.
    """

    # Escape character for LIKE patterns built from caller-supplied keywords.
    _LIKE_ESCAPE = "/"

    def __init__(self, db: Session) -> None:
        """Keep a reference to the session used for every query and commit."""
        self.db = db

    @classmethod
    def _contains_pattern(cls, keyword: str) -> str:
        """Return a ``%keyword%`` LIKE pattern with wildcards in *keyword* escaped."""
        esc = cls._LIKE_ESCAPE
        # Escape the escape character first so later replacements are not doubled.
        escaped = (
            keyword.replace(esc, esc + esc)
            .replace("%", esc + "%")
            .replace("_", esc + "_")
        )
        return f"%{escaped}%"

    def _commit(self) -> None:
        """Commit the session, rolling it back before re-raising on failure."""
        try:
            self.db.commit()
        except Exception:
            # Without a rollback the session stays in a failed transaction and
            # rejects further SQL; reset it, then let the caller see the error.
            self.db.rollback()
            raise

    def get_by_key(self, company_key: str) -> Company | None:
        """Return the company whose ``company_key`` equals *company_key*, or ``None``."""
        return self.db.scalar(select(Company).where(Company.company_key == company_key))

    def get_by_id(self, company_id: str) -> Company | None:
        """Return the company whose ``id`` equals *company_id*, or ``None``."""
        return self.db.scalar(select(Company).where(Company.id == company_id))

    def list(self, keyword: str | None = None, limit: int = 100, offset: int = 0) -> tuple[list[Company], int]:
        """Return one page of companies plus the total number of matches.

        Args:
            keyword: Optional case-insensitive substring matched against
                ``company_key``, ``display_name`` and ``legal_name``. ``%`` and
                ``_`` in the keyword are matched literally. A falsy value
                (``None`` or ``""``) disables filtering.
            limit: Maximum number of rows in the returned page.
            offset: Number of rows to skip before the page starts.

        Returns:
            ``(items, total)`` where ``items`` is the requested page ordered
            newest first and ``total`` counts every row matching the filter,
            ignoring ``limit`` and ``offset``.
        """
        stmt = select(Company)
        count_stmt = select(func.count()).select_from(Company)

        if keyword:
            pattern = self._contains_pattern(keyword)
            esc = self._LIKE_ESCAPE
            cond = or_(
                Company.company_key.ilike(pattern, escape=esc),
                Company.display_name.ilike(pattern, escape=esc),
                Company.legal_name.ilike(pattern, escape=esc),
            )
            stmt = stmt.where(cond)
            count_stmt = count_stmt.where(cond)

        # ``created_at`` alone is not unique; the ``id`` tie-breaker keeps
        # LIMIT/OFFSET pages stable so rows are neither skipped nor repeated.
        stmt = (
            stmt.order_by(Company.created_at.desc(), Company.id.desc())
            .limit(limit)
            .offset(offset)
        )
        items = list(self.db.scalars(stmt).all())
        total = int(self.db.scalar(count_stmt) or 0)
        return items, total

    def create(
        self,
        *,
        company_key: str,
        display_name: str,
        legal_name: str | None,
        provider_group_id: str | None = None,
        status: str = "active",
    ) -> Company:
        """Insert a new company, commit, and return the refreshed instance.

        Raises:
            Exception: whatever the commit raises; the session is rolled back
                before the exception propagates.
        """
        item = Company(
            company_key=company_key,
            display_name=display_name,
            legal_name=legal_name,
            provider_group_id=provider_group_id,
            status=status,
        )
        self.db.add(item)
        self._commit()
        self.db.refresh(item)
        return item

    def update(
        self,
        item: Company,
        *,
        display_name: str | None = None,
        legal_name: str | None = None,
        provider_group_id: str | None = None,
        status: str | None = None,
    ) -> Company:
        """Apply the non-``None`` fields to *item*, commit, and return it refreshed.

        ``None`` means "leave unchanged", so a field cannot be reset to NULL
        through this method.

        Raises:
            Exception: whatever the commit raises; the session is rolled back
                before the exception propagates.
        """
        if display_name is not None:
            item.display_name = display_name
        if legal_name is not None:
            item.legal_name = legal_name
        if provider_group_id is not None:
            item.provider_group_id = provider_group_id
        if status is not None:
            item.status = status
        self._commit()
        self.db.refresh(item)
        return item

    def delete(self, item: Company) -> None:
        """Delete *item* and commit.

        Raises:
            Exception: whatever the commit raises; the session is rolled back
                before the exception propagates.
        """
        self.db.delete(item)
        self._commit()
|