Files
romm/backend/handler/database/roms_handler.py

1099 lines
35 KiB
Python

import functools
from collections.abc import Iterable, Sequence
from typing import Any
from sqlalchemy import (
Integer,
Row,
String,
Text,
and_,
case,
cast,
delete,
false,
func,
literal,
not_,
or_,
select,
text,
update,
)
from sqlalchemy.orm import Query, Session, joinedload, noload, selectinload
from sqlalchemy.sql.elements import KeyedColumnElement
from config import ROMM_DB_DRIVER
from decorators.database import begin_session
from handler.metadata.base_handler import UniversalPlatformSlug as UPS
from models.assets import Save, Screenshot, State
from models.platform import Platform
from models.rom import Rom, RomFile, RomMetadata, RomNote, RomUser
from utils.database import (
json_array_contains_all,
json_array_contains_any,
json_array_contains_value,
)
from .base_handler import DBBaseHandler
EJS_SUPPORTED_PLATFORMS = [
UPS._3DO,
UPS.AMIGA,
UPS.AMIGA_CD,
UPS.AMIGA_CD32,
UPS.ARCADE,
UPS.NEOGEOAES,
UPS.NEOGEOMVS,
UPS.ATARI2600,
UPS.ATARI5200,
UPS.ATARI7800,
UPS.C_PLUS_4,
UPS.C64,
UPS.CPET,
UPS.C64,
UPS.C128,
UPS.COLECOVISION,
UPS.JAGUAR,
UPS.LYNX,
UPS.NEO_GEO_POCKET,
UPS.NEO_GEO_POCKET_COLOR,
UPS.NES,
UPS.FAMICOM,
UPS.FDS,
UPS.N64,
UPS.N64DD,
UPS.NDS,
UPS.NINTENDO_DSI,
UPS.GB,
UPS.GBA,
UPS.PC_FX,
UPS.PHILIPS_CD_I,
UPS.PSX,
UPS.PSP,
UPS.SEGACD,
UPS.GENESIS,
UPS.SMS,
UPS.GAMEGEAR,
UPS.SATURN,
UPS.SNES,
UPS.SFAM,
UPS.TG16,
UPS.VIC_20,
UPS.VIRTUALBOY,
UPS.WONDERSWAN,
UPS.WONDERSWAN_COLOR,
]
STRIP_ARTICLES_REGEX = r"^(the|a|an)\s+"
def _create_metadata_id_case(
prefix: str, id_column: KeyedColumnElement, platform_id_column: KeyedColumnElement
):
return case(
(
id_column.isnot(None),
func.concat(
f"{prefix}-",
platform_id_column,
"-",
id_column,
),
),
else_=None,
)
def with_details(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
kwargs["query"] = select(Rom).options(
# Ensure platform is loaded for main ROM objects
joinedload(Rom.platform),
selectinload(Rom.saves).options(
noload(Save.rom),
noload(Save.user),
),
selectinload(Rom.states).options(
noload(State.rom),
noload(State.user),
),
selectinload(Rom.screenshots).options(
noload(Screenshot.rom),
),
selectinload(Rom.rom_users).options(noload(RomUser.rom)),
selectinload(Rom.metadatum).options(noload(RomMetadata.rom)),
selectinload(Rom.files).options(noload(RomFile.rom)),
selectinload(Rom.sibling_roms).options(
noload(Rom.platform), noload(Rom.metadatum)
),
selectinload(Rom.collections),
)
return func(*args, **kwargs)
return wrapper
def with_simple(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
kwargs["query"] = select(Rom).options(
# Ensure platform is loaded for main ROM objects
joinedload(Rom.platform),
# Display properties for the current user (last_played)
selectinload(Rom.rom_users).options(noload(RomUser.rom)),
# Sort table by metadata (first_release_date)
selectinload(Rom.metadatum).options(noload(RomMetadata.rom)),
# Required for multi-file ROM actions and 3DS QR code
selectinload(Rom.files).options(noload(RomFile.rom)),
# Show sibling rom badges on cards
selectinload(Rom.sibling_roms).options(
noload(Rom.platform), noload(Rom.metadatum)
),
)
return func(*args, **kwargs)
return wrapper
class DBRomsHandler(DBBaseHandler):
@begin_session
@with_details
def add_rom(self, rom: Rom, query: Query = None, session: Session = None) -> Rom:
rom = session.merge(rom)
session.flush()
return session.scalar(query.filter_by(id=rom.id).limit(1))
@begin_session
@with_details
def get_rom(
self, id: int, *, query: Query = None, session: Session = None
) -> Rom | None:
return session.scalar(query.filter_by(id=id).limit(1))
@begin_session
@with_details
def get_roms_by_ids(
self, ids: list[int], *, query: Query = None, session: Session = None
) -> Sequence[Rom]:
"""Get multiple ROMs by their IDs."""
if not ids:
return []
return session.scalars(query.filter(Rom.id.in_(ids))).all()
def filter_by_platform_id(self, query: Query, platform_id: int):
return query.filter(Rom.platform_id == platform_id)
def filter_by_collection_id(
self, query: Query, session: Session, collection_id: int
):
from . import db_collection_handler
collection = db_collection_handler.get_collection(collection_id)
if collection:
return query.filter(Rom.id.in_(collection.rom_ids))
return query
def filter_by_virtual_collection_id(
self, query: Query, session: Session, virtual_collection_id: str
):
from . import db_collection_handler
v_collection = db_collection_handler.get_virtual_collection(
virtual_collection_id
)
if v_collection:
return query.filter(Rom.id.in_(v_collection.rom_ids))
return query
def filter_by_smart_collection_id(
self, query: Query, session: Session, smart_collection_id: int, user_id: int
):
from . import db_collection_handler
smart_collection = db_collection_handler.get_smart_collection(
smart_collection_id
)
if smart_collection:
# Ensure the latest ROMs are loaded
smart_collection = smart_collection.update_properties(user_id)
return query.filter(Rom.id.in_(smart_collection.rom_ids))
return query
def filter_by_search_term(self, query: Query, search_term: str):
return query.filter(
or_(
Rom.fs_name.ilike(f"%{search_term}%"),
Rom.name.ilike(f"%{search_term}%"),
)
)
def filter_by_matched(self, query: Query, value: bool) -> Query:
"""Filter based on whether the rom is matched to a metadata provider."""
predicate = or_(
Rom.igdb_id.isnot(None),
Rom.moby_id.isnot(None),
Rom.ss_id.isnot(None),
Rom.ra_id.isnot(None),
Rom.launchbox_id.isnot(None),
Rom.hasheous_id.isnot(None),
Rom.tgdb_id.isnot(None),
Rom.flashpoint_id.isnot(None),
)
if not value:
predicate = not_(predicate)
return query.filter(predicate)
def filter_by_favorite(
self, query: Query, session: Session, value: bool, user_id: int | None
) -> Query:
"""Filter based on whether the rom is in the user's favorites collection."""
if not user_id:
return query
from . import db_collection_handler
favorites_collection = db_collection_handler.get_favorite_collection(user_id)
if favorites_collection:
predicate = Rom.id.in_(favorites_collection.rom_ids)
if not value:
predicate = not_(predicate)
return query.filter(predicate)
# If no favorites collection exists, return the original query if non-favorites
# were requested, or an empty query if favorites were requested.
if not value:
return query
return query.filter(false())
def filter_by_duplicate(self, query: Query, value: bool) -> Query:
"""Filter based on whether the rom has duplicates."""
predicate = Rom.sibling_roms.any()
if not value:
predicate = not_(predicate)
return query.filter(predicate)
def filter_by_playable(self, query: Query, value: bool) -> Query:
"""Filter based on whether the rom is playable on supported platforms."""
predicate = Platform.slug.in_(EJS_SUPPORTED_PLATFORMS)
if not value:
predicate = not_(predicate)
return query.join(Platform).filter(predicate)
def filter_by_has_ra(self, query: Query, value: bool) -> Query:
predicate = Rom.ra_id.isnot(None)
if not value:
predicate = not_(predicate)
return query.filter(predicate)
def filter_by_missing_from_fs(self, query: Query, value: bool) -> Query:
predicate = Rom.missing_from_fs.isnot(False)
if not value:
predicate = not_(predicate)
return query.filter(predicate)
def filter_by_verified(self, query: Query):
keys_to_check = [
"tosec_match",
"mame_arcade_match",
"mame_mess_match",
"nointro_match",
"redump_match",
"whdload_match",
"ra_match",
"fbneo_match",
"puredos_match",
]
if ROMM_DB_DRIVER == "postgresql":
conditions = " OR ".join(
f"(hasheous_metadata->>'{key}')::boolean" for key in keys_to_check
)
return query.filter(text(conditions))
else:
return query.filter(
or_(*(Rom.hasheous_metadata[key].as_boolean() for key in keys_to_check))
)
def filter_by_genres(
self,
query: Query,
*,
session: Session,
values: Sequence[str],
match_all: bool = False,
) -> Query:
op = json_array_contains_all if match_all else json_array_contains_any
return query.filter(op(RomMetadata.genres, values, session=session))
def filter_by_franchises(
self,
query: Query,
*,
session: Session,
values: Sequence[str],
match_all: bool = False,
) -> Query:
op = json_array_contains_all if match_all else json_array_contains_any
return query.filter(op(RomMetadata.franchises, values, session=session))
def filter_by_collections(
self,
query: Query,
*,
session: Session,
values: Sequence[str],
match_all: bool = False,
) -> Query:
op = json_array_contains_all if match_all else json_array_contains_any
return query.filter(op(RomMetadata.collections, values, session=session))
def filter_by_companies(
self,
query: Query,
*,
session: Session,
values: Sequence[str],
match_all: bool = False,
) -> Query:
op = json_array_contains_all if match_all else json_array_contains_any
return query.filter(op(RomMetadata.companies, values, session=session))
def filter_by_age_ratings(
self,
query: Query,
*,
session: Session,
values: Sequence[str],
match_all: bool = False,
) -> Query:
op = json_array_contains_all if match_all else json_array_contains_any
return query.filter(op(RomMetadata.age_ratings, values, session=session))
def filter_by_status(self, query: Query, selected_status: str):
status_filter = RomUser.status == selected_status
if selected_status == "now_playing":
status_filter = RomUser.now_playing.is_(True)
elif selected_status == "backlogged":
status_filter = RomUser.backlogged.is_(True)
elif selected_status == "hidden":
status_filter = RomUser.hidden.is_(True)
if selected_status == "hidden":
return query.filter(status_filter)
return query.filter(status_filter, RomUser.hidden.is_(False))
def filter_by_regions(
self,
query: Query,
*,
session: Session,
values: Sequence[str],
match_all: bool = False,
) -> Query:
op = json_array_contains_all if match_all else json_array_contains_any
return query.filter(op(Rom.regions, values, session=session))
def filter_by_languages(
self,
query: Query,
*,
session: Session,
values: Sequence[str],
match_all: bool = False,
) -> Query:
op = json_array_contains_all if match_all else json_array_contains_any
return query.filter(op(Rom.languages, values, session=session))
@begin_session
def filter_roms(
self,
query: Query,
platform_id: int | None = None,
collection_id: int | None = None,
virtual_collection_id: str | None = None,
smart_collection_id: int | None = None,
search_term: str | None = None,
matched: bool | None = None,
favorite: bool | None = None,
duplicate: bool | None = None,
playable: bool | None = None,
has_ra: bool | None = None,
missing: bool | None = None,
verified: bool | None = None,
group_by_meta_id: bool = False,
genres: Sequence[str] | None = None,
franchises: Sequence[str] | None = None,
collections: Sequence[str] | None = None,
companies: Sequence[str] | None = None,
age_ratings: Sequence[str] | None = None,
selected_status: str | None = None,
regions: Sequence[str] | None = None,
languages: Sequence[str] | None = None,
# Logic operators for multi-value filters
genres_logic: str = "any",
franchises_logic: str = "any",
collections_logic: str = "any",
companies_logic: str = "any",
age_ratings_logic: str = "any",
regions_logic: str = "any",
languages_logic: str = "any",
user_id: int | None = None,
session: Session = None,
) -> Query[Rom]:
from handler.scan_handler import MetadataSource
if platform_id:
query = self.filter_by_platform_id(query, platform_id)
if collection_id:
query = self.filter_by_collection_id(query, session, collection_id)
if virtual_collection_id:
query = self.filter_by_virtual_collection_id(
query, session, virtual_collection_id
)
if smart_collection_id and user_id:
query = self.filter_by_smart_collection_id(
query, session, smart_collection_id, user_id
)
if search_term:
query = self.filter_by_search_term(query, search_term)
if matched is not None:
query = self.filter_by_matched(query, value=matched)
if favorite is not None:
query = self.filter_by_favorite(
query, session=session, value=favorite, user_id=user_id
)
if duplicate is not None:
query = self.filter_by_duplicate(query, value=duplicate)
if playable is not None:
query = self.filter_by_playable(query, value=playable)
if has_ra is not None:
query = self.filter_by_has_ra(query, value=has_ra)
if missing is not None:
query = self.filter_by_missing_from_fs(query, value=missing)
# TODO: Correctly support true/false values.
if verified:
query = self.filter_by_verified(query)
# BEWARE YE WHO ENTERS HERE 💀
if group_by_meta_id:
# Convert NULL is_main_sibling to 0 (false) so it sorts after true values
is_main_sibling_order = (
func.coalesce(cast(RomUser.is_main_sibling, Integer), 0).desc()
if user_id
else literal(1)
)
# Create a subquery that identifies the primary ROM in each group
# Priority order: is_main_sibling (desc), then by fs_name_no_ext (asc)
base_subquery = query.subquery()
group_subquery = (
select(base_subquery.c.id)
.select_from(base_subquery)
.with_only_columns(
base_subquery.c.id,
base_subquery.c.fs_name_no_ext,
base_subquery.c.platform_id,
base_subquery.c.igdb_id,
base_subquery.c.ss_id,
base_subquery.c.moby_id,
base_subquery.c.ra_id,
base_subquery.c.hasheous_id,
base_subquery.c.launchbox_id,
base_subquery.c.tgdb_id,
base_subquery.c.flashpoint_id,
)
.outerjoin(
RomUser,
and_(
base_subquery.c.id == RomUser.rom_id, RomUser.user_id == user_id
),
)
.add_columns(
func.row_number()
.over(
partition_by=func.coalesce(
_create_metadata_id_case(
MetadataSource.IGDB,
base_subquery.c.igdb_id,
base_subquery.c.platform_id,
),
_create_metadata_id_case(
MetadataSource.SS,
base_subquery.c.ss_id,
base_subquery.c.platform_id,
),
_create_metadata_id_case(
MetadataSource.MOBY,
base_subquery.c.moby_id,
base_subquery.c.platform_id,
),
_create_metadata_id_case(
MetadataSource.RA,
base_subquery.c.ra_id,
base_subquery.c.platform_id,
),
_create_metadata_id_case(
MetadataSource.HASHEOUS,
base_subquery.c.hasheous_id,
base_subquery.c.platform_id,
),
_create_metadata_id_case(
MetadataSource.LAUNCHBOX,
base_subquery.c.launchbox_id,
base_subquery.c.platform_id,
),
_create_metadata_id_case(
MetadataSource.TGDB,
base_subquery.c.tgdb_id,
base_subquery.c.platform_id,
),
_create_metadata_id_case(
MetadataSource.FLASHPOINT,
base_subquery.c.flashpoint_id,
base_subquery.c.platform_id,
),
_create_metadata_id_case(
"romm",
base_subquery.c.id,
base_subquery.c.platform_id,
),
),
order_by=[
is_main_sibling_order,
base_subquery.c.fs_name_no_ext.asc(),
],
)
.label("row_num"),
)
.subquery()
)
# Add a filter to the original query to only include the primary ROM from each group
query = query.filter(
Rom.id.in_(
session.query(group_subquery.c.id).filter(
group_subquery.c.row_num == 1
)
)
)
# Optimize JOINs - only join tables when needed
needs_metadata_join = any(
[genres, franchises, collections, companies, age_ratings]
)
if needs_metadata_join:
query = query.outerjoin(RomMetadata)
# Apply metadata filters efficiently
if genres:
query = self.filter_by_genres(
query, session=session, values=genres, match_all=(genres_logic == "all")
)
if franchises:
query = self.filter_by_franchises(
query,
session=session,
values=franchises,
match_all=(franchises_logic == "all"),
)
if collections:
query = self.filter_by_collections(
query,
session=session,
values=collections,
match_all=(collections_logic == "all"),
)
if companies:
query = self.filter_by_companies(
query,
session=session,
values=companies,
match_all=(companies_logic == "all"),
)
if age_ratings:
query = self.filter_by_age_ratings(
query,
session=session,
values=age_ratings,
match_all=(age_ratings_logic == "all"),
)
# Apply rom-level filters
if regions:
query = self.filter_by_regions(
query,
session=session,
values=regions,
match_all=(regions_logic == "all"),
)
if languages:
query = self.filter_by_languages(
query,
session=session,
values=languages,
match_all=(languages_logic == "all"),
)
# The RomUser table is already joined if user_id is set
if selected_status and user_id:
query = self.filter_by_status(query, selected_status)
elif user_id:
query = query.filter(
or_(RomUser.hidden.is_(False), RomUser.hidden.is_(None))
)
return query
@with_simple
@begin_session
def get_roms_query(
self,
*,
order_by: str = "name",
order_dir: str = "asc",
user_id: int | None = None,
query: Query = None,
session: Session = None,
) -> tuple[Query[Rom], Any]:
if user_id:
query = query.outerjoin(
RomUser, and_(RomUser.rom_id == Rom.id, RomUser.user_id == user_id)
)
if user_id and hasattr(RomUser, order_by) and not hasattr(Rom, order_by):
order_attr = getattr(RomUser, order_by)
query = query.filter(RomUser.user_id == user_id, order_attr.isnot(None))
elif hasattr(RomMetadata, order_by) and not hasattr(Rom, order_by):
order_attr = getattr(RomMetadata, order_by)
query = query.outerjoin(RomMetadata, RomMetadata.rom_id == Rom.id).filter(
order_attr.isnot(None)
)
elif hasattr(Rom, order_by):
order_attr = getattr(Rom, order_by)
else:
order_attr = Rom.name
order_attr_column = order_attr
# Ignore case when the order attribute is a number
if isinstance(order_attr.type, (String, Text)):
# Remove any leading articles
order_attr = func.trim(
func.lower(order_attr).regexp_replace(STRIP_ARTICLES_REGEX, "", "i")
)
if order_dir.lower() == "desc":
order_attr = order_attr.desc()
else:
order_attr = order_attr.asc()
return query.order_by(order_attr), order_attr_column
@begin_session
def get_roms_scalar(
self,
*,
session: Session = None,
**kwargs,
) -> Sequence[Rom]:
query, _ = self.get_roms_query(
order_by=kwargs.get("order_by", "name"),
order_dir=kwargs.get("order_dir", "asc"),
user_id=kwargs.get("user_id", None),
)
roms = self.filter_roms(
query=query,
platform_id=kwargs.get("platform_id", None),
collection_id=kwargs.get("collection_id", None),
virtual_collection_id=kwargs.get("virtual_collection_id", None),
search_term=kwargs.get("search_term", None),
matched=kwargs.get("matched", None),
favorite=kwargs.get("favorite", None),
duplicate=kwargs.get("duplicate", None),
playable=kwargs.get("playable", None),
has_ra=kwargs.get("has_ra", None),
missing=kwargs.get("missing", None),
verified=kwargs.get("verified", None),
genres=kwargs.get("genres", None),
franchises=kwargs.get("franchises", None),
collections=kwargs.get("collections", None),
companies=kwargs.get("companies", None),
age_ratings=kwargs.get("age_ratings", None),
selected_status=kwargs.get("selected_status", None),
regions=kwargs.get("regions", None),
languages=kwargs.get("languages", None),
user_id=kwargs.get("user_id", None),
)
return session.scalars(roms).all()
@begin_session
def with_char_index(
self, query: Query, order_by_attr: Any, session: Session = None
) -> list[Row[tuple[str, int]]]:
if isinstance(order_by_attr.type, (String, Text)):
# Remove any leading articles
order_by_attr = func.trim(
func.lower(order_by_attr).regexp_replace(STRIP_ARTICLES_REGEX, "", "i")
)
else:
order_by_attr = func.trim(
func.lower(Rom.name).regexp_replace(STRIP_ARTICLES_REGEX, "", "i")
)
# Get the row number and first letter for each item
subquery = (
query.with_only_columns(Rom.id, Rom.name) # type: ignore
.add_columns( # type: ignore
func.substring(
order_by_attr,
1,
1,
).label("letter"),
func.row_number().over(order_by=order_by_attr).label("position"),
)
.subquery()
)
# Get the minimum position for each letter
return (
session.query(
subquery.c.letter, func.min(subquery.c.position - 1).label("position")
)
.group_by(subquery.c.letter)
.order_by(subquery.c.letter)
.all()
)
@begin_session
@with_details
def get_roms_by_fs_name(
self,
platform_id: int,
fs_names: Iterable[str],
query: Query = None,
session: Session = None,
) -> dict[str, Rom]:
"""Retrieve a dictionary of roms by their filesystem names."""
roms = (
session.scalars(
query.filter(Rom.fs_name.in_(fs_names)).filter_by(
platform_id=platform_id
)
)
.unique()
.all()
)
return {rom.fs_name: rom for rom in roms}
@begin_session
def update_rom(self, id: int, data: dict, session: Session = None) -> Rom:
session.execute(
update(Rom)
.where(Rom.id == id)
.values(**data)
.execution_options(synchronize_session="evaluate")
)
return session.query(Rom).filter_by(id=id).one()
@begin_session
def delete_rom(self, id: int, session: Session = None) -> None:
session.execute(
delete(Rom)
.where(Rom.id == id)
.execution_options(synchronize_session="evaluate")
)
@begin_session
def mark_missing_roms(
self, platform_id: int, fs_roms_to_keep: list[str], session: Session = None
) -> Sequence[Rom]:
missing_roms = (
session.scalars(
select(Rom)
.order_by(Rom.fs_name.asc())
.where(
and_(
Rom.platform_id == platform_id,
Rom.fs_name.not_in(fs_roms_to_keep),
)
)
)
.unique()
.all()
)
session.execute(
update(Rom)
.where(
and_(
Rom.platform_id == platform_id, Rom.fs_name.not_in(fs_roms_to_keep)
)
)
.values(**{"missing_from_fs": True})
.execution_options(synchronize_session="evaluate")
)
return missing_roms
@begin_session
def add_rom_user(
self, rom_id: int, user_id: int, session: Session = None
) -> RomUser:
return session.merge(RomUser(rom_id=rom_id, user_id=user_id))
@begin_session
def get_rom_user(
self, rom_id: int, user_id: int, session: Session = None
) -> RomUser | None:
return session.scalar(
select(RomUser).filter_by(rom_id=rom_id, user_id=user_id).limit(1)
)
@begin_session
def get_rom_user_by_id(self, id: int, session: Session = None) -> RomUser | None:
return session.scalar(select(RomUser).filter_by(id=id).limit(1))
@begin_session
def update_rom_user(
self, id: int, data: dict, session: Session = None
) -> RomUser | None:
session.execute(
update(RomUser)
.where(RomUser.id == id)
.values(**data)
.execution_options(synchronize_session="evaluate")
)
rom_user = self.get_rom_user_by_id(id)
if not rom_user:
return None
if not data.get("is_main_sibling", False):
return rom_user
rom = self.get_rom(rom_user.rom_id)
if not rom:
return rom_user
session.execute(
update(RomUser)
.where(
and_(
RomUser.rom_id.in_(r.id for r in rom.sibling_roms),
RomUser.user_id == rom_user.user_id,
)
)
.values(is_main_sibling=False)
)
return session.query(RomUser).filter_by(id=id).one()
@begin_session
def add_rom_file(self, rom_file: RomFile, session: Session = None) -> RomFile:
return session.merge(rom_file)
@begin_session
def get_rom_files(self, rom_id: int, session: Session = None) -> Sequence[RomFile]:
return session.scalars(select(RomFile).filter_by(rom_id=rom_id)).all()
@begin_session
def get_rom_file_by_id(self, id: int, session: Session = None) -> RomFile | None:
return session.scalar(select(RomFile).filter_by(id=id).limit(1))
@begin_session
def update_rom_file(self, id: int, data: dict, session: Session = None) -> RomFile:
session.execute(
update(RomFile)
.where(RomFile.id == id)
.values(**data)
.execution_options(synchronize_session="evaluate")
)
return session.query(RomFile).filter_by(id=id).one()
@begin_session
def purge_rom_files(
self, rom_id: int, session: Session = None
) -> Sequence[RomFile]:
purged_rom_files = (
session.scalars(select(RomFile).filter_by(rom_id=rom_id)).unique().all()
)
session.execute(
delete(RomFile)
.where(RomFile.rom_id == rom_id)
.execution_options(synchronize_session="evaluate")
)
return purged_rom_files
# Note management methods
@begin_session
def get_rom_notes(
self,
rom_id: int,
user_id: int,
public_only: bool = False,
search: str = "",
tags: list[str] | None = None,
session: Session = None,
) -> Sequence[RomNote]:
query = session.query(RomNote).filter(RomNote.rom_id == rom_id)
if public_only:
query = query.filter(RomNote.is_public)
else:
# Include user's own notes (private + public) and public notes from others
query = query.filter(or_(RomNote.user_id == user_id, RomNote.is_public))
if search:
query = query.filter(
or_(RomNote.title.contains(search), RomNote.content.contains(search))
)
if tags:
for tag in tags:
query = query.filter(
json_array_contains_value(RomNote.tags, tag, session=session)
)
return query.order_by(RomNote.updated_at.desc()).all()
@begin_session
def create_rom_note(
self,
rom_id: int,
user_id: int,
title: str,
content: str = "",
is_public: bool = False,
tags: list[str] | None = None,
session: Session = None,
) -> dict:
note = RomNote(
rom_id=rom_id,
user_id=user_id,
title=title,
content=content,
is_public=is_public,
tags=tags or [],
)
session.add(note)
session.flush() # To get the ID
# Return dict to avoid detached instance issues
return {
"id": note.id,
"title": note.title,
"content": note.content,
"is_public": note.is_public,
"tags": note.tags,
"created_at": note.created_at,
"updated_at": note.updated_at,
"rom_id": note.rom_id,
"user_id": note.user_id,
}
@begin_session
def update_rom_note(
self,
note_id: int,
user_id: int,
session: Session = None,
**fields,
) -> dict | None:
note = (
session.query(RomNote)
.filter(RomNote.id == note_id, RomNote.user_id == user_id)
.first()
)
if not note:
return None
for field, value in fields.items():
if hasattr(note, field):
setattr(note, field, value)
session.flush() # Ensure changes are committed
# Return dict to avoid detached instance issues
return {
"id": note.id,
"title": note.title,
"content": note.content,
"is_public": note.is_public,
"tags": note.tags,
"created_at": note.created_at,
"updated_at": note.updated_at,
"rom_id": note.rom_id,
"user_id": note.user_id,
}
@begin_session
def delete_rom_note(
self, note_id: int, user_id: int, session: Session = None
) -> bool:
result = session.execute(
delete(RomNote).where(
and_(RomNote.id == note_id, RomNote.user_id == user_id)
)
)
return result.rowcount > 0
@begin_session
@with_details
def get_rom_by_metadata_id(
self,
igdb: int | None = None,
moby: int | None = None,
ss: int | None = None,
ra: int | None = None,
launchbox: int | None = None,
hasheous: int | None = None,
tgdb: int | None = None,
flashpoint: str | None = None,
hltb: int | None = None,
*,
query: Query = None,
session: Session = None,
) -> Rom | None:
"""Get a ROM by any metadata ID."""
filters = []
param_map = [
(igdb, Rom.igdb_id),
(moby, Rom.moby_id),
(ss, Rom.ss_id),
(ra, Rom.ra_id),
(launchbox, Rom.launchbox_id),
(hasheous, Rom.hasheous_id),
(tgdb, Rom.tgdb_id),
(flashpoint, Rom.flashpoint_id),
(hltb, Rom.hltb_id),
]
for value, column in param_map:
if value is not None:
filters.append(column == value)
if not filters:
return None
# Use OR to find ROM matching any of the provided metadata IDs
return session.scalar(query.filter(or_(*filters)).limit(1))