From aaf3f5364c98fd53f65c21778cb370dece462955 Mon Sep 17 00:00:00 2001 From: Georges-Antoine Assi Date: Tue, 21 Oct 2025 14:37:09 -0400 Subject: [PATCH] changes from bot review --- backend/endpoints/sockets/scan.py | 81 ++++++++++++++++++------------- 1 file changed, 46 insertions(+), 35 deletions(-) diff --git a/backend/endpoints/sockets/scan.py b/backend/endpoints/sockets/scan.py index 9ae018550..5f772a211 100644 --- a/backend/endpoints/sockets/scan.py +++ b/backend/endpoints/sockets/scan.py @@ -61,13 +61,28 @@ class ScanStats: scanned_firmware: int = 0 new_firmware: int = 0 - async def update(self, socket_manager: socketio.AsyncRedisManager, **kwargs): - for key, value in kwargs.items(): - if hasattr(self, key): - setattr(self, key, value) + def __post_init__(self): + # Lock for thread-safe updates + self._lock = asyncio.Lock() - update_job_meta({"scan_stats": self.to_dict()}) - await socket_manager.emit("scan:update_stats", self.to_dict()) + async def update(self, socket_manager: socketio.AsyncRedisManager, **kwargs): + async with self._lock: + for key, value in kwargs.items(): + if hasattr(self, key): + setattr(self, key, value) + + update_job_meta({"scan_stats": self.to_dict()}) + await socket_manager.emit("scan:update_stats", self.to_dict()) + + async def increment(self, socket_manager: socketio.AsyncRedisManager, **kwargs): + async with self._lock: + for key, value in kwargs.items(): + if hasattr(self, key): + current_value = getattr(self, key) + setattr(self, key, current_value + value) + + update_job_meta({"scan_stats": self.to_dict()}) + await socket_manager.emit("scan:update_stats", self.to_dict()) def to_dict(self) -> dict[str, Any]: return { @@ -94,10 +109,10 @@ async def _identify_firmware( fs_fw: str, scan_stats: ScanStats, socket_manager: socketio.AsyncRedisManager, -) -> ScanStats: +) -> None: # Break early if the flag is set if redis_client.get(STOP_SCAN_FLAG): - return scan_stats + return firmware = db_firmware_handler.get_firmware_by_filename(platform.id, fs_fw) @@ -116,18 +131,16 @@ async def _identify_firmware( crc_hash=scanned_firmware.crc_hash, ) - await scan_stats.update( + await scan_stats.increment( socket_manager=socket_manager, - scanned_firmware=scan_stats.scanned_firmware + 1, - new_firmware=scan_stats.new_firmware + (1 if not firmware else 0), + scanned_firmware=1, + new_firmware=1 if not firmware else 0, ) scanned_firmware.missing_from_fs = False scanned_firmware.is_verified = is_verified db_firmware_handler.add_firmware(scanned_firmware) - return scan_stats - def _should_scan_rom(scan_type: ScanType, rom: Rom | None, roms_ids: list[int]) -> bool: """Decide if a rom should be scanned or not @@ -169,10 +182,10 @@ async def _identify_rom( metadata_sources: list[str], socket_manager: socketio.AsyncRedisManager, scan_stats: ScanStats, -) -> ScanStats: +) -> None: # Break early if the flag is set if redis_client.get(STOP_SCAN_FLAG): - return scan_stats + return if not _should_scan_rom(scan_type=scan_type, rom=rom, roms_ids=roms_ids): if rom: @@ -184,7 +197,7 @@ async def _identify_rom( if rom.missing_from_fs: db_rom_handler.update_rom(rom.id, {"missing_from_fs": False}) - return scan_stats + return # Update properties that don't require metadata fs_regions, fs_revisions, fs_languages, fs_other_tags = fs_rom_handler.parse_tags( @@ -220,7 +233,7 @@ async def _identify_rom( # Silly checks to make the type checker happy if not rom: - return scan_stats + return # Build rom files object before scanning log.debug(f"Calculating file hashes for {rom.fs_name}...") @@ -248,12 +261,11 @@ async def _identify_rom( socket_manager=socket_manager, ) - await scan_stats.update( + await scan_stats.increment( socket_manager=socket_manager, - scanned_roms=scan_stats.scanned_roms + 1, - new_roms=scan_stats.new_roms + (1 if not rom else 0), - identified_roms=scan_stats.identified_roms - + (1 if scanned_rom.is_identified else 0), + scanned_roms=1, + new_roms=1 if newly_added else 0, + identified_roms=1 if scanned_rom.is_identified else 0, ) _added_rom = db_rom_handler.add_rom(scanned_rom) @@ -345,8 +357,6 @@ async def _identify_rom( ), ) - return scan_stats - async def _identify_platform( platform_slug: str, @@ -369,12 +379,11 @@ async def _identify_platform( if platform: scanned_platform.id = platform.id - await scan_stats.update( + await scan_stats.increment( socket_manager=socket_manager, - scanned_platforms=scan_stats.scanned_platforms + 1, - new_platforms=scan_stats.new_platforms + (1 if not platform else 0), - identified_platforms=scan_stats.identified_platforms - + (1 if scanned_platform.is_identified else 0), + scanned_platforms=1, + new_platforms=1 if not platform else 0, + identified_platforms=1 if scanned_platform.is_identified else 0, ) platform = db_platform_handler.add_platform(scanned_platform) @@ -400,7 +409,7 @@ async def _identify_platform( log.info(f"{hl(str(len(fs_firmware)))} firmware files found") for fs_fw in fs_firmware: - scan_stats = await _identify_firmware( + await _identify_firmware( socket_manager=socket_manager, platform=platform, fs_fw=fs_fw, @@ -424,11 +433,10 @@ async def _identify_platform( # Create semaphore to limit concurrent ROM scanning scan_semaphore = asyncio.Semaphore(SCAN_WORKERS) - async def scan_rom_with_semaphore(fs_rom: FSRom, rom: Rom | None) -> ScanStats: - """Scan a single ROM with semaphore limiting and return stats delta""" + async def scan_rom_with_semaphore(fs_rom: FSRom, rom: Rom | None) -> None: + """Scan a single ROM with semaphore limiting""" async with scan_semaphore: - # Create a fresh stats object for this ROM to avoid race conditions - return await _identify_rom( + await _identify_rom( platform=platform, fs_rom=fs_rom, rom=rom, @@ -454,7 +462,10 @@ async def _identify_platform( ] # Wait for all ROMs in the batch to complete - await asyncio.gather(*scan_tasks, return_exceptions=True) + batched_results = await asyncio.gather(*scan_tasks, return_exceptions=True) + for result, fs_rom in zip(batched_results, fs_roms_batch, strict=False): + if isinstance(result, Exception): + log.error(f"Error scanning ROM {fs_rom['fs_name']}: {result}") missing_roms = db_rom_handler.mark_missing_roms( platform.id, [rom["fs_name"] for rom in fs_roms]