mirror of
https://github.com/rommapp/romm.git
synced 2026-02-18 00:27:41 +01:00
342 lines
11 KiB
Python
342 lines
11 KiB
Python
from enum import Enum
|
|
from typing import Any
|
|
import emoji
|
|
from config.config_manager import config_manager as cm
|
|
from handler.database import db_platform_handler
|
|
from handler.filesystem import (
|
|
fs_asset_handler,
|
|
fs_firmware_handler,
|
|
fs_resource_handler,
|
|
fs_rom_handler,
|
|
)
|
|
from handler.metadata import meta_igdb_handler, meta_moby_handler
|
|
from logger.logger import log
|
|
from models.assets import Save, Screenshot, State
|
|
from models.platform import Platform
|
|
from models.rom import Rom
|
|
from models.user import User
|
|
from models.firmware import Firmware
|
|
|
|
|
|
class ScanType(Enum):
|
|
NEW_PLATFORMS = "new_platforms"
|
|
QUICK = "quick"
|
|
UNIDENTIFIED = "unidentified"
|
|
PARTIAL = "partial"
|
|
COMPLETE = "complete"
|
|
|
|
|
|
def _get_main_platform_igdb_id(platform: Platform):
|
|
cnfg = cm.get_config()
|
|
|
|
if platform.fs_slug in cnfg.PLATFORMS_VERSIONS.keys():
|
|
main_platform_slug = cnfg.PLATFORMS_VERSIONS[platform.fs_slug]
|
|
main_platform = db_platform_handler.get_platform_by_fs_slug(main_platform_slug)
|
|
if main_platform:
|
|
main_platform_igdb_id = main_platform.igdb_id
|
|
else:
|
|
main_platform_igdb_id = meta_igdb_handler.get_platform(main_platform_slug)[
|
|
"igdb_id"
|
|
]
|
|
if not main_platform_igdb_id:
|
|
main_platform_igdb_id = platform.igdb_id
|
|
else:
|
|
main_platform_igdb_id = platform.igdb_id
|
|
return main_platform_igdb_id
|
|
|
|
|
|
def scan_platform(
|
|
fs_slug: str,
|
|
fs_platforms: list[str],
|
|
) -> Platform:
|
|
"""Get platform details
|
|
|
|
Args:
|
|
fs_slug: short name of the platform
|
|
Returns
|
|
Platform object
|
|
"""
|
|
|
|
log.info(f"· {fs_slug}")
|
|
|
|
platform_attrs: dict[str, Any] = {}
|
|
platform_attrs["fs_slug"] = fs_slug
|
|
|
|
cnfg = cm.get_config()
|
|
swapped_platform_bindings = dict((v, k) for k, v in cnfg.PLATFORMS_BINDING.items())
|
|
|
|
# Sometimes users change the name of the folder, so we try to match it with the config
|
|
if fs_slug not in fs_platforms:
|
|
log.warning(
|
|
f" {fs_slug} not found in file system, trying to match via config..."
|
|
)
|
|
if fs_slug in swapped_platform_bindings.keys():
|
|
platform = db_platform_handler.get_platform_by_fs_slug(fs_slug)
|
|
if platform:
|
|
platform_attrs["fs_slug"] = swapped_platform_bindings[platform.slug]
|
|
|
|
try:
|
|
if fs_slug in cnfg.PLATFORMS_BINDING.keys():
|
|
platform_attrs["slug"] = cnfg.PLATFORMS_BINDING[fs_slug]
|
|
else:
|
|
platform_attrs["slug"] = fs_slug
|
|
except (KeyError, TypeError, AttributeError):
|
|
platform_attrs["slug"] = fs_slug
|
|
|
|
igdb_platform = meta_igdb_handler.get_platform(platform_attrs["slug"])
|
|
moby_platform = meta_moby_handler.get_platform(platform_attrs["slug"])
|
|
|
|
platform_attrs["name"] = platform_attrs["slug"].replace("-", " ").title()
|
|
platform_attrs.update({**moby_platform, **igdb_platform}) # Reverse order
|
|
|
|
if platform_attrs["igdb_id"] or platform_attrs["moby_id"]:
|
|
log.info(
|
|
emoji.emojize(f" Identified as {platform_attrs['name']} :video_game:")
|
|
)
|
|
else:
|
|
log.warning(emoji.emojize(f" {platform_attrs['slug']} not found :cross_mark:"))
|
|
|
|
return Platform(**platform_attrs)
|
|
|
|
|
|
def scan_firmware(
|
|
platform: Platform,
|
|
file_name: str,
|
|
firmware: Firmware | None = None,
|
|
) -> Firmware:
|
|
firmware_path = fs_firmware_handler.get_firmware_fs_structure(platform.fs_slug)
|
|
|
|
log.info(f"\t · {file_name}")
|
|
|
|
# Set default properties
|
|
firmware_attrs = {
|
|
"id": firmware.id if firmware else None,
|
|
"platform_id": platform.id,
|
|
}
|
|
|
|
file_size = fs_firmware_handler.get_firmware_file_size(
|
|
firmware_path=firmware_path,
|
|
file_name=file_name,
|
|
)
|
|
|
|
firmware_attrs.update(
|
|
{
|
|
"file_path": firmware_path,
|
|
"file_name": file_name,
|
|
"file_name_no_tags": fs_firmware_handler.get_file_name_with_no_tags(
|
|
file_name
|
|
),
|
|
"file_name_no_ext": fs_firmware_handler.get_file_name_with_no_extension(
|
|
file_name
|
|
),
|
|
"file_extension": fs_firmware_handler.parse_file_extension(file_name),
|
|
"file_size_bytes": file_size,
|
|
}
|
|
)
|
|
|
|
file_hashes = fs_firmware_handler.calculate_file_hashes(
|
|
firmware_path=firmware_path,
|
|
file_name=file_name,
|
|
)
|
|
|
|
firmware_attrs.update(**file_hashes)
|
|
|
|
return Firmware(**firmware_attrs)
|
|
|
|
|
|
async def scan_rom(
|
|
platform: Platform,
|
|
rom_attrs: dict,
|
|
scan_type: ScanType,
|
|
rom: Rom | None = None,
|
|
metadata_sources: list[str] = ["igdb", "moby"],
|
|
) -> Rom:
|
|
roms_path = fs_rom_handler.get_roms_fs_structure(platform.fs_slug)
|
|
|
|
log.info(f"\t · {rom_attrs['file_name']}")
|
|
|
|
if rom_attrs.get("multi", False):
|
|
for file in rom_attrs["files"]:
|
|
log.info(f"\t\t · {file}")
|
|
|
|
# Set default properties
|
|
rom_attrs.update(
|
|
{
|
|
"id": rom.id if rom else None,
|
|
"platform_id": platform.id,
|
|
"name": rom_attrs["file_name"],
|
|
"url_cover": "",
|
|
"url_screenshots": [],
|
|
}
|
|
)
|
|
|
|
# Update properties from existing rom if not a complete rescan
|
|
if rom and scan_type != ScanType.COMPLETE:
|
|
rom_attrs.update(
|
|
{
|
|
"igdb_id": rom.igdb_id,
|
|
"moby_id": rom.moby_id,
|
|
"sgdb_id": rom.sgdb_id,
|
|
"name": rom.name,
|
|
"slug": rom.slug,
|
|
"summary": rom.summary,
|
|
"igdb_metadata": rom.igdb_metadata,
|
|
"moby_metadata": rom.moby_metadata,
|
|
"url_cover": rom.url_cover,
|
|
"path_cover_s": rom.path_cover_s,
|
|
"path_cover_l": rom.path_cover_l,
|
|
"path_screenshots": rom.path_screenshots,
|
|
"url_screenshots": rom.url_screenshots,
|
|
}
|
|
)
|
|
|
|
# Update properties that don't require metadata
|
|
file_size = fs_rom_handler.get_rom_file_size(
|
|
multi=rom_attrs["multi"],
|
|
file_name=rom_attrs["file_name"],
|
|
multi_files=rom_attrs["files"],
|
|
roms_path=roms_path,
|
|
)
|
|
regs, rev, langs, other_tags = fs_rom_handler.parse_tags(rom_attrs["file_name"])
|
|
rom_attrs.update(
|
|
{
|
|
"file_path": roms_path,
|
|
"file_name": rom_attrs["file_name"],
|
|
"file_name_no_tags": fs_rom_handler.get_file_name_with_no_tags(
|
|
rom_attrs["file_name"]
|
|
),
|
|
"file_name_no_ext": fs_rom_handler.get_file_name_with_no_extension(
|
|
rom_attrs["file_name"]
|
|
),
|
|
"file_extension": fs_rom_handler.parse_file_extension(
|
|
rom_attrs["file_name"]
|
|
),
|
|
"file_size_bytes": file_size,
|
|
"multi": rom_attrs["multi"],
|
|
"regions": regs,
|
|
"revision": rev,
|
|
"languages": langs,
|
|
"tags": other_tags,
|
|
}
|
|
)
|
|
|
|
igdb_handler_rom = {}
|
|
moby_handler_rom = {}
|
|
|
|
if (
|
|
"igdb" in metadata_sources
|
|
and platform.igdb_id
|
|
and (
|
|
not rom
|
|
or scan_type == ScanType.COMPLETE
|
|
or (scan_type == ScanType.PARTIAL and not rom.igdb_id)
|
|
or (scan_type == ScanType.UNIDENTIFIED and not rom.igdb_id)
|
|
)
|
|
):
|
|
main_platform_igdb_id = _get_main_platform_igdb_id(platform)
|
|
igdb_handler_rom = await meta_igdb_handler.get_rom(
|
|
rom_attrs["file_name"], main_platform_igdb_id
|
|
)
|
|
|
|
if (
|
|
"moby" in metadata_sources
|
|
and platform.moby_id
|
|
and (
|
|
not rom
|
|
or scan_type == ScanType.COMPLETE
|
|
or (scan_type == ScanType.PARTIAL and not rom.moby_id)
|
|
or (scan_type == ScanType.UNIDENTIFIED and not rom.moby_id)
|
|
)
|
|
):
|
|
moby_handler_rom = await meta_moby_handler.get_rom(
|
|
rom_attrs["file_name"], platform.moby_id
|
|
)
|
|
|
|
# Reversed to prioritize IGDB
|
|
rom_attrs.update({**moby_handler_rom, **igdb_handler_rom})
|
|
|
|
# Return early if not found in IGDB or MobyGames
|
|
if not igdb_handler_rom.get("igdb_id") and not moby_handler_rom.get("moby_id"):
|
|
log.warning(
|
|
emoji.emojize(f"\t {rom_attrs['file_name']} not found :cross_mark:")
|
|
)
|
|
return Rom(**rom_attrs)
|
|
|
|
log.info(emoji.emojize(f"\t Identified as {rom_attrs['name']} :alien_monster:"))
|
|
|
|
# Update properties from IGDB
|
|
if (
|
|
not rom
|
|
or scan_type == ScanType.COMPLETE
|
|
or (
|
|
scan_type == ScanType.PARTIAL
|
|
and rom
|
|
and (not rom.igdb_id or not rom.moby_id)
|
|
)
|
|
or (
|
|
scan_type == ScanType.UNIDENTIFIED
|
|
and rom
|
|
and not rom.igdb_id
|
|
and not rom.moby_id
|
|
)
|
|
):
|
|
rom_attrs.update(
|
|
fs_resource_handler.get_rom_cover(
|
|
overwrite=False,
|
|
platform_fs_slug=platform.slug,
|
|
rom_name=rom_attrs["name"],
|
|
url_cover=rom_attrs["url_cover"],
|
|
)
|
|
)
|
|
rom_attrs.update(
|
|
fs_resource_handler.get_rom_screenshots(
|
|
platform_fs_slug=platform.slug,
|
|
rom_name=rom_attrs["name"],
|
|
url_screenshots=rom_attrs["url_screenshots"],
|
|
)
|
|
)
|
|
|
|
return Rom(**rom_attrs)
|
|
|
|
|
|
def _scan_asset(file_name: str, path: str):
|
|
log.info(f"\t\t · {file_name}")
|
|
|
|
file_size = fs_asset_handler.get_asset_size(file_name=file_name, asset_path=path)
|
|
|
|
return {
|
|
"file_path": path,
|
|
"file_name": file_name,
|
|
"file_name_no_tags": fs_asset_handler.get_file_name_with_no_tags(file_name),
|
|
"file_name_no_ext": fs_asset_handler.get_file_name_with_no_extension(file_name),
|
|
"file_extension": fs_asset_handler.parse_file_extension(file_name),
|
|
"file_size_bytes": file_size,
|
|
}
|
|
|
|
|
|
def scan_save(
|
|
file_name: str, user: User, platform_fs_slug: str, emulator: str = None
|
|
) -> Save:
|
|
saves_path = fs_asset_handler.build_saves_file_path(
|
|
user=user, platform_fs_slug=platform_fs_slug, emulator=emulator
|
|
)
|
|
return Save(**_scan_asset(file_name, saves_path))
|
|
|
|
|
|
def scan_state(
|
|
file_name: str, user: User, platform_fs_slug: str, emulator: str = None
|
|
) -> State:
|
|
states_path = fs_asset_handler.build_states_file_path(
|
|
user=user, platform_fs_slug=platform_fs_slug, emulator=emulator
|
|
)
|
|
return State(**_scan_asset(file_name, states_path))
|
|
|
|
|
|
def scan_screenshot(
|
|
file_name: str, user: User, platform_fs_slug: str = None
|
|
) -> Screenshot:
|
|
screenshots_path = fs_asset_handler.build_screenshots_file_path(
|
|
user=user, platform_fs_slug=platform_fs_slug
|
|
)
|
|
return Screenshot(**_scan_asset(file_name, screenshots_path))
|