mirror of
https://github.com/rommapp/romm.git
synced 2026-02-18 00:27:41 +01:00
misc: Add type hints to task classes
Small change including type hints for the Task base classes, and related fixes to related tests.
This commit is contained in:
@@ -13,6 +13,8 @@ tasks_scheduler = Scheduler(queue=low_prio_queue, connection=low_prio_queue.conn
|
||||
|
||||
|
||||
class Task(ABC):
|
||||
"""Base class for all RQ tasks."""
|
||||
|
||||
title: str
|
||||
description: str
|
||||
enabled: bool
|
||||
@@ -37,12 +39,14 @@ class Task(ABC):
|
||||
async def run(self, *args: Any, **kwargs: Any) -> Any: ...
|
||||
|
||||
|
||||
class PeriodicTask(Task):
|
||||
def __init__(self, *args, func: str, **kwargs):
|
||||
class PeriodicTask(Task, ABC):
|
||||
"""Base class for periodic tasks that can be scheduled."""
|
||||
|
||||
def __init__(self, *args: Any, func: str, **kwargs: Any):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.func = func
|
||||
|
||||
def _get_existing_job(self):
|
||||
def _get_existing_job(self) -> Job | None:
|
||||
existing_jobs = tasks_scheduler.get_jobs()
|
||||
for job in existing_jobs:
|
||||
if isinstance(job, Job) and job.func_name == self.func:
|
||||
@@ -50,15 +54,26 @@ class PeriodicTask(Task):
|
||||
|
||||
return None
|
||||
|
||||
def init(self):
|
||||
def init(self) -> Job | None:
|
||||
"""Initialize the task by scheduling or unscheduling it based on its state.
|
||||
|
||||
Returns the scheduled job if it was successfully scheduled, or None if it was already
|
||||
scheduled or unscheduled.
|
||||
"""
|
||||
job = self._get_existing_job()
|
||||
|
||||
if self.enabled and not job:
|
||||
return self.schedule()
|
||||
elif job and not self.enabled:
|
||||
return self.unschedule()
|
||||
self.unschedule()
|
||||
return None
|
||||
return None
|
||||
|
||||
def schedule(self):
|
||||
def schedule(self) -> Job | None:
|
||||
"""Schedule the task if it is enabled and not already scheduled.
|
||||
|
||||
Returns the scheduled job if successful, or None otherwise.
|
||||
"""
|
||||
if not self.enabled:
|
||||
raise SchedulerException(f"Scheduled {self.description} is not enabled.")
|
||||
|
||||
@@ -75,19 +90,25 @@ class PeriodicTask(Task):
|
||||
|
||||
return None
|
||||
|
||||
def unschedule(self):
|
||||
job = self._get_existing_job()
|
||||
def unschedule(self) -> bool:
|
||||
"""Unschedule the task if it is currently scheduled.
|
||||
|
||||
Returns whether the unscheduling was successful.
|
||||
"""
|
||||
job = self._get_existing_job()
|
||||
if not job:
|
||||
log.info(f"{self.description.capitalize()} is not scheduled.")
|
||||
return None
|
||||
return False
|
||||
|
||||
tasks_scheduler.cancel(job)
|
||||
log.info(f"{self.description.capitalize()} unscheduled.")
|
||||
return True
|
||||
|
||||
|
||||
class RemoteFilePullTask(PeriodicTask):
|
||||
def __init__(self, *args, url: str, **kwargs):
|
||||
class RemoteFilePullTask(PeriodicTask, ABC):
|
||||
"""Base class for tasks that pull files from a remote URL."""
|
||||
|
||||
def __init__(self, *args: Any, url: str, **kwargs: Any):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.url = url
|
||||
|
||||
|
||||
@@ -73,56 +73,68 @@ class TestPeriodicTask:
|
||||
|
||||
@patch.object(ConcretePeriodicTask, "_get_existing_job")
|
||||
@patch.object(ConcretePeriodicTask, "schedule")
|
||||
@patch.object(ConcretePeriodicTask, "unschedule")
|
||||
def test_init_enabled_no_existing_job(
|
||||
self, mock_schedule, mock_get_existing_job, task
|
||||
self, mock_unschedule, mock_schedule, mock_get_existing_job, task
|
||||
):
|
||||
"""Test init when task is enabled and no existing job"""
|
||||
mock_job = MagicMock(spec=Job)
|
||||
mock_get_existing_job.return_value = None
|
||||
mock_schedule.return_value = "scheduled"
|
||||
mock_schedule.return_value = mock_job
|
||||
|
||||
result = task.init()
|
||||
|
||||
mock_schedule.assert_called_once()
|
||||
assert result == "scheduled"
|
||||
mock_unschedule.assert_not_called()
|
||||
assert result == mock_job
|
||||
|
||||
@patch.object(ConcretePeriodicTask, "_get_existing_job")
|
||||
@patch.object(ConcretePeriodicTask, "schedule")
|
||||
@patch.object(ConcretePeriodicTask, "unschedule")
|
||||
def test_init_disabled_with_existing_job(
|
||||
self, mock_unschedule, mock_get_existing_job, disabled_task
|
||||
self, mock_unschedule, mock_schedule, mock_get_existing_job, disabled_task
|
||||
):
|
||||
"""Test init when task is disabled but has existing job"""
|
||||
mock_job = MagicMock()
|
||||
mock_job = MagicMock(spec=Job)
|
||||
mock_get_existing_job.return_value = mock_job
|
||||
mock_unschedule.return_value = "unscheduled"
|
||||
mock_unschedule.return_value = None
|
||||
|
||||
result = disabled_task.init()
|
||||
|
||||
mock_unschedule.assert_called_once()
|
||||
assert result == "unscheduled"
|
||||
mock_schedule.assert_not_called()
|
||||
assert result is None
|
||||
|
||||
@patch.object(ConcretePeriodicTask, "_get_existing_job")
|
||||
def test_init_enabled_with_existing_job(self, mock_get_existing_job, task):
|
||||
@patch.object(ConcretePeriodicTask, "schedule")
|
||||
@patch.object(ConcretePeriodicTask, "unschedule")
|
||||
def test_init_enabled_with_existing_job(
|
||||
self, mock_unschedule, mock_schedule, mock_get_existing_job, task
|
||||
):
|
||||
"""Test init when task is enabled and job already exists"""
|
||||
mock_job = MagicMock()
|
||||
mock_job = MagicMock(spec=Job)
|
||||
mock_get_existing_job.return_value = mock_job
|
||||
|
||||
result = task.init()
|
||||
|
||||
mock_schedule.assert_not_called()
|
||||
mock_unschedule.assert_not_called()
|
||||
assert result is None # Should do nothing
|
||||
|
||||
@patch.object(ConcretePeriodicTask, "_get_existing_job")
|
||||
@patch.object(tasks_scheduler, "cron")
|
||||
def test_schedule_success(self, mock_cron, mock_get_existing_job, task):
|
||||
"""Test successful scheduling"""
|
||||
mock_job = MagicMock(spec=Job)
|
||||
mock_get_existing_job.return_value = None
|
||||
mock_cron.return_value = "scheduled_job"
|
||||
mock_cron.return_value = mock_job
|
||||
|
||||
result = task.schedule()
|
||||
|
||||
mock_cron.assert_called_once_with(
|
||||
"0 0 * * *", func="test.function", repeat=None
|
||||
)
|
||||
assert result == "scheduled_job"
|
||||
assert result == mock_job
|
||||
|
||||
def test_schedule_not_enabled(self, disabled_task):
|
||||
"""Test scheduling when task is not enabled"""
|
||||
@@ -164,7 +176,7 @@ class TestPeriodicTask:
|
||||
self, mock_log, mock_cancel, mock_get_existing_job, task
|
||||
):
|
||||
"""Test successful unscheduling"""
|
||||
mock_job = MagicMock()
|
||||
mock_job = MagicMock(spec=Job)
|
||||
mock_get_existing_job.return_value = mock_job
|
||||
|
||||
task.unschedule()
|
||||
@@ -222,7 +234,7 @@ class TestRemoteFilePullTask:
|
||||
async def test_run_success(self, mock_log, mock_ctx_httpx_client, task):
|
||||
"""Test successful remote file pull"""
|
||||
mock_client = AsyncMock()
|
||||
mock_response = AsyncMock()
|
||||
mock_response = MagicMock()
|
||||
mock_response.content = b"test content"
|
||||
mock_client.get.return_value = mock_response
|
||||
mock_ctx_httpx_client.get.return_value = mock_client
|
||||
@@ -291,7 +303,7 @@ class TestRemoteFilePullTask:
|
||||
async def test_run_disabled_but_forced(self, mock_ctx_httpx_client, disabled_task):
|
||||
"""Test run when task is disabled but forced"""
|
||||
mock_client = AsyncMock()
|
||||
mock_response = AsyncMock()
|
||||
mock_response = MagicMock()
|
||||
mock_response.content = b"forced content"
|
||||
mock_client.get.return_value = mock_response
|
||||
mock_ctx_httpx_client.get.return_value = mock_client
|
||||
|
||||
Reference in New Issue
Block a user