Files
romm/backend/handler/filesystem/roms_handler.py
2024-07-15 18:27:29 -04:00

206 lines
6.5 KiB
Python

import binascii
import hashlib
import os
import re
import shutil
from pathlib import Path
from config import LIBRARY_BASE_PATH
from config.config_manager import config_manager as cm
from exceptions.fs_exceptions import RomAlreadyExistsException, RomsNotFoundException
from models.platform import Platform
from utils.filesystem import iter_directories, iter_files
from .base_handler import (
LANGUAGES_BY_SHORTCODE,
LANGUAGES_NAME_KEYS,
REGIONS_BY_SHORTCODE,
REGIONS_NAME_KEYS,
TAG_REGEX,
FSHandler,
)
class FSRomsHandler(FSHandler):
def __init__(self) -> None:
pass
def remove_file(self, file_name: str, file_path: str):
try:
os.remove(f"{LIBRARY_BASE_PATH}/{file_path}/{file_name}")
except IsADirectoryError:
shutil.rmtree(f"{LIBRARY_BASE_PATH}/{file_path}/{file_name}")
def parse_tags(self, file_name: str) -> tuple:
rev = ""
regs = []
langs = []
other_tags = []
tags = [tag[0] or tag[1] for tag in TAG_REGEX.findall(file_name)]
tags = [tag for subtags in tags for tag in subtags.split(",")]
tags = [tag.strip() for tag in tags]
for tag in tags:
if tag.lower() in REGIONS_BY_SHORTCODE.keys():
regs.append(REGIONS_BY_SHORTCODE[tag.lower()])
continue
if tag.lower() in REGIONS_NAME_KEYS:
regs.append(tag)
continue
if tag.lower() in LANGUAGES_BY_SHORTCODE.keys():
langs.append(LANGUAGES_BY_SHORTCODE[tag.lower()])
continue
if tag.lower() in LANGUAGES_NAME_KEYS:
langs.append(tag)
continue
if "reg" in tag.lower():
match = re.match(r"^reg[\s|-](.*)$", tag, re.IGNORECASE)
if match:
regs.append(
REGIONS_BY_SHORTCODE[match.group(1).lower()]
if match.group(1).lower() in REGIONS_BY_SHORTCODE.keys()
else match.group(1)
)
continue
if "rev" in tag.lower():
match = re.match(r"^rev[\s|-](.*)$", tag, re.IGNORECASE)
if match:
rev = match.group(1)
continue
other_tags.append(tag)
return regs, rev, langs, other_tags
def _exclude_multi_roms(self, roms) -> list[str]:
excluded_names = cm.get_config().EXCLUDED_MULTI_FILES
filtered_files: list = []
for rom in roms:
if rom in excluded_names:
filtered_files.append(rom)
return [f for f in roms if f not in filtered_files]
def _calculate_rom_size(self, data: bytes) -> int:
return {
"crc_hash": (binascii.crc32(data) & 0xFFFFFFFF)
.to_bytes(4, byteorder="big")
.hex(),
"md5_hash": hashlib.md5(data).hexdigest(),
"sha1_hash": hashlib.sha1(data).hexdigest(),
}
def get_rom_files(self, rom: str, roms_path: str) -> list[str]:
rom_files: list[str] = []
# Check if rom is a multi-part rom
if os.path.isdir(f"{roms_path}/{rom}"):
multi_files = os.listdir(f"{roms_path}/{rom}")
for file in multi_files:
with open(Path(roms_path, rom, file), "rb") as f:
data = f.read()
rom_files.append(
{
"filename": file,
**self._calculate_rom_size(data),
}
)
else:
with open(Path(roms_path, rom), "rb") as f:
data = f.read()
rom_files.append(
{
"filename": rom,
**self._calculate_rom_size(data),
}
)
return rom_files
def get_roms(self, platform: Platform):
"""Gets all filesystem roms for a platform
Args:
platform: platform where roms belong
Returns:
list with all the filesystem roms for a platform found in the LIBRARY_BASE_PATH
"""
roms_path = self.get_roms_fs_structure(platform.fs_slug)
roms_file_path = f"{LIBRARY_BASE_PATH}/{roms_path}"
try:
fs_single_roms = [f for _, f in iter_files(roms_file_path)]
except IndexError as exc:
raise RomsNotFoundException(platform.fs_slug) from exc
try:
fs_multi_roms = [d for _, d in iter_directories(roms_file_path)]
except IndexError as exc:
raise RomsNotFoundException(platform.fs_slug) from exc
fs_roms: list[dict] = [
{"multi": False, "file_name": rom}
for rom in self._exclude_files(fs_single_roms, "single")
] + [
{"multi": True, "file_name": rom}
for rom in self._exclude_multi_roms(fs_multi_roms)
]
return [
dict(
rom,
files=self.get_rom_files(rom["file_name"], roms_file_path),
)
for rom in fs_roms
]
def get_rom_file_size(
self,
roms_path: str,
file_name: str,
multi: bool,
multi_files: list[str] | None = None,
):
if multi_files is None:
multi_files = []
files = (
[f"{LIBRARY_BASE_PATH}/{roms_path}/{file_name}"]
if not multi
else [
f"{LIBRARY_BASE_PATH}/{roms_path}/{file_name}/{file}"
for file in multi_files
]
)
return sum([os.stat(file).st_size for file in files])
def file_exists(self, path: str, file_name: str):
"""Check if file exists in filesystem
Args:
path: path to file
file_name: name of file
Returns
True if file exists in filesystem else False
"""
return bool(os.path.exists(f"{LIBRARY_BASE_PATH}/{path}/{file_name}"))
def rename_file(self, old_name: str, new_name: str, file_path: str):
if new_name != old_name:
if self.file_exists(path=file_path, file_name=new_name):
raise RomAlreadyExistsException(new_name)
os.rename(
f"{LIBRARY_BASE_PATH}/{file_path}/{old_name}",
f"{LIBRARY_BASE_PATH}/{file_path}/{new_name}",
)
def build_upload_file_path(self, fs_slug: str):
file_path = self.get_roms_fs_structure(fs_slug)
return f"{LIBRARY_BASE_PATH}/{file_path}"