diff --git a/backend/watcher.py b/backend/watcher.py index 528509f97..ca211fb15 100644 --- a/backend/watcher.py +++ b/backend/watcher.py @@ -26,6 +26,7 @@ from handler.scan_handler import MetadataSource, ScanType from logger.formatter import CYAN from logger.formatter import highlight as hl from logger.logger import log +from opentelemetry import trace from rq.job import Job from tasks.tasks import tasks_scheduler from utils import get_version @@ -34,6 +35,7 @@ sentry_sdk.init( dsn=SENTRY_DSN, release=f"romm@{get_version()}", ) +tracer = trace.get_tracer(__name__) structure_level = 2 if os.path.exists(cm.get_config().HIGH_PRIO_STRUCTURE_PATH) else 1 @@ -66,93 +68,94 @@ def process_changes(changes: Sequence[Change]) -> None: if not changes: return - # Find affected platform slugs. - fs_slugs: set[str] = set() - changes_platform_directory = False - for change in changes: - event_type, change_path = change - src_path = os.fsdecode(change_path) - event_src = src_path.split(LIBRARY_BASE_PATH)[-1] - event_src_parts = event_src.split("/") - if len(event_src_parts) <= structure_level: - log.warning( - f"Filesystem event path '{event_src}' does not have enough segments for structure_level {structure_level}. Skipping event." + with tracer.start_as_current_span("process_changes"): + # Find affected platform slugs. + fs_slugs: set[str] = set() + changes_platform_directory = False + for change in changes: + event_type, change_path = change + src_path = os.fsdecode(change_path) + event_src = src_path.split(LIBRARY_BASE_PATH)[-1] + event_src_parts = event_src.split("/") + if len(event_src_parts) <= structure_level: + log.warning( + f"Filesystem event path '{event_src}' does not have enough segments for structure_level {structure_level}. Skipping event." + ) + continue + + if len(event_src_parts) == structure_level + 1: + changes_platform_directory = True + + log.info(f"Filesystem event: {event_type} {event_src}") + fs_slugs.add(event_src_parts[structure_level]) + + if not fs_slugs: + log.info("No valid filesystem slugs found in changes, exiting...") + return + + # Check whether any metadata source is enabled. + source_mapping: dict[str, bool] = { + MetadataSource.IGDB: IGDB_API_ENABLED, + MetadataSource.SS: SS_API_ENABLED, + MetadataSource.MOBY: MOBY_API_ENABLED, + MetadataSource.RA: RA_API_ENABLED, + MetadataSource.LB: LAUNCHBOX_API_ENABLED, + MetadataSource.HASHEOUS: HASHEOUS_API_ENABLED, + MetadataSource.SGDB: STEAMGRIDDB_API_ENABLED, + } + metadata_sources = [source for source, flag in source_mapping.items() if flag] + if not metadata_sources: + log.warning("No metadata sources enabled, skipping rescan") + return + + # Get currently scheduled jobs for the scan_platforms function. + already_scheduled_jobs = [ + job + for job in tasks_scheduler.get_jobs() + if isinstance(job, Job) + and job.func_name == "endpoints.sockets.scan.scan_platforms" + ] + + # If a full rescan is already scheduled, skip further processing. + if any(job.args[0] == [] for job in already_scheduled_jobs): + log.info("Full rescan already scheduled") + return + + time_delta = timedelta(minutes=RESCAN_ON_FILESYSTEM_CHANGE_DELAY) + rescan_in_msg = f"rescanning in {hl(str(RESCAN_ON_FILESYSTEM_CHANGE_DELAY), color=CYAN)} minutes." + + # Any change to a platform directory should trigger a full rescan. + if changes_platform_directory: + log.info(f"Platform directory changed, {rescan_in_msg}") + tasks_scheduler.enqueue_in( + time_delta, + scan_platforms, + [], + scan_type=ScanType.UNIDENTIFIED, + metadata_sources=metadata_sources, ) - continue + return - if len(event_src_parts) == structure_level + 1: - changes_platform_directory = True + # Otherwise, process each platform slug. + for fs_slug in fs_slugs: + # TODO: Query platforms from the database in bulk. + db_platform = db_platform_handler.get_platform_by_fs_slug(fs_slug) + if not db_platform: + continue - log.info(f"Filesystem event: {event_type} {event_src}") - fs_slugs.add(event_src_parts[structure_level]) + # Skip if a scan is already scheduled for this platform. + if any(db_platform.id in job.args[0] for job in already_scheduled_jobs): + log.info(f"Scan already scheduled for {hl(fs_slug)}") + continue - if not fs_slugs: - log.info("No valid filesystem slugs found in changes, exiting...") - return - - # Check whether any metadata source is enabled. - source_mapping: dict[str, bool] = { - MetadataSource.IGDB: IGDB_API_ENABLED, - MetadataSource.SS: SS_API_ENABLED, - MetadataSource.MOBY: MOBY_API_ENABLED, - MetadataSource.RA: RA_API_ENABLED, - MetadataSource.LB: LAUNCHBOX_API_ENABLED, - MetadataSource.HASHEOUS: HASHEOUS_API_ENABLED, - MetadataSource.SGDB: STEAMGRIDDB_API_ENABLED, - } - metadata_sources = [source for source, flag in source_mapping.items() if flag] - if not metadata_sources: - log.warning("No metadata sources enabled, skipping rescan") - return - - # Get currently scheduled jobs for the scan_platforms function. - already_scheduled_jobs = [ - job - for job in tasks_scheduler.get_jobs() - if isinstance(job, Job) - and job.func_name == "endpoints.sockets.scan.scan_platforms" - ] - - # If a full rescan is already scheduled, skip further processing. - if any(job.args[0] == [] for job in already_scheduled_jobs): - log.info("Full rescan already scheduled") - return - - time_delta = timedelta(minutes=RESCAN_ON_FILESYSTEM_CHANGE_DELAY) - rescan_in_msg = f"rescanning in {hl(str(RESCAN_ON_FILESYSTEM_CHANGE_DELAY), color=CYAN)} minutes." - - # Any change to a platform directory should trigger a full rescan. - if changes_platform_directory: - log.info(f"Platform directory changed, {rescan_in_msg}") - tasks_scheduler.enqueue_in( - time_delta, - scan_platforms, - [], - scan_type=ScanType.UNIDENTIFIED, - metadata_sources=metadata_sources, - ) - return - - # Otherwise, process each platform slug. - for fs_slug in fs_slugs: - # TODO: Query platforms from the database in bulk. - db_platform = db_platform_handler.get_platform_by_fs_slug(fs_slug) - if not db_platform: - continue - - # Skip if a scan is already scheduled for this platform. - if any(db_platform.id in job.args[0] for job in already_scheduled_jobs): - log.info(f"Scan already scheduled for {hl(fs_slug)}") - continue - - log.info(f"Change detected in {hl(fs_slug)} folder, {rescan_in_msg}") - tasks_scheduler.enqueue_in( - time_delta, - scan_platforms, - [db_platform.id], - scan_type=ScanType.QUICK, - metadata_sources=metadata_sources, - ) + log.info(f"Change detected in {hl(fs_slug)} folder, {rescan_in_msg}") + tasks_scheduler.enqueue_in( + time_delta, + scan_platforms, + [db_platform.id], + scan_type=ScanType.QUICK, + metadata_sources=metadata_sources, + ) if __name__ == "__main__": diff --git a/docker/init_scripts/init b/docker/init_scripts/init index 4dc9b2e7a..60b322a63 100755 --- a/docker/init_scripts/init +++ b/docker/init_scripts/init @@ -206,7 +206,7 @@ start_bin_watcher() { info_log "Starting watcher" watchfiles \ --target-type command \ - 'python3 watcher.py' \ + "opentelemetry-instrument --service_name '${OTEL_SERVICE_NAME_PREFIX-}watcher' python3 watcher.py" \ /romm/library & WATCHER_PID=$! echo "${WATCHER_PID}" >/tmp/watcher.pid