mirror of
https://github.com/rommapp/romm.git
synced 2026-02-18 00:27:41 +01:00
feat: Add OpenTelemetry integration to file watcher
Run file watcher using `opentelemetry-instrument` to enable tracing for the watcher service.
This commit is contained in:
@@ -26,6 +26,7 @@ from handler.scan_handler import MetadataSource, ScanType
|
||||
from logger.formatter import CYAN
|
||||
from logger.formatter import highlight as hl
|
||||
from logger.logger import log
|
||||
from opentelemetry import trace
|
||||
from rq.job import Job
|
||||
from tasks.tasks import tasks_scheduler
|
||||
from utils import get_version
|
||||
@@ -34,6 +35,7 @@ sentry_sdk.init(
|
||||
dsn=SENTRY_DSN,
|
||||
release=f"romm@{get_version()}",
|
||||
)
|
||||
tracer = trace.get_tracer(__name__)
|
||||
|
||||
structure_level = 2 if os.path.exists(cm.get_config().HIGH_PRIO_STRUCTURE_PATH) else 1
|
||||
|
||||
@@ -66,93 +68,94 @@ def process_changes(changes: Sequence[Change]) -> None:
|
||||
if not changes:
|
||||
return
|
||||
|
||||
# Find affected platform slugs.
|
||||
fs_slugs: set[str] = set()
|
||||
changes_platform_directory = False
|
||||
for change in changes:
|
||||
event_type, change_path = change
|
||||
src_path = os.fsdecode(change_path)
|
||||
event_src = src_path.split(LIBRARY_BASE_PATH)[-1]
|
||||
event_src_parts = event_src.split("/")
|
||||
if len(event_src_parts) <= structure_level:
|
||||
log.warning(
|
||||
f"Filesystem event path '{event_src}' does not have enough segments for structure_level {structure_level}. Skipping event."
|
||||
with tracer.start_as_current_span("process_changes"):
|
||||
# Find affected platform slugs.
|
||||
fs_slugs: set[str] = set()
|
||||
changes_platform_directory = False
|
||||
for change in changes:
|
||||
event_type, change_path = change
|
||||
src_path = os.fsdecode(change_path)
|
||||
event_src = src_path.split(LIBRARY_BASE_PATH)[-1]
|
||||
event_src_parts = event_src.split("/")
|
||||
if len(event_src_parts) <= structure_level:
|
||||
log.warning(
|
||||
f"Filesystem event path '{event_src}' does not have enough segments for structure_level {structure_level}. Skipping event."
|
||||
)
|
||||
continue
|
||||
|
||||
if len(event_src_parts) == structure_level + 1:
|
||||
changes_platform_directory = True
|
||||
|
||||
log.info(f"Filesystem event: {event_type} {event_src}")
|
||||
fs_slugs.add(event_src_parts[structure_level])
|
||||
|
||||
if not fs_slugs:
|
||||
log.info("No valid filesystem slugs found in changes, exiting...")
|
||||
return
|
||||
|
||||
# Check whether any metadata source is enabled.
|
||||
source_mapping: dict[str, bool] = {
|
||||
MetadataSource.IGDB: IGDB_API_ENABLED,
|
||||
MetadataSource.SS: SS_API_ENABLED,
|
||||
MetadataSource.MOBY: MOBY_API_ENABLED,
|
||||
MetadataSource.RA: RA_API_ENABLED,
|
||||
MetadataSource.LB: LAUNCHBOX_API_ENABLED,
|
||||
MetadataSource.HASHEOUS: HASHEOUS_API_ENABLED,
|
||||
MetadataSource.SGDB: STEAMGRIDDB_API_ENABLED,
|
||||
}
|
||||
metadata_sources = [source for source, flag in source_mapping.items() if flag]
|
||||
if not metadata_sources:
|
||||
log.warning("No metadata sources enabled, skipping rescan")
|
||||
return
|
||||
|
||||
# Get currently scheduled jobs for the scan_platforms function.
|
||||
already_scheduled_jobs = [
|
||||
job
|
||||
for job in tasks_scheduler.get_jobs()
|
||||
if isinstance(job, Job)
|
||||
and job.func_name == "endpoints.sockets.scan.scan_platforms"
|
||||
]
|
||||
|
||||
# If a full rescan is already scheduled, skip further processing.
|
||||
if any(job.args[0] == [] for job in already_scheduled_jobs):
|
||||
log.info("Full rescan already scheduled")
|
||||
return
|
||||
|
||||
time_delta = timedelta(minutes=RESCAN_ON_FILESYSTEM_CHANGE_DELAY)
|
||||
rescan_in_msg = f"rescanning in {hl(str(RESCAN_ON_FILESYSTEM_CHANGE_DELAY), color=CYAN)} minutes."
|
||||
|
||||
# Any change to a platform directory should trigger a full rescan.
|
||||
if changes_platform_directory:
|
||||
log.info(f"Platform directory changed, {rescan_in_msg}")
|
||||
tasks_scheduler.enqueue_in(
|
||||
time_delta,
|
||||
scan_platforms,
|
||||
[],
|
||||
scan_type=ScanType.UNIDENTIFIED,
|
||||
metadata_sources=metadata_sources,
|
||||
)
|
||||
continue
|
||||
return
|
||||
|
||||
if len(event_src_parts) == structure_level + 1:
|
||||
changes_platform_directory = True
|
||||
# Otherwise, process each platform slug.
|
||||
for fs_slug in fs_slugs:
|
||||
# TODO: Query platforms from the database in bulk.
|
||||
db_platform = db_platform_handler.get_platform_by_fs_slug(fs_slug)
|
||||
if not db_platform:
|
||||
continue
|
||||
|
||||
log.info(f"Filesystem event: {event_type} {event_src}")
|
||||
fs_slugs.add(event_src_parts[structure_level])
|
||||
# Skip if a scan is already scheduled for this platform.
|
||||
if any(db_platform.id in job.args[0] for job in already_scheduled_jobs):
|
||||
log.info(f"Scan already scheduled for {hl(fs_slug)}")
|
||||
continue
|
||||
|
||||
if not fs_slugs:
|
||||
log.info("No valid filesystem slugs found in changes, exiting...")
|
||||
return
|
||||
|
||||
# Check whether any metadata source is enabled.
|
||||
source_mapping: dict[str, bool] = {
|
||||
MetadataSource.IGDB: IGDB_API_ENABLED,
|
||||
MetadataSource.SS: SS_API_ENABLED,
|
||||
MetadataSource.MOBY: MOBY_API_ENABLED,
|
||||
MetadataSource.RA: RA_API_ENABLED,
|
||||
MetadataSource.LB: LAUNCHBOX_API_ENABLED,
|
||||
MetadataSource.HASHEOUS: HASHEOUS_API_ENABLED,
|
||||
MetadataSource.SGDB: STEAMGRIDDB_API_ENABLED,
|
||||
}
|
||||
metadata_sources = [source for source, flag in source_mapping.items() if flag]
|
||||
if not metadata_sources:
|
||||
log.warning("No metadata sources enabled, skipping rescan")
|
||||
return
|
||||
|
||||
# Get currently scheduled jobs for the scan_platforms function.
|
||||
already_scheduled_jobs = [
|
||||
job
|
||||
for job in tasks_scheduler.get_jobs()
|
||||
if isinstance(job, Job)
|
||||
and job.func_name == "endpoints.sockets.scan.scan_platforms"
|
||||
]
|
||||
|
||||
# If a full rescan is already scheduled, skip further processing.
|
||||
if any(job.args[0] == [] for job in already_scheduled_jobs):
|
||||
log.info("Full rescan already scheduled")
|
||||
return
|
||||
|
||||
time_delta = timedelta(minutes=RESCAN_ON_FILESYSTEM_CHANGE_DELAY)
|
||||
rescan_in_msg = f"rescanning in {hl(str(RESCAN_ON_FILESYSTEM_CHANGE_DELAY), color=CYAN)} minutes."
|
||||
|
||||
# Any change to a platform directory should trigger a full rescan.
|
||||
if changes_platform_directory:
|
||||
log.info(f"Platform directory changed, {rescan_in_msg}")
|
||||
tasks_scheduler.enqueue_in(
|
||||
time_delta,
|
||||
scan_platforms,
|
||||
[],
|
||||
scan_type=ScanType.UNIDENTIFIED,
|
||||
metadata_sources=metadata_sources,
|
||||
)
|
||||
return
|
||||
|
||||
# Otherwise, process each platform slug.
|
||||
for fs_slug in fs_slugs:
|
||||
# TODO: Query platforms from the database in bulk.
|
||||
db_platform = db_platform_handler.get_platform_by_fs_slug(fs_slug)
|
||||
if not db_platform:
|
||||
continue
|
||||
|
||||
# Skip if a scan is already scheduled for this platform.
|
||||
if any(db_platform.id in job.args[0] for job in already_scheduled_jobs):
|
||||
log.info(f"Scan already scheduled for {hl(fs_slug)}")
|
||||
continue
|
||||
|
||||
log.info(f"Change detected in {hl(fs_slug)} folder, {rescan_in_msg}")
|
||||
tasks_scheduler.enqueue_in(
|
||||
time_delta,
|
||||
scan_platforms,
|
||||
[db_platform.id],
|
||||
scan_type=ScanType.QUICK,
|
||||
metadata_sources=metadata_sources,
|
||||
)
|
||||
log.info(f"Change detected in {hl(fs_slug)} folder, {rescan_in_msg}")
|
||||
tasks_scheduler.enqueue_in(
|
||||
time_delta,
|
||||
scan_platforms,
|
||||
[db_platform.id],
|
||||
scan_type=ScanType.QUICK,
|
||||
metadata_sources=metadata_sources,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -206,7 +206,7 @@ start_bin_watcher() {
|
||||
info_log "Starting watcher"
|
||||
watchfiles \
|
||||
--target-type command \
|
||||
'python3 watcher.py' \
|
||||
"opentelemetry-instrument --service_name '${OTEL_SERVICE_NAME_PREFIX-}watcher' python3 watcher.py" \
|
||||
/romm/library &
|
||||
WATCHER_PID=$!
|
||||
echo "${WATCHER_PID}" >/tmp/watcher.pid
|
||||
|
||||
Reference in New Issue
Block a user