mirror of
https://github.com/rommapp/romm.git
synced 2026-02-19 07:50:57 +01:00
Create separate `tests/` folder for all tests. This will also simplify not copying tests code into the Docker image.
560 lines
20 KiB
Python
560 lines
20 KiB
Python
from unittest.mock import Mock, patch
|
|
|
|
import pytest
|
|
from fastapi import status
|
|
from fastapi.testclient import TestClient
|
|
from main import app
|
|
from tasks.tasks import Task
|
|
|
|
|
|
@pytest.fixture
|
|
def client():
|
|
with TestClient(app) as client:
|
|
yield client
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_task():
|
|
"""Create a mock task for testing"""
|
|
task = Mock(spec=Task)
|
|
task.title = "Test Task"
|
|
task.description = "A test task for unit testing"
|
|
task.enabled = True
|
|
task.manual_run = True
|
|
task.cron_string = "0 0 * * *"
|
|
task.run = Mock()
|
|
return task
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_disabled_task():
|
|
"""Create a mock disabled task for testing"""
|
|
task = Mock(spec=Task)
|
|
task.title = "Disabled Task"
|
|
task.description = "A disabled task for testing"
|
|
task.enabled = False
|
|
task.manual_run = True
|
|
task.cron_string = None
|
|
task.run = Mock()
|
|
return task
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_non_manual_task():
|
|
"""Create a mock task that cannot be run manually"""
|
|
task = Mock(spec=Task)
|
|
task.title = "Non-Manual Task"
|
|
task.description = "A task that cannot be run manually"
|
|
task.enabled = True
|
|
task.manual_run = False
|
|
task.cron_string = "0 0 * * *"
|
|
task.run = Mock()
|
|
return task
|
|
|
|
|
|
class TestListTasks:
|
|
"""Test suite for the list_tasks endpoint"""
|
|
|
|
@patch("endpoints.tasks.ENABLE_RESCAN_ON_FILESYSTEM_CHANGE", True)
|
|
@patch("endpoints.tasks.RESCAN_ON_FILESYSTEM_CHANGE_DELAY", 5)
|
|
@patch(
|
|
"endpoints.tasks.manual_tasks",
|
|
{
|
|
"test_manual": Mock(
|
|
spec=Task,
|
|
title="Manual Task",
|
|
description="Manual task",
|
|
enabled=True,
|
|
manual_run=True,
|
|
cron_string=None,
|
|
)
|
|
},
|
|
)
|
|
@patch(
|
|
"endpoints.tasks.scheduled_tasks",
|
|
{
|
|
"test_scheduled": Mock(
|
|
spec=Task,
|
|
title="Scheduled Task",
|
|
description="Scheduled task",
|
|
enabled=True,
|
|
manual_run=False,
|
|
cron_string="0 0 * * *",
|
|
)
|
|
},
|
|
)
|
|
def test_list_tasks_success(self, client, access_token):
|
|
"""Test successful listing of all tasks"""
|
|
response = client.get(
|
|
"/api/tasks", headers={"Authorization": f"Bearer {access_token}"}
|
|
)
|
|
|
|
assert response.status_code == status.HTTP_200_OK
|
|
data = response.json()
|
|
|
|
# Check structure
|
|
assert "scheduled" in data
|
|
assert "manual" in data
|
|
assert "watcher" in data
|
|
|
|
# Check scheduled tasks
|
|
assert len(data["scheduled"]) == 1
|
|
scheduled_task = data["scheduled"][0]
|
|
assert scheduled_task["name"] == "test_scheduled"
|
|
assert scheduled_task["title"] == "Scheduled Task"
|
|
assert scheduled_task["description"] == "Scheduled task"
|
|
assert scheduled_task["enabled"] is True
|
|
assert scheduled_task["manual_run"] is False
|
|
assert scheduled_task["cron_string"] == "0 0 * * *"
|
|
|
|
# Check manual tasks
|
|
assert len(data["manual"]) == 1
|
|
manual_task = data["manual"][0]
|
|
assert manual_task["name"] == "test_manual"
|
|
assert manual_task["title"] == "Manual Task"
|
|
assert manual_task["description"] == "Manual task"
|
|
assert manual_task["enabled"] is True
|
|
assert manual_task["manual_run"] is True
|
|
assert manual_task["cron_string"] == ""
|
|
|
|
# Check watcher task
|
|
assert len(data["watcher"]) == 1
|
|
watcher_task = data["watcher"][0]
|
|
assert watcher_task["name"] == "filesystem_watcher"
|
|
assert watcher_task["title"] == "Rescan on filesystem change"
|
|
assert "5 minute delay" in watcher_task["description"]
|
|
assert watcher_task["enabled"] is True
|
|
assert watcher_task["manual_run"] is False
|
|
assert watcher_task["cron_string"] == ""
|
|
|
|
@patch("endpoints.tasks.ENABLE_RESCAN_ON_FILESYSTEM_CHANGE", False)
|
|
@patch("endpoints.tasks.RESCAN_ON_FILESYSTEM_CHANGE_DELAY", 10)
|
|
@patch("endpoints.tasks.manual_tasks", {})
|
|
@patch("endpoints.tasks.scheduled_tasks", {})
|
|
def test_list_tasks_empty(self, client, access_token):
|
|
"""Test listing tasks when no tasks are available"""
|
|
response = client.get(
|
|
"/api/tasks", headers={"Authorization": f"Bearer {access_token}"}
|
|
)
|
|
|
|
assert response.status_code == status.HTTP_200_OK
|
|
data = response.json()
|
|
|
|
assert data["scheduled"] == []
|
|
assert data["manual"] == []
|
|
assert len(data["watcher"]) == 1
|
|
assert data["watcher"][0]["enabled"] is False
|
|
assert "10 minute delay" in data["watcher"][0]["description"]
|
|
|
|
def test_list_tasks_unauthorized(self, client):
|
|
"""Test that unauthorized requests are rejected"""
|
|
response = client.get("/api/tasks")
|
|
assert response.status_code == status.HTTP_403_FORBIDDEN
|
|
|
|
def test_list_tasks_insufficient_scope(self, client, admin_user):
|
|
"""Test that requests without proper scope are rejected"""
|
|
# Create a token without TASKS_RUN scope
|
|
from datetime import timedelta
|
|
|
|
from handler.auth import oauth_handler
|
|
|
|
data = {
|
|
"sub": admin_user.username,
|
|
"iss": "romm:oauth",
|
|
"scopes": "roms:read", # Missing TASKS_RUN scope
|
|
"type": "access",
|
|
}
|
|
|
|
token = oauth_handler.create_oauth_token(
|
|
data=data, expires_delta=timedelta(minutes=30)
|
|
)
|
|
|
|
response = client.get(
|
|
"/api/tasks", headers={"Authorization": f"Bearer {token}"}
|
|
)
|
|
assert response.status_code == status.HTTP_403_FORBIDDEN
|
|
|
|
|
|
class TestRunAllTasks:
|
|
"""Test suite for the run_all_tasks endpoint"""
|
|
|
|
@patch(
|
|
"endpoints.tasks.low_prio_queue.enqueue",
|
|
return_value=Mock(
|
|
get_id=Mock(return_value="1"), get_status=Mock(return_value="queued")
|
|
),
|
|
)
|
|
@patch(
|
|
"endpoints.tasks.manual_tasks",
|
|
{
|
|
"task1": Mock(spec=Task, enabled=True, manual_run=True, run=Mock()),
|
|
"task2": Mock(spec=Task, enabled=True, manual_run=True, run=Mock()),
|
|
},
|
|
)
|
|
@patch(
|
|
"endpoints.tasks.scheduled_tasks",
|
|
{
|
|
"task3": Mock(spec=Task, enabled=True, manual_run=True, run=Mock()),
|
|
"task4": Mock(
|
|
spec=Task, enabled=False, manual_run=True, run=Mock()
|
|
), # Disabled
|
|
},
|
|
)
|
|
def test_run_all_tasks_success(self, mock_queue, client, access_token):
|
|
"""Test successful running of all runnable tasks"""
|
|
response = client.post(
|
|
"/api/tasks/run", headers={"Authorization": f"Bearer {access_token}"}
|
|
)
|
|
|
|
assert response.status_code == status.HTTP_200_OK
|
|
data = response.json()
|
|
assert len(data) == 3
|
|
assert data[0]["task_name"] == "task1"
|
|
assert data[1]["task_name"] == "task2"
|
|
assert data[2]["task_name"] == "task3"
|
|
|
|
@patch("endpoints.tasks.low_prio_queue")
|
|
@patch("endpoints.tasks.manual_tasks", {})
|
|
@patch("endpoints.tasks.scheduled_tasks", {})
|
|
def test_run_all_tasks_no_runnable_tasks(self, mock_queue, client, access_token):
|
|
"""Test running all tasks when no tasks are runnable"""
|
|
response = client.post(
|
|
"/api/tasks/run", headers={"Authorization": f"Bearer {access_token}"}
|
|
)
|
|
|
|
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
|
data = response.json()
|
|
assert data["detail"] == "No runnable tasks available to run"
|
|
|
|
# Verify that enqueue was not called
|
|
mock_queue.assert_not_called()
|
|
|
|
@patch("endpoints.tasks.low_prio_queue")
|
|
@patch(
|
|
"endpoints.tasks.manual_tasks",
|
|
{
|
|
"task1": Mock(
|
|
spec=Task, enabled=True, manual_run=False, run=Mock()
|
|
), # Not manual
|
|
"task2": Mock(
|
|
spec=Task, enabled=False, manual_run=True, run=Mock()
|
|
), # Disabled
|
|
},
|
|
)
|
|
@patch("endpoints.tasks.scheduled_tasks", {})
|
|
def test_run_all_tasks_mixed_conditions(self, mock_queue, client, access_token):
|
|
"""Test running all tasks with mixed enabled/disabled and manual/non-manual tasks"""
|
|
response = client.post(
|
|
"/api/tasks/run", headers={"Authorization": f"Bearer {access_token}"}
|
|
)
|
|
|
|
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
|
data = response.json()
|
|
assert data["detail"] == "No runnable tasks available to run"
|
|
|
|
# Verify that enqueue was not called since no tasks are both enabled and manual
|
|
mock_queue.enqueue.assert_not_called()
|
|
|
|
def test_run_all_tasks_unauthorized(self, client):
|
|
"""Test that unauthorized requests are rejected"""
|
|
response = client.post("/api/tasks/run")
|
|
assert response.status_code == status.HTTP_403_FORBIDDEN
|
|
|
|
|
|
class TestRunSingleTask:
|
|
"""Test suite for the run_single_task endpoint"""
|
|
|
|
@patch(
|
|
"endpoints.tasks.low_prio_queue.enqueue",
|
|
return_value=Mock(
|
|
get_id=Mock(return_value="1"), get_status=Mock(return_value="queued")
|
|
),
|
|
)
|
|
@patch(
|
|
"endpoints.tasks.manual_tasks",
|
|
{"test_task": Mock(spec=Task, enabled=True, manual_run=True, run=Mock())},
|
|
)
|
|
@patch("endpoints.tasks.scheduled_tasks", {})
|
|
def test_run_single_task_success(self, mock_queue, client, access_token):
|
|
"""Test successful running of a single task"""
|
|
response = client.post(
|
|
"/api/tasks/run/test_task",
|
|
headers={"Authorization": f"Bearer {access_token}"},
|
|
)
|
|
|
|
assert response.status_code == status.HTTP_200_OK
|
|
data = response.json()
|
|
|
|
assert data["task_name"] == "test_task"
|
|
assert data["task_id"] == "1"
|
|
assert data["status"] == "queued"
|
|
assert "queued_at" in data
|
|
|
|
mock_queue.assert_called_once()
|
|
|
|
@patch("endpoints.tasks.manual_tasks", {})
|
|
@patch("endpoints.tasks.scheduled_tasks", {})
|
|
def test_run_single_task_not_found(self, client, access_token):
|
|
"""Test running a non-existent task"""
|
|
response = client.post(
|
|
"/api/tasks/run/nonexistent_task",
|
|
headers={"Authorization": f"Bearer {access_token}"},
|
|
)
|
|
|
|
assert response.status_code == status.HTTP_404_NOT_FOUND
|
|
data = response.json()
|
|
assert "not found" in data["detail"].lower()
|
|
|
|
@patch("endpoints.tasks.low_prio_queue")
|
|
@patch(
|
|
"endpoints.tasks.manual_tasks",
|
|
{"disabled_task": Mock(spec=Task, enabled=False, manual_run=True, run=Mock())},
|
|
)
|
|
@patch("endpoints.tasks.scheduled_tasks", {})
|
|
def test_run_single_task_disabled(self, mock_queue, client, access_token):
|
|
"""Test running a disabled task"""
|
|
response = client.post(
|
|
"/api/tasks/run/disabled_task",
|
|
headers={"Authorization": f"Bearer {access_token}"},
|
|
)
|
|
|
|
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
|
data = response.json()
|
|
assert "cannot be run" in data["detail"].lower()
|
|
|
|
@patch("endpoints.tasks.low_prio_queue")
|
|
@patch(
|
|
"endpoints.tasks.manual_tasks",
|
|
{
|
|
"non_manual_task": Mock(
|
|
spec=Task, enabled=True, manual_run=False, run=Mock()
|
|
)
|
|
},
|
|
)
|
|
@patch("endpoints.tasks.scheduled_tasks", {})
|
|
def test_run_single_task_non_manual(self, mock_queue, client, access_token):
|
|
"""Test running a task that cannot be run manually"""
|
|
response = client.post(
|
|
"/api/tasks/run/non_manual_task",
|
|
headers={"Authorization": f"Bearer {access_token}"},
|
|
)
|
|
|
|
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
|
data = response.json()
|
|
assert "cannot be run" in data["detail"].lower()
|
|
|
|
def test_run_single_task_unauthorized(self, client):
|
|
"""Test running a task without authentication"""
|
|
response = client.post("/api/tasks/run/test_task")
|
|
assert response.status_code == status.HTTP_403_FORBIDDEN
|
|
|
|
|
|
class TestGetTaskById:
|
|
"""Test suite for the get_task_by_id endpoint"""
|
|
|
|
@patch("endpoints.tasks.low_prio_queue")
|
|
@patch("endpoints.tasks.Job.fetch")
|
|
def test_get_task_by_id_success(
|
|
self, mock_job_fetch, mock_queue, client, access_token
|
|
):
|
|
"""Test successful retrieval of a task by job ID"""
|
|
# Mock job object with all necessary attributes
|
|
mock_job = Mock()
|
|
mock_job.created_at = Mock()
|
|
mock_job.created_at.isoformat.return_value = "2023-01-01T00:00:00"
|
|
mock_job.started_at = Mock()
|
|
mock_job.started_at.isoformat.return_value = "2023-01-01T00:01:00"
|
|
mock_job.ended_at = Mock()
|
|
mock_job.ended_at.isoformat.return_value = "2023-01-01T00:02:00"
|
|
mock_job.meta = {"task_name": "test_task"}
|
|
mock_job.func_name = "test_task"
|
|
mock_job.get_status.return_value = "finished"
|
|
|
|
mock_job_fetch.return_value = mock_job
|
|
|
|
response = client.get(
|
|
"/api/tasks/test-job-id-123",
|
|
headers={"Authorization": f"Bearer {access_token}"},
|
|
)
|
|
|
|
assert response.status_code == status.HTTP_200_OK
|
|
data = response.json()
|
|
|
|
assert data["task_name"] == "test_task"
|
|
assert data["task_id"] == "test-job-id-123"
|
|
assert data["status"] == "finished"
|
|
assert data["queued_at"] == "2023-01-01T00:00:00"
|
|
assert data["started_at"] == "2023-01-01T00:01:00"
|
|
assert data["ended_at"] == "2023-01-01T00:02:00"
|
|
|
|
mock_job_fetch.assert_called_once_with(
|
|
"test-job-id-123", connection=mock_queue.connection
|
|
)
|
|
|
|
@patch("endpoints.tasks.low_prio_queue")
|
|
@patch("endpoints.tasks.Job.fetch")
|
|
def test_get_task_by_id_not_found(
|
|
self, mock_job_fetch, mock_queue, client, access_token
|
|
):
|
|
"""Test retrieval of a non-existent task by job ID"""
|
|
mock_job_fetch.side_effect = Exception("Job not found")
|
|
|
|
response = client.get(
|
|
"/api/tasks/nonexistent-job-id",
|
|
headers={"Authorization": f"Bearer {access_token}"},
|
|
)
|
|
|
|
assert response.status_code == status.HTTP_404_NOT_FOUND
|
|
data = response.json()
|
|
assert "not found" in data["detail"].lower()
|
|
|
|
@patch("endpoints.tasks.low_prio_queue")
|
|
@patch("endpoints.tasks.Job.fetch")
|
|
def test_get_task_by_id_with_exception_info(
|
|
self, mock_job_fetch, mock_queue, client, access_token
|
|
):
|
|
"""Test retrieval of a task that failed with exception"""
|
|
mock_job = Mock()
|
|
mock_job.created_at = Mock()
|
|
mock_job.created_at.isoformat.return_value = "2023-01-01T00:00:00"
|
|
mock_job.started_at = Mock()
|
|
mock_job.started_at.isoformat.return_value = "2023-01-01T00:01:00"
|
|
mock_job.ended_at = Mock()
|
|
mock_job.ended_at.isoformat.return_value = "2023-01-01T00:01:30"
|
|
mock_job.meta = {"task_name": "test_task"}
|
|
mock_job.func_name = "test_task"
|
|
mock_job.get_status.return_value = "failed"
|
|
|
|
mock_job_fetch.return_value = mock_job
|
|
|
|
response = client.get(
|
|
"/api/tasks/failed-job-id",
|
|
headers={"Authorization": f"Bearer {access_token}"},
|
|
)
|
|
|
|
assert response.status_code == status.HTTP_200_OK
|
|
data = response.json()
|
|
|
|
assert data["status"] == "failed"
|
|
|
|
@patch("endpoints.tasks.low_prio_queue")
|
|
@patch("endpoints.tasks.Job.fetch")
|
|
def test_get_task_by_id_no_metadata(
|
|
self, mock_job_fetch, mock_queue, client, access_token
|
|
):
|
|
"""Test retrieval of a task with no metadata"""
|
|
mock_job = Mock()
|
|
mock_job.created_at = Mock()
|
|
mock_job.created_at.isoformat.return_value = "2023-01-01T00:00:00"
|
|
mock_job.started_at = None
|
|
mock_job.ended_at = None
|
|
mock_job.meta = None
|
|
mock_job.func_name = "test_task"
|
|
mock_job.get_status.return_value = "queued"
|
|
|
|
mock_job_fetch.return_value = mock_job
|
|
|
|
response = client.get(
|
|
"/api/tasks/queued-job-id",
|
|
headers={"Authorization": f"Bearer {access_token}"},
|
|
)
|
|
|
|
assert response.status_code == status.HTTP_200_OK
|
|
data = response.json()
|
|
|
|
assert data["task_name"] == "test_task"
|
|
assert data["status"] == "queued"
|
|
assert data["started_at"] is None
|
|
assert data["ended_at"] is None
|
|
|
|
def test_get_task_by_id_unauthorized(self, client):
|
|
"""Test retrieval of a task without authentication"""
|
|
response = client.get("/api/tasks/test-job-id")
|
|
assert response.status_code == status.HTTP_403_FORBIDDEN
|
|
|
|
|
|
class TestTaskInfoBuilding:
|
|
"""Test suite for the _build_task_info helper function"""
|
|
|
|
@patch("endpoints.tasks._build_task_info")
|
|
def test_build_task_info_structure(
|
|
self, mock_build_task_info, client, access_token
|
|
):
|
|
"""Test that _build_task_info creates correct TaskInfo structure"""
|
|
# Mock the helper function to return a known structure
|
|
mock_build_task_info.return_value = {
|
|
"name": "test_task",
|
|
"title": "Test Task",
|
|
"description": "Test Description",
|
|
"enabled": True,
|
|
"manual_run": True,
|
|
"cron_string": "0 0 * * *",
|
|
}
|
|
|
|
with patch(
|
|
"endpoints.tasks.manual_tasks",
|
|
{
|
|
"test_task": Mock(
|
|
spec=Task,
|
|
title="Test Task",
|
|
description="Test Description",
|
|
enabled=True,
|
|
manual_run=True,
|
|
cron_string="0 0 * * *",
|
|
)
|
|
},
|
|
):
|
|
with patch("endpoints.tasks.scheduled_tasks", {}):
|
|
response = client.get(
|
|
"/api/tasks", headers={"Authorization": f"Bearer {access_token}"}
|
|
)
|
|
|
|
assert response.status_code == status.HTTP_200_OK
|
|
# The mock ensures the structure is correct
|
|
|
|
|
|
class TestIntegration:
|
|
"""Integration tests for the tasks endpoints"""
|
|
|
|
@patch("endpoints.tasks.ENABLE_RESCAN_ON_FILESYSTEM_CHANGE", True)
|
|
@patch("endpoints.tasks.RESCAN_ON_FILESYSTEM_CHANGE_DELAY", 5)
|
|
@patch(
|
|
"endpoints.tasks.low_prio_queue.enqueue",
|
|
return_value=Mock(
|
|
get_id=Mock(return_value="1"), get_status=Mock(return_value="queued")
|
|
),
|
|
)
|
|
def test_full_workflow(self, mock_queue, client, access_token):
|
|
"""Test a complete workflow: list tasks, then run a specific task"""
|
|
# First, list all tasks
|
|
list_response = client.get(
|
|
"/api/tasks", headers={"Authorization": f"Bearer {access_token}"}
|
|
)
|
|
assert list_response.status_code == status.HTTP_200_OK
|
|
|
|
# Then run a specific task (if any exist)
|
|
with patch(
|
|
"endpoints.tasks.manual_tasks",
|
|
{
|
|
"workflow_task": Mock(
|
|
spec=Task, enabled=True, manual_run=True, run=Mock()
|
|
)
|
|
},
|
|
):
|
|
with patch("endpoints.tasks.scheduled_tasks", {}):
|
|
run_response = client.post(
|
|
"/api/tasks/run/workflow_task",
|
|
headers={"Authorization": f"Bearer {access_token}"},
|
|
)
|
|
assert run_response.status_code == status.HTTP_200_OK
|
|
assert mock_queue.called
|
|
|
|
def test_error_handling(self, client, access_token):
|
|
"""Test error handling for various scenarios"""
|
|
# Test with invalid task name
|
|
response = client.post(
|
|
"/api/tasks/run/invalid_task_name_with_special_chars!@#",
|
|
headers={"Authorization": f"Bearer {access_token}"},
|
|
)
|
|
assert response.status_code == status.HTTP_404_NOT_FOUND
|