diff --git a/backend/endpoints/collections.py b/backend/endpoints/collections.py index da6407809..b969f3fef 100644 --- a/backend/endpoints/collections.py +++ b/backend/endpoints/collections.py @@ -18,7 +18,6 @@ from logger.formatter import BLUE from logger.formatter import highlight as hl from logger.logger import log from models.collection import Collection -from PIL import Image from sqlalchemy.inspection import inspect from utils.router import APIRouter @@ -61,15 +60,13 @@ async def add_collection( if artwork is not None and artwork.filename is not None: file_ext = artwork.filename.split(".")[-1] + artwork_content = BytesIO(await artwork.read()) ( path_cover_l, path_cover_s, - ) = await fs_resource_handler.build_artwork_path(_added_collection, file_ext) - - artwork_content = BytesIO(await artwork.read()) - with Image.open(artwork_content) as img: - img.save(path_cover_l) - fs_resource_handler.resize_cover_to_small(img, save_path=path_cover_s) + ) = await fs_resource_handler.store_artwork( + _added_collection, artwork_content, file_ext + ) else: path_cover_s, path_cover_l = await fs_resource_handler.get_cover( entity=_added_collection, @@ -214,36 +211,38 @@ async def update_collection( else: if artwork is not None and artwork.filename is not None: file_ext = artwork.filename.split(".")[-1] + artwork_content = BytesIO(await artwork.read()) ( path_cover_l, path_cover_s, - ) = await fs_resource_handler.build_artwork_path(collection, file_ext) + ) = await fs_resource_handler.store_artwork( + collection, artwork_content, file_ext + ) - cleaned_data["path_cover_l"] = path_cover_l - cleaned_data["path_cover_s"] = path_cover_s - - artwork_content = BytesIO(await artwork.read()) - with Image.open(artwork_content) as img: - img.save(path_cover_l) - fs_resource_handler.resize_cover_to_small(img, save_path=path_cover_s) - - cleaned_data.update({"url_cover": ""}) + cleaned_data.update( + { + "url_cover": "", + "path_cover_s": path_cover_s, + "path_cover_l": path_cover_l, + } + ) else: if data.get( "url_cover", "" ) != collection.url_cover or not fs_resource_handler.cover_exists( collection, CoverSize.BIG ): - cleaned_data.update( - {"url_cover": data.get("url_cover", collection.url_cover)} - ) path_cover_s, path_cover_l = await fs_resource_handler.get_cover( entity=collection, overwrite=True, url_cover=data.get("url_cover", ""), # type: ignore ) cleaned_data.update( - {"path_cover_s": path_cover_s, "path_cover_l": path_cover_l} + { + "url_cover": data.get("url_cover", collection.url_cover), + "path_cover_s": path_cover_s, + "path_cover_l": path_cover_l, + } ) updated_collection = db_collection_handler.update_collection( diff --git a/backend/endpoints/rom.py b/backend/endpoints/rom.py index ba42f0626..4f6143d50 100644 --- a/backend/endpoints/rom.py +++ b/backend/endpoints/rom.py @@ -55,7 +55,6 @@ from logger.formatter import BLUE from logger.formatter import highlight as hl from logger.logger import log from models.rom import RomFile -from PIL import Image from pydantic import BaseModel from starlette.requests import ClientDisconnect from starlette.responses import FileResponse @@ -697,47 +696,52 @@ async def update_rom( else: if artwork is not None and artwork.filename is not None: file_ext = artwork.filename.split(".")[-1] + artwork_content = BytesIO(await artwork.read()) ( path_cover_l, path_cover_s, - ) = await fs_resource_handler.build_artwork_path(rom, file_ext) + ) = await fs_resource_handler.store_artwork(rom, artwork_content, file_ext) cleaned_data.update( - {"path_cover_s": path_cover_s, "path_cover_l": path_cover_l} + { + "url_cover": "", + "path_cover_s": path_cover_s, + "path_cover_l": path_cover_l, + } ) - - artwork_content = BytesIO(await artwork.read()) - with Image.open(artwork_content) as img: - img.save(path_cover_l) - fs_resource_handler.resize_cover_to_small(img, save_path=path_cover_s) - - cleaned_data.update({"url_cover": ""}) else: if data.get( "url_cover", "" ) != rom.url_cover or not fs_resource_handler.cover_exists( rom, CoverSize.BIG ): - cleaned_data.update({"url_cover": data.get("url_cover", rom.url_cover)}) path_cover_s, path_cover_l = await fs_resource_handler.get_cover( entity=rom, overwrite=True, url_cover=str(data.get("url_cover") or ""), ) cleaned_data.update( - {"path_cover_s": path_cover_s, "path_cover_l": path_cover_l} + { + "url_cover": data.get("url_cover", rom.url_cover), + "path_cover_s": path_cover_s, + "path_cover_l": path_cover_l, + } ) if data.get( "url_manual", "" ) != rom.url_manual or not fs_resource_handler.manual_exists(rom): - cleaned_data.update({"url_manual": data.get("url_manual", rom.url_manual)}) path_manual = await fs_resource_handler.get_manual( rom=rom, overwrite=True, url_manual=str(data.get("url_manual") or ""), ) - cleaned_data.update({"path_manual": path_manual}) + cleaned_data.update( + { + "url_manual": data.get("url_manual", rom.url_manual), + "path_manual": path_manual, + } + ) log.debug( f"Updating {hl(cleaned_data.get('name', ''), color=BLUE)} [{hl(cleaned_data.get('fs_name', ''))}] with data {cleaned_data}" @@ -836,7 +840,12 @@ async def add_rom_manuals( rom=rom, overwrite=False, url_manual=None ) - db_rom_handler.update_rom(id, {"path_manual": path_manual}) + db_rom_handler.update_rom( + id, + { + "path_manual": path_manual, + }, + ) return Response() diff --git a/backend/handler/filesystem/base_handler.py b/backend/handler/filesystem/base_handler.py index 9baeab9d7..c0e9dfa99 100644 --- a/backend/handler/filesystem/base_handler.py +++ b/backend/handler/filesystem/base_handler.py @@ -133,7 +133,6 @@ class FSHandler: # Check for absolute paths if path_path.is_absolute(): - print("Path path:", path_path) raise ValueError("Path must be relative, not absolute") # Normalize path diff --git a/backend/handler/filesystem/resources_handler.py b/backend/handler/filesystem/resources_handler.py index 89d68f416..a71dd1981 100644 --- a/backend/handler/filesystem/resources_handler.py +++ b/backend/handler/filesystem/resources_handler.py @@ -1,4 +1,6 @@ import os +from io import BytesIO +from pathlib import Path import httpx from config import RESOURCES_BASE_PATH @@ -82,7 +84,7 @@ class FSResourcesHandler(FSHandler): log.error(f"Unable to identify image {cover_file}: {str(exc)}") return None - def _get_cover_path(self, entity: Rom | Collection, size: CoverSize) -> str: + def _get_cover_path(self, entity: Rom | Collection, size: CoverSize) -> str | None: """Returns rom cover filesystem path adapted to frontend folder structure Args: @@ -92,28 +94,33 @@ class FSResourcesHandler(FSHandler): full_path = self.validate_path(f"{entity.fs_resources_path}/cover") for matched_file in full_path.glob(f"{size.value}.*"): return str(matched_file.relative_to(self.base_path)) - return "" + + return None async def get_cover( self, entity: Rom | Collection | None, overwrite: bool, url_cover: str | None - ) -> tuple[str, str]: + ) -> tuple[str | None, str | None]: if not entity: - return "", "" + return None, None small_cover_exists = self.cover_exists(entity, CoverSize.SMALL) if url_cover and (overwrite or not small_cover_exists): await self._store_cover(entity, url_cover, CoverSize.SMALL) small_cover_exists = self.cover_exists(entity, CoverSize.SMALL) + path_cover_s = ( - self._get_cover_path(entity, CoverSize.SMALL) if small_cover_exists else "" + self._get_cover_path(entity, CoverSize.SMALL) + if small_cover_exists + else None ) big_cover_exists = self.cover_exists(entity, CoverSize.BIG) if url_cover and (overwrite or not big_cover_exists): await self._store_cover(entity, url_cover, CoverSize.BIG) big_cover_exists = self.cover_exists(entity, CoverSize.BIG) + path_cover_l = ( - self._get_cover_path(entity, CoverSize.BIG) if big_cover_exists else "" + self._get_cover_path(entity, CoverSize.BIG) if big_cover_exists else None ) return path_cover_s, path_cover_l @@ -126,9 +133,9 @@ class FSResourcesHandler(FSHandler): return {"path_cover_s": "", "path_cover_l": ""} - async def build_artwork_path( + async def _build_artwork_path( self, entity: Rom | Collection, file_ext: str - ) -> tuple[str, str]: + ) -> tuple[Path, Path]: path_cover = f"{entity.fs_resources_path}/cover" path_cover_l = self.validate_path( f"{path_cover}/{CoverSize.BIG.value}.{file_ext}" @@ -139,7 +146,27 @@ class FSResourcesHandler(FSHandler): await self.make_directory(path_cover) - return str(path_cover_l), str(path_cover_s) + return path_cover_l, path_cover_s + + async def store_artwork( + self, entity: Rom | Collection, artwork: BytesIO, file_ext: str + ) -> tuple[str | None, str | None]: + """Store artwork in filesystem and return paths.""" + path_cover_l, path_cover_s = await self._build_artwork_path(entity, file_ext) + + try: + with Image.open(artwork) as img: + img.save(path_cover_l) + self.resize_cover_to_small(img, save_path=str(path_cover_s)) + except UnidentifiedImageError as exc: + log.error( + f"Unable to identify image for {entity.fs_resources_path}: {str(exc)}" + ) + return None, None + + return str(path_cover_s.relative_to(self.base_path)), str( + path_cover_l.relative_to(self.base_path) + ) async def _store_screenshot(self, rom: Rom, url_screenhot: str, idx: int): """Store roms resources in filesystem @@ -216,7 +243,7 @@ class FSResourcesHandler(FSHandler): log.error(f"Unable to fetch manual at {url_manual}: {str(exc)}") return None - def _get_manual_path(self, rom: Rom) -> str: + def _get_manual_path(self, rom: Rom) -> str | None: """Returns rom manual filesystem path adapted to frontend folder structure Args: @@ -225,20 +252,21 @@ class FSResourcesHandler(FSHandler): full_path = self.validate_path(f"{rom.fs_resources_path}/manual") for matched_file in full_path.glob(f"{rom.id}.pdf"): return str(matched_file.relative_to(self.base_path)) - return "" + + return None async def get_manual( self, rom: Rom | None, overwrite: bool, url_manual: str | None - ) -> str: + ) -> str | None: if not rom: - return "" + return None manual_exists = self.manual_exists(rom) if url_manual and (overwrite or not manual_exists): await self._store_manual(rom, url_manual) manual_exists = self.manual_exists(rom) - path_manual = self._get_manual_path(rom) if manual_exists else "" + path_manual = self._get_manual_path(rom) if manual_exists else None return path_manual async def store_ra_badge(self, url: str, path: str) -> None: diff --git a/backend/handler/filesystem/tests/test_resources_handler.py b/backend/handler/filesystem/tests/test_resources_handler.py index e31739d4a..b35744176 100644 --- a/backend/handler/filesystem/tests/test_resources_handler.py +++ b/backend/handler/filesystem/tests/test_resources_handler.py @@ -111,8 +111,8 @@ class TestFSResourcesHandler: result_small = handler._get_cover_path(rom, CoverSize.SMALL) result_big = handler._get_cover_path(rom, CoverSize.BIG) - assert result_small == "" - assert result_big == "" + assert result_small is None + assert result_big is None def test_get_cover_path_with_existing_cover(self, handler: FSResourcesHandler): """Test _get_cover_path with existing cover files""" @@ -142,14 +142,14 @@ class TestFSResourcesHandler: async def test_get_cover_no_entity(self, handler: FSResourcesHandler): """Test get_cover with no entity""" result = await handler.get_cover(None, False, "http://example.com/cover.png") - assert result == ("", "") + assert result == (None, None) @pytest.mark.asyncio async def test_get_cover_no_url(self, handler: FSResourcesHandler, rom: Rom): """Test get_cover with no URL""" result = await handler.get_cover(rom, False, None) # Should return empty strings since no covers exist and no URL provided - assert result == ("", "") + assert result == (None, None) @pytest.mark.asyncio async def test_get_cover_with_url_no_overwrite( @@ -207,14 +207,17 @@ class TestFSResourcesHandler: with patch.object(handler, "validate_path") as mock_validate: mock_validate.side_effect = lambda x: Path(x) - result = await handler.build_artwork_path(rom, file_ext) + path_cover_l, path_cover_s = await handler._build_artwork_path( + rom, file_ext + ) expected_cover_path = f"{rom.fs_resources_path}/cover" expected_big_path = f"{expected_cover_path}/big.{file_ext}" expected_small_path = f"{expected_cover_path}/small.{file_ext}" mock_make_dir.assert_called_once_with(expected_cover_path) - assert result == (expected_big_path, expected_small_path) + assert str(path_cover_l) == expected_big_path + assert str(path_cover_s) == expected_small_path async def test_build_artwork_path_different_extensions( self, handler: FSResourcesHandler, rom @@ -227,11 +230,13 @@ class TestFSResourcesHandler: with patch.object(handler, "validate_path") as mock_validate: mock_validate.side_effect = lambda x: Path(x) - result = await handler.build_artwork_path(rom, ext) + path_cover_l, path_cover_s = await handler._build_artwork_path( + rom, ext + ) # Check that the extension is properly included - assert result[0].endswith(f"big.{ext}") - assert result[1].endswith(f"small.{ext}") + assert str(path_cover_l).endswith(f"big.{ext}") + assert str(path_cover_s).endswith(f"small.{ext}") def test_get_screenshot_path(self, handler: FSResourcesHandler, rom: Rom): """Test _get_screenshot_path method""" @@ -302,19 +307,19 @@ class TestFSResourcesHandler: def test_get_manual_path_no_manual(self, handler: FSResourcesHandler, rom: Rom): """Test _get_manual_path when no manual exists""" result = handler._get_manual_path(rom) - assert result == "" + assert result is None @pytest.mark.asyncio async def test_get_manual_no_rom(self, handler: FSResourcesHandler): """Test get_manual with no ROM""" result = await handler.get_manual(None, False, "http://example.com/manual.pdf") - assert result == "" + assert result is None @pytest.mark.asyncio async def test_get_manual_no_url(self, handler: FSResourcesHandler, rom: Rom): """Test get_manual with no URL""" result = await handler.get_manual(rom, False, None) - assert result == "" + assert result is None @pytest.mark.asyncio async def test_get_manual_with_url_no_overwrite( diff --git a/backend/handler/metadata/igdb_handler.py b/backend/handler/metadata/igdb_handler.py index a407ee761..e744e8ccb 100644 --- a/backend/handler/metadata/igdb_handler.py +++ b/backend/handler/metadata/igdb_handler.py @@ -389,7 +389,6 @@ class IGDBHandler(MetadataHandler): # self.platform_endpoint, # f'fields {",".join(self.platforms_fields)}; limit 500;', # ) - # print(platforms) @check_twitch_token async def get_platform(self, slug: str) -> IGDBPlatform: