[HOTFIX] Fix uploading cover art manually

This commit is contained in:
Georges-Antoine Assi
2025-07-18 23:29:03 -04:00
parent 28bc570da1
commit 73545ffa97
6 changed files with 103 additions and 64 deletions

View File

@@ -18,7 +18,6 @@ from logger.formatter import BLUE
from logger.formatter import highlight as hl
from logger.logger import log
from models.collection import Collection
from PIL import Image
from sqlalchemy.inspection import inspect
from utils.router import APIRouter
@@ -61,15 +60,13 @@ async def add_collection(
if artwork is not None and artwork.filename is not None:
file_ext = artwork.filename.split(".")[-1]
artwork_content = BytesIO(await artwork.read())
(
path_cover_l,
path_cover_s,
) = await fs_resource_handler.build_artwork_path(_added_collection, file_ext)
artwork_content = BytesIO(await artwork.read())
with Image.open(artwork_content) as img:
img.save(path_cover_l)
fs_resource_handler.resize_cover_to_small(img, save_path=path_cover_s)
) = await fs_resource_handler.store_artwork(
_added_collection, artwork_content, file_ext
)
else:
path_cover_s, path_cover_l = await fs_resource_handler.get_cover(
entity=_added_collection,
@@ -214,36 +211,38 @@ async def update_collection(
else:
if artwork is not None and artwork.filename is not None:
file_ext = artwork.filename.split(".")[-1]
artwork_content = BytesIO(await artwork.read())
(
path_cover_l,
path_cover_s,
) = await fs_resource_handler.build_artwork_path(collection, file_ext)
) = await fs_resource_handler.store_artwork(
collection, artwork_content, file_ext
)
cleaned_data["path_cover_l"] = path_cover_l
cleaned_data["path_cover_s"] = path_cover_s
artwork_content = BytesIO(await artwork.read())
with Image.open(artwork_content) as img:
img.save(path_cover_l)
fs_resource_handler.resize_cover_to_small(img, save_path=path_cover_s)
cleaned_data.update({"url_cover": ""})
cleaned_data.update(
{
"url_cover": "",
"path_cover_s": path_cover_s,
"path_cover_l": path_cover_l,
}
)
else:
if data.get(
"url_cover", ""
) != collection.url_cover or not fs_resource_handler.cover_exists(
collection, CoverSize.BIG
):
cleaned_data.update(
{"url_cover": data.get("url_cover", collection.url_cover)}
)
path_cover_s, path_cover_l = await fs_resource_handler.get_cover(
entity=collection,
overwrite=True,
url_cover=data.get("url_cover", ""), # type: ignore
)
cleaned_data.update(
{"path_cover_s": path_cover_s, "path_cover_l": path_cover_l}
{
"url_cover": data.get("url_cover", collection.url_cover),
"path_cover_s": path_cover_s,
"path_cover_l": path_cover_l,
}
)
updated_collection = db_collection_handler.update_collection(

View File

@@ -55,7 +55,6 @@ from logger.formatter import BLUE
from logger.formatter import highlight as hl
from logger.logger import log
from models.rom import RomFile
from PIL import Image
from pydantic import BaseModel
from starlette.requests import ClientDisconnect
from starlette.responses import FileResponse
@@ -697,47 +696,52 @@ async def update_rom(
else:
if artwork is not None and artwork.filename is not None:
file_ext = artwork.filename.split(".")[-1]
artwork_content = BytesIO(await artwork.read())
(
path_cover_l,
path_cover_s,
) = await fs_resource_handler.build_artwork_path(rom, file_ext)
) = await fs_resource_handler.store_artwork(rom, artwork_content, file_ext)
cleaned_data.update(
{"path_cover_s": path_cover_s, "path_cover_l": path_cover_l}
{
"url_cover": "",
"path_cover_s": path_cover_s,
"path_cover_l": path_cover_l,
}
)
artwork_content = BytesIO(await artwork.read())
with Image.open(artwork_content) as img:
img.save(path_cover_l)
fs_resource_handler.resize_cover_to_small(img, save_path=path_cover_s)
cleaned_data.update({"url_cover": ""})
else:
if data.get(
"url_cover", ""
) != rom.url_cover or not fs_resource_handler.cover_exists(
rom, CoverSize.BIG
):
cleaned_data.update({"url_cover": data.get("url_cover", rom.url_cover)})
path_cover_s, path_cover_l = await fs_resource_handler.get_cover(
entity=rom,
overwrite=True,
url_cover=str(data.get("url_cover") or ""),
)
cleaned_data.update(
{"path_cover_s": path_cover_s, "path_cover_l": path_cover_l}
{
"url_cover": data.get("url_cover", rom.url_cover),
"path_cover_s": path_cover_s,
"path_cover_l": path_cover_l,
}
)
if data.get(
"url_manual", ""
) != rom.url_manual or not fs_resource_handler.manual_exists(rom):
cleaned_data.update({"url_manual": data.get("url_manual", rom.url_manual)})
path_manual = await fs_resource_handler.get_manual(
rom=rom,
overwrite=True,
url_manual=str(data.get("url_manual") or ""),
)
cleaned_data.update({"path_manual": path_manual})
cleaned_data.update(
{
"url_manual": data.get("url_manual", rom.url_manual),
"path_manual": path_manual,
}
)
log.debug(
f"Updating {hl(cleaned_data.get('name', ''), color=BLUE)} [{hl(cleaned_data.get('fs_name', ''))}] with data {cleaned_data}"
@@ -836,7 +840,12 @@ async def add_rom_manuals(
rom=rom, overwrite=False, url_manual=None
)
db_rom_handler.update_rom(id, {"path_manual": path_manual})
db_rom_handler.update_rom(
id,
{
"path_manual": path_manual,
},
)
return Response()

View File

@@ -133,7 +133,6 @@ class FSHandler:
# Check for absolute paths
if path_path.is_absolute():
print("Path path:", path_path)
raise ValueError("Path must be relative, not absolute")
# Normalize path

View File

@@ -1,4 +1,6 @@
import os
from io import BytesIO
from pathlib import Path
import httpx
from config import RESOURCES_BASE_PATH
@@ -82,7 +84,7 @@ class FSResourcesHandler(FSHandler):
log.error(f"Unable to identify image {cover_file}: {str(exc)}")
return None
def _get_cover_path(self, entity: Rom | Collection, size: CoverSize) -> str:
def _get_cover_path(self, entity: Rom | Collection, size: CoverSize) -> str | None:
"""Returns rom cover filesystem path adapted to frontend folder structure
Args:
@@ -92,28 +94,33 @@ class FSResourcesHandler(FSHandler):
full_path = self.validate_path(f"{entity.fs_resources_path}/cover")
for matched_file in full_path.glob(f"{size.value}.*"):
return str(matched_file.relative_to(self.base_path))
return ""
return None
async def get_cover(
self, entity: Rom | Collection | None, overwrite: bool, url_cover: str | None
) -> tuple[str, str]:
) -> tuple[str | None, str | None]:
if not entity:
return "", ""
return None, None
small_cover_exists = self.cover_exists(entity, CoverSize.SMALL)
if url_cover and (overwrite or not small_cover_exists):
await self._store_cover(entity, url_cover, CoverSize.SMALL)
small_cover_exists = self.cover_exists(entity, CoverSize.SMALL)
path_cover_s = (
self._get_cover_path(entity, CoverSize.SMALL) if small_cover_exists else ""
self._get_cover_path(entity, CoverSize.SMALL)
if small_cover_exists
else None
)
big_cover_exists = self.cover_exists(entity, CoverSize.BIG)
if url_cover and (overwrite or not big_cover_exists):
await self._store_cover(entity, url_cover, CoverSize.BIG)
big_cover_exists = self.cover_exists(entity, CoverSize.BIG)
path_cover_l = (
self._get_cover_path(entity, CoverSize.BIG) if big_cover_exists else ""
self._get_cover_path(entity, CoverSize.BIG) if big_cover_exists else None
)
return path_cover_s, path_cover_l
@@ -126,9 +133,9 @@ class FSResourcesHandler(FSHandler):
return {"path_cover_s": "", "path_cover_l": ""}
async def build_artwork_path(
async def _build_artwork_path(
self, entity: Rom | Collection, file_ext: str
) -> tuple[str, str]:
) -> tuple[Path, Path]:
path_cover = f"{entity.fs_resources_path}/cover"
path_cover_l = self.validate_path(
f"{path_cover}/{CoverSize.BIG.value}.{file_ext}"
@@ -139,7 +146,27 @@ class FSResourcesHandler(FSHandler):
await self.make_directory(path_cover)
return str(path_cover_l), str(path_cover_s)
return path_cover_l, path_cover_s
async def store_artwork(
self, entity: Rom | Collection, artwork: BytesIO, file_ext: str
) -> tuple[str | None, str | None]:
"""Store artwork in filesystem and return paths."""
path_cover_l, path_cover_s = await self._build_artwork_path(entity, file_ext)
try:
with Image.open(artwork) as img:
img.save(path_cover_l)
self.resize_cover_to_small(img, save_path=str(path_cover_s))
except UnidentifiedImageError as exc:
log.error(
f"Unable to identify image for {entity.fs_resources_path}: {str(exc)}"
)
return None, None
return str(path_cover_s.relative_to(self.base_path)), str(
path_cover_l.relative_to(self.base_path)
)
async def _store_screenshot(self, rom: Rom, url_screenhot: str, idx: int):
"""Store roms resources in filesystem
@@ -216,7 +243,7 @@ class FSResourcesHandler(FSHandler):
log.error(f"Unable to fetch manual at {url_manual}: {str(exc)}")
return None
def _get_manual_path(self, rom: Rom) -> str:
def _get_manual_path(self, rom: Rom) -> str | None:
"""Returns rom manual filesystem path adapted to frontend folder structure
Args:
@@ -225,20 +252,21 @@ class FSResourcesHandler(FSHandler):
full_path = self.validate_path(f"{rom.fs_resources_path}/manual")
for matched_file in full_path.glob(f"{rom.id}.pdf"):
return str(matched_file.relative_to(self.base_path))
return ""
return None
async def get_manual(
self, rom: Rom | None, overwrite: bool, url_manual: str | None
) -> str:
) -> str | None:
if not rom:
return ""
return None
manual_exists = self.manual_exists(rom)
if url_manual and (overwrite or not manual_exists):
await self._store_manual(rom, url_manual)
manual_exists = self.manual_exists(rom)
path_manual = self._get_manual_path(rom) if manual_exists else ""
path_manual = self._get_manual_path(rom) if manual_exists else None
return path_manual
async def store_ra_badge(self, url: str, path: str) -> None:

View File

@@ -111,8 +111,8 @@ class TestFSResourcesHandler:
result_small = handler._get_cover_path(rom, CoverSize.SMALL)
result_big = handler._get_cover_path(rom, CoverSize.BIG)
assert result_small == ""
assert result_big == ""
assert result_small is None
assert result_big is None
def test_get_cover_path_with_existing_cover(self, handler: FSResourcesHandler):
"""Test _get_cover_path with existing cover files"""
@@ -142,14 +142,14 @@ class TestFSResourcesHandler:
async def test_get_cover_no_entity(self, handler: FSResourcesHandler):
"""Test get_cover with no entity"""
result = await handler.get_cover(None, False, "http://example.com/cover.png")
assert result == ("", "")
assert result == (None, None)
@pytest.mark.asyncio
async def test_get_cover_no_url(self, handler: FSResourcesHandler, rom: Rom):
"""Test get_cover with no URL"""
result = await handler.get_cover(rom, False, None)
# Should return empty strings since no covers exist and no URL provided
assert result == ("", "")
assert result == (None, None)
@pytest.mark.asyncio
async def test_get_cover_with_url_no_overwrite(
@@ -207,14 +207,17 @@ class TestFSResourcesHandler:
with patch.object(handler, "validate_path") as mock_validate:
mock_validate.side_effect = lambda x: Path(x)
result = await handler.build_artwork_path(rom, file_ext)
path_cover_l, path_cover_s = await handler._build_artwork_path(
rom, file_ext
)
expected_cover_path = f"{rom.fs_resources_path}/cover"
expected_big_path = f"{expected_cover_path}/big.{file_ext}"
expected_small_path = f"{expected_cover_path}/small.{file_ext}"
mock_make_dir.assert_called_once_with(expected_cover_path)
assert result == (expected_big_path, expected_small_path)
assert str(path_cover_l) == expected_big_path
assert str(path_cover_s) == expected_small_path
async def test_build_artwork_path_different_extensions(
self, handler: FSResourcesHandler, rom
@@ -227,11 +230,13 @@ class TestFSResourcesHandler:
with patch.object(handler, "validate_path") as mock_validate:
mock_validate.side_effect = lambda x: Path(x)
result = await handler.build_artwork_path(rom, ext)
path_cover_l, path_cover_s = await handler._build_artwork_path(
rom, ext
)
# Check that the extension is properly included
assert result[0].endswith(f"big.{ext}")
assert result[1].endswith(f"small.{ext}")
assert str(path_cover_l).endswith(f"big.{ext}")
assert str(path_cover_s).endswith(f"small.{ext}")
def test_get_screenshot_path(self, handler: FSResourcesHandler, rom: Rom):
"""Test _get_screenshot_path method"""
@@ -302,19 +307,19 @@ class TestFSResourcesHandler:
def test_get_manual_path_no_manual(self, handler: FSResourcesHandler, rom: Rom):
"""Test _get_manual_path when no manual exists"""
result = handler._get_manual_path(rom)
assert result == ""
assert result is None
@pytest.mark.asyncio
async def test_get_manual_no_rom(self, handler: FSResourcesHandler):
"""Test get_manual with no ROM"""
result = await handler.get_manual(None, False, "http://example.com/manual.pdf")
assert result == ""
assert result is None
@pytest.mark.asyncio
async def test_get_manual_no_url(self, handler: FSResourcesHandler, rom: Rom):
"""Test get_manual with no URL"""
result = await handler.get_manual(rom, False, None)
assert result == ""
assert result is None
@pytest.mark.asyncio
async def test_get_manual_with_url_no_overwrite(

View File

@@ -389,7 +389,6 @@ class IGDBHandler(MetadataHandler):
# self.platform_endpoint,
# f'fields {",".join(self.platforms_fields)}; limit 500;',
# )
# print(platforms)
@check_twitch_token
async def get_platform(self, slug: str) -> IGDBPlatform: