Merge branch 'master' into trunk-io

This commit is contained in:
Georges-Antoine Assi
2024-05-23 10:05:35 -04:00
61 changed files with 518 additions and 457 deletions

View File

@@ -1,41 +1,58 @@
import functools
from sqlalchemy import delete, or_
from sqlalchemy.orm import Session, Query, joinedload
from decorators.database import begin_session
from models.platform import Platform
from models.rom import Rom
from sqlalchemy import delete, func, or_, select
from sqlalchemy.orm import Session
from .base_handler import DBBaseHandler
def with_query(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
session = kwargs.get("session")
if session is None:
raise ValueError("session is required")
kwargs["query"] = session.query(Platform).options(joinedload(Platform.roms))
return func(*args, **kwargs)
return wrapper
class DBPlatformsHandler(DBBaseHandler):
@begin_session
def add_platform(self, platform: Platform, session: Session = None) -> Platform:
return session.merge(platform)
@with_query
def add_platform(
self, platform: Platform, query: Query = None, session: Session = None
) -> Platform | None:
session.merge(platform)
session.flush()
return query.filter(Platform.fs_slug == platform.fs_slug).first()
@begin_session
@with_query
def get_platforms(
self, id: int | None = None, session: Session = None
) -> Platform | list[Platform] | None:
self, id: int = None, query: Query = None, session: Session = None
) -> list[Platform] | Platform | None:
return (
session.get(Platform, id)
query.get(id)
if id
else (
session.scalars(select(Platform).order_by(Platform.name.asc())) # type: ignore[attr-defined]
.unique()
.all()
)
else (session.scalars(query.order_by(Platform.name.asc())).unique().all())
)
@begin_session
@with_query
def get_platform_by_fs_slug(
self, fs_slug: str, session: Session = None
self, fs_slug: str, query: Query = None, session: Session = None
) -> Platform | None:
return session.scalars(
select(Platform).filter_by(fs_slug=fs_slug).limit(1)
).first()
return session.scalars(query.filter_by(fs_slug=fs_slug).limit(1)).first()
@begin_session
def delete_platform(self, id: int, session: Session = None) -> None:
def delete_platform(self, id: int, session: Session = None) -> int:
# Remove all roms from that platforms first
session.execute(
delete(Rom)
@@ -49,13 +66,7 @@ class DBPlatformsHandler(DBBaseHandler):
)
@begin_session
def get_rom_count(self, platform_id: int, session: Session = None) -> int:
return session.scalar(
select(func.count()).select_from(Rom).filter_by(platform_id=platform_id)
)
@begin_session
def purge_platforms(self, fs_platforms: list[str], session: Session = None) -> None:
def purge_platforms(self, fs_platforms: list[str], session: Session = None) -> int:
return session.execute(
delete(Platform)
.where(or_(Platform.fs_slug.not_in(fs_platforms), Platform.slug.is_(None))) # type: ignore[attr-defined]