from sqlalchemy import func, or_, select
from sqlalchemy.orm import Session

from app.models.user import User

# Escape character used for LIKE/ILIKE patterns built from user input.
# "/" is used rather than a backslash so the rendered ESCAPE clause does not
# depend on how a given database treats backslashes in string literals.
_LIKE_ESCAPE = "/"


def _escape_like(value: str) -> str:
    """Return *value* with LIKE wildcards escaped so it matches literally."""
    # The escape character itself must be doubled first, otherwise the
    # escapes added for "%" and "_" would be escaped again.
    return (
        value.replace(_LIKE_ESCAPE, _LIKE_ESCAPE * 2)
        .replace("%", f"{_LIKE_ESCAPE}%")
        .replace("_", f"{_LIKE_ESCAPE}_")
    )


class UsersRepository:
    """Data-access helper for ``User`` rows backed by a SQLAlchemy session."""

    def __init__(self, db: Session) -> None:
        """Store the session used by every query in this repository."""
        self.db = db

    def _commit(self) -> None:
        """Commit the current transaction, rolling back if the commit fails.

        The original exception is re-raised unchanged; the rollback only makes
        sure the session can be used again afterwards.
        """
        try:
            self.db.commit()
        except Exception:
            self.db.rollback()
            raise

    def get_by_sub(self, user_sub: str) -> User | None:
        """Return the user whose ``user_sub`` equals *user_sub*, or ``None``."""
        return self.db.scalar(select(User).where(User.user_sub == user_sub))

    def get_by_id(self, user_id: str) -> User | None:
        """Return the user whose ``id`` equals *user_id*, or ``None``."""
        return self.db.scalar(select(User).where(User.id == user_id))

    # NOTE: this method is named ``list``; inside the class *body* (not inside
    # method bodies) any later use of ``list[...]`` would resolve to this
    # method instead of the builtin.
    def list(
        self,
        *,
        keyword: str | None = None,
        is_active: bool | None = None,
        limit: int = 50,
        offset: int = 0,
    ) -> tuple[list[User], int]:
        """Return one page of users plus the total number of matching users.

        Args:
            keyword: Optional substring matched case-insensitively against
                ``user_sub``, ``username``, ``email`` and ``display_name``.
                ``%`` and ``_`` in the keyword are matched literally. An
                empty string is treated like ``None`` (no filtering).
            is_active: When not ``None``, only users whose ``is_active``
                equals this value are returned.
            limit: Maximum number of rows in the returned page.
            offset: Number of matching rows to skip before the page starts.

        Returns:
            ``(items, total)`` where ``items`` is the page ordered by
            ``created_at`` descending and ``total`` counts every row matching
            the filters, ignoring ``limit`` and ``offset``.
        """
        stmt = select(User)
        count_stmt = select(func.count()).select_from(User)

        if keyword:
            # Escape wildcards so user input cannot act as a LIKE pattern.
            pattern = f"%{_escape_like(keyword)}%"
            cond = or_(
                User.user_sub.ilike(pattern, escape=_LIKE_ESCAPE),
                User.username.ilike(pattern, escape=_LIKE_ESCAPE),
                User.email.ilike(pattern, escape=_LIKE_ESCAPE),
                User.display_name.ilike(pattern, escape=_LIKE_ESCAPE),
            )
            stmt = stmt.where(cond)
            count_stmt = count_stmt.where(cond)

        if is_active is not None:
            stmt = stmt.where(User.is_active == is_active)
            count_stmt = count_stmt.where(User.is_active == is_active)

        stmt = stmt.order_by(User.created_at.desc()).limit(limit).offset(offset)
        items = list(self.db.scalars(stmt).all())
        total = int(self.db.scalar(count_stmt) or 0)
        return items, total

    def upsert_by_sub(
        self,
        *,
        user_sub: str,
        username: str | None,
        email: str | None,
        display_name: str | None,
        is_active: bool,
        status: str = "active",
        provider_user_id: str | None = None,
    ) -> User:
        """Create the user identified by *user_sub*, or update it if present.

        On update, ``username``, ``email``, ``display_name``, ``is_active``
        and ``status`` are always overwritten (including with ``None``),
        while ``provider_user_id`` is only overwritten when a non-``None``
        value is supplied.

        Returns:
            The persisted user, refreshed from the database after commit.

        Raises:
            Whatever the session raises on commit; the transaction is rolled
            back before the exception propagates.
        """
        user = self.get_by_sub(user_sub)
        if user is None:
            user = User(
                user_sub=user_sub,
                provider_user_id=provider_user_id,
                username=username,
                email=email,
                display_name=display_name,
                is_active=is_active,
                status=status,
            )
            self.db.add(user)
        else:
            # Keep the stored provider id unless the caller supplies one.
            if provider_user_id is not None:
                user.provider_user_id = provider_user_id
            user.username = username
            user.email = email
            user.display_name = display_name
            user.is_active = is_active
            user.status = status

        self._commit()
        self.db.refresh(user)
        return user

    def update_member(
        self,
        user: User,
        *,
        username: str | None = None,
        email: str | None = None,
        display_name: str | None = None,
        is_active: bool | None = None,
        status: str | None = None,
    ) -> User:
        """Apply a partial update to *user* and persist it.

        Only arguments that are not ``None`` are written, so this method
        cannot be used to reset a field to ``None``.

        Returns:
            The same user, refreshed from the database after commit.

        Raises:
            Whatever the session raises on commit; the transaction is rolled
            back before the exception propagates.
        """
        if username is not None:
            user.username = username
        if email is not None:
            user.email = email
        if display_name is not None:
            user.display_name = display_name
        if is_active is not None:
            user.is_active = is_active
        if status is not None:
            user.status = status

        self._commit()
        self.db.refresh(user)
        return user

    def delete(self, user: User) -> None:
        """Delete *user* and commit, rolling back if the commit fails."""
        self.db.delete(user)
        self._commit()