Files
romm/backend/tests/handler/filesystem/test_base_handler.py
Georges-Antoine Assi 1602688d08 fix one more test
2026-01-19 12:48:54 -05:00

481 lines
19 KiB
Python

import asyncio
import shutil
import tempfile
from io import BytesIO
from pathlib import Path
from unittest.mock import Mock, patch
import pytest
from fastapi import UploadFile
from handler.filesystem.base_handler import FSHandler
from models.base import FILE_NAME_MAX_LENGTH
class TestFSHandler:
"""Test suite for FSHandler class"""
@pytest.fixture
def temp_dir(self):
"""Create a temporary directory for testing"""
temp_dir = tempfile.mkdtemp()
yield temp_dir
shutil.rmtree(temp_dir, ignore_errors=True)
@pytest.fixture
def handler(self, temp_dir):
"""Create FSHandler instance for testing"""
return FSHandler(temp_dir)
@pytest.fixture
def sample_file_content(self):
"""Sample file content for testing"""
return b"This is test content for file operations"
@pytest.fixture
def mock_upload_file(self, sample_file_content):
"""Mock UploadFile for testing"""
mock_file = Mock(spec=UploadFile)
mock_file.filename = "test_file.txt"
mock_file.file = BytesIO(sample_file_content)
return mock_file
def test_init_creates_base_directory(self, temp_dir):
"""Test that FSHandler creates base directory on initialization"""
# Remove the directory to test creation
shutil.rmtree(temp_dir)
handler = FSHandler(temp_dir)
assert handler.base_path.exists()
assert handler.base_path.is_dir()
def test_init_resolves_path(self, temp_dir):
"""Test that FSHandler resolves the base path"""
handler = FSHandler(temp_dir)
assert handler.base_path == Path(temp_dir).resolve()
def test_sanitize_filename_valid(self, handler: FSHandler):
"""Test filename sanitization with valid filenames"""
assert handler._sanitize_filename("test.txt") == "test.txt"
assert handler._sanitize_filename("file-name_123.zip") == "file-name_123.zip"
assert handler._sanitize_filename("file.name.ext") == "file.name.ext"
def test_sanitize_filename_path_traversal(self, handler: FSHandler):
"""Test filename sanitization prevents path traversal"""
assert handler._sanitize_filename("../test.txt") == "test.txt"
assert handler._sanitize_filename("../../test.txt") == "test.txt"
assert handler._sanitize_filename("/etc/passwd") == "passwd"
assert handler._sanitize_filename("dir/../test.txt") == "test.txt"
def test_sanitize_filename_invalid(self, handler: FSHandler):
"""Test filename sanitization with invalid filenames"""
with pytest.raises(ValueError, match="Empty filename"):
handler._sanitize_filename("")
with pytest.raises(ValueError, match="Invalid filename"):
handler._sanitize_filename(".")
with pytest.raises(ValueError, match="Invalid filename"):
handler._sanitize_filename("..")
def test_sanitize_filename_too_long(self, handler: FSHandler):
"""Test filename sanitization with too long filenames"""
long_name = "a" * (FILE_NAME_MAX_LENGTH + 1)
with pytest.raises(ValueError, match="Filename .* exceeds maximum length"):
handler._sanitize_filename(long_name)
def test_validate_path_valid(self, handler: FSHandler):
"""Test path validation with valid paths"""
valid_paths = [
"test.txt",
"dir/test.txt",
"dir/subdir/test.txt",
"test-file_123.txt",
]
for path in valid_paths:
result = handler.validate_path(path)
assert result.is_relative_to(handler.base_path)
def test_validate_path_traversal_attack(self, handler: FSHandler):
"""Test path validation prevents directory traversal attacks"""
malicious_paths = [
"../test.txt",
"../../etc/passwd",
"dir/../../../etc/passwd",
"dir/../../test.txt",
]
for path in malicious_paths:
with pytest.raises(
ValueError, match="Path .* contains invalid parent directory references"
):
handler.validate_path(path)
def test_validate_path_absolute(self, handler: FSHandler):
"""Test path validation rejects absolute paths"""
absolute_paths = ["/etc/passwd", "/tmp/test.txt", "/home/user/file.txt"]
for path in absolute_paths:
with pytest.raises(
ValueError, match="Path .* must be relative, not absolute"
):
handler.validate_path(path)
def test_get_file_name_with_no_extension(self, handler: FSHandler):
"""Test file name extraction without extension"""
assert handler.get_file_name_with_no_extension("test.txt") == "test"
assert handler.get_file_name_with_no_extension("file.tar.gz") == "file"
assert handler.get_file_name_with_no_extension("file.with.dots.txt") == "file"
assert handler.get_file_name_with_no_extension("no_extension") == "no_extension"
def test_get_file_name_with_no_tags(self, handler: FSHandler):
"""Test file name extraction without tags"""
assert handler.get_file_name_with_no_tags("game (USA).rom") == "game"
assert handler.get_file_name_with_no_tags("game [Beta].rom") == "game"
assert handler.get_file_name_with_no_tags("game (USA) [Beta].rom") == "game"
assert handler.get_file_name_with_no_tags("plain_name.rom") == "plain_name"
def test_parse_file_extension(self, handler: FSHandler):
"""Test file extension parsing"""
assert handler.parse_file_extension("test.txt") == "txt"
assert handler.parse_file_extension("file.tar.gz") == "tar.gz"
assert handler.parse_file_extension("no_extension") == ""
assert handler.parse_file_extension("file.with.dots.txt") == "with.dots.txt"
def test_exclude_single_files(self, handler: FSHandler):
"""Test file exclusion functionality"""
files = ["test.txt", "game.rom", "excluded.tmp", "data.json"]
# Mock configuration
with patch("handler.filesystem.base_handler.cm.get_config") as mock_config:
mock_config.return_value.EXCLUDED_SINGLE_EXT = ["tmp"]
mock_config.return_value.EXCLUDED_SINGLE_FILES = ["test.txt"]
result = handler.exclude_single_files(files)
assert "excluded.tmp" not in result
assert "test.txt" not in result
assert "game.rom" in result
assert "data.json" in result
async def test_make_directory(self, handler: FSHandler):
"""Test directory creation"""
await handler.make_directory("test_dir")
full_path = handler.base_path / "test_dir"
assert full_path.exists()
assert full_path.is_dir()
async def test_make_directory_nested(self, handler: FSHandler):
"""Test nested directory creation"""
await handler.make_directory("parent/child/grandchild")
full_path = handler.base_path / "parent" / "child" / "grandchild"
assert full_path.exists()
assert full_path.is_dir()
async def test_make_directory_exists(self, handler: FSHandler):
"""Test directory creation when directory already exists"""
await handler.make_directory("test_dir")
await handler.make_directory("test_dir") # Should not raise error
full_path = handler.base_path / "test_dir"
assert full_path.exists()
assert full_path.is_dir()
async def test_make_directory_file_exists(self, handler: FSHandler):
"""Test directory creation when file with same name exists"""
# Create a file first
(handler.base_path / "test_file").touch()
with pytest.raises(
FileNotFoundError, match="Path already exists and is not a directory"
):
await handler.make_directory("test_file")
async def test_list_directories(self, handler: FSHandler):
"""Test directory listing"""
# Create test directories
await handler.make_directory("dir1")
await handler.make_directory("dir2")
await handler.make_directory("parent/child")
# Create a file (should not be listed)
(handler.base_path / "file.txt").touch()
dirs = await handler.list_directories(".")
assert "dir1" in dirs
assert "dir2" in dirs
assert "parent" in dirs
assert "file.txt" not in dirs
async def test_list_directories_nonexistent(self, handler: FSHandler):
"""Test directory listing with nonexistent directory"""
with pytest.raises(
FileNotFoundError, match="Path does not exist or is not a directory"
):
await handler.list_directories("nonexistent")
async def test_remove_directory(self, handler: FSHandler):
"""Test directory removal"""
# Create directory with content
await handler.make_directory("test_dir/subdir")
(handler.base_path / "test_dir" / "file.txt").touch()
await handler.remove_directory("test_dir")
assert not (handler.base_path / "test_dir").exists()
async def test_remove_directory_nonexistent(self, handler: FSHandler):
"""Test directory removal with nonexistent directory"""
with pytest.raises(
FileNotFoundError, match="Path does not exist or is not a directory"
):
await handler.remove_directory("nonexistent")
async def test_write_file_upload_file(self, handler: FSHandler, mock_upload_file):
"""Test file writing with UploadFile"""
await handler.write_file(mock_upload_file, ".", "test_file.txt")
file_path = handler.base_path / "test_file.txt"
assert file_path.exists()
assert file_path.read_bytes() == b"This is test content for file operations"
async def test_write_file_bytes(self, handler: FSHandler, sample_file_content):
"""Test file writing with bytes"""
await handler.write_file(sample_file_content, ".", "test_file.txt")
file_path = handler.base_path / "test_file.txt"
assert file_path.exists()
assert file_path.read_bytes() == sample_file_content
async def test_write_file_binary_io(self, handler: FSHandler, sample_file_content):
"""Test file writing with BinaryIO"""
bio = BytesIO(sample_file_content)
await handler.write_file(bio, ".", "test_file.txt")
file_path = handler.base_path / "test_file.txt"
assert file_path.exists()
assert file_path.read_bytes() == sample_file_content
async def test_write_file_nested_path(
self, handler: FSHandler, sample_file_content
):
"""Test file writing in nested path"""
await handler.write_file(sample_file_content, "parent/child", "test_file.txt")
file_path = handler.base_path / "parent" / "child" / "test_file.txt"
assert file_path.exists()
assert file_path.read_bytes() == sample_file_content
async def test_write_file_no_filename(
self, handler: FSHandler, sample_file_content
):
"""Test file writing without filename"""
with pytest.raises(ValueError, match="Filename cannot be empty"):
await handler.write_file(sample_file_content, ".", None)
async def test_write_file_streamed(self, handler: FSHandler, sample_file_content):
"""Test streamed file writing"""
async with await handler.write_file_streamed(".", "test_file.txt") as f:
await f.write(sample_file_content)
file_path = handler.base_path / "test_file.txt"
assert file_path.exists()
assert file_path.read_bytes() == sample_file_content
async def test_read_file(self, handler: FSHandler, sample_file_content):
"""Test file reading"""
# Write file first
await handler.write_file(sample_file_content, ".", "test_file.txt")
# Read file
content = await handler.read_file("test_file.txt")
assert content == sample_file_content
async def test_read_file_nonexistent(self, handler: FSHandler):
"""Test reading nonexistent file"""
with pytest.raises(FileNotFoundError, match="File not found"):
await handler.read_file("nonexistent.txt")
async def test_stream_file(self, handler: FSHandler, sample_file_content):
"""Test file streaming"""
# Write file first
await handler.write_file(sample_file_content, ".", "test_file.txt")
# Stream file
async with await handler.stream_file("test_file.txt") as f:
content = await f.read()
assert content == sample_file_content
async def test_stream_file_nonexistent(self, handler: FSHandler):
"""Test streaming nonexistent file"""
with pytest.raises(FileNotFoundError, match="File not found"):
await handler.stream_file("nonexistent.txt")
async def test_move_file(self, handler: FSHandler, sample_file_content):
"""Test file moving"""
# Write source file
await handler.write_file(sample_file_content, ".", "source.txt")
# Move file
await handler.move_file_or_folder("source.txt", "destination.txt")
assert not (handler.base_path / "source.txt").exists()
assert (handler.base_path / "destination.txt").exists()
assert (
handler.base_path / "destination.txt"
).read_bytes() == sample_file_content
async def test_move_file_to_nested_path(
self, handler: FSHandler, sample_file_content
):
"""Test moving file to nested path"""
# Write source file
await handler.write_file(sample_file_content, ".", "source.txt")
# Move file to nested path
await handler.move_file_or_folder("source.txt", "parent/child/destination.txt")
assert not (handler.base_path / "source.txt").exists()
assert (handler.base_path / "parent" / "child" / "destination.txt").exists()
async def test_move_file_nonexistent(self, handler: FSHandler):
"""Test moving nonexistent file"""
with pytest.raises(FileNotFoundError, match="Source file or folder not found"):
await handler.move_file_or_folder("nonexistent.txt", "destination.txt")
async def test_remove_file(self, handler: FSHandler, sample_file_content):
"""Test file removal"""
# Write file first
await handler.write_file(sample_file_content, ".", "test_file.txt")
# Remove file
await handler.remove_file("test_file.txt")
assert not (handler.base_path / "test_file.txt").exists()
async def test_remove_file_nonexistent(self, handler: FSHandler):
"""Test removing nonexistent file"""
with pytest.raises(FileNotFoundError, match="File not found"):
await handler.remove_file("nonexistent.txt")
async def test_list_files(self, handler: FSHandler, sample_file_content):
"""Test file listing"""
# Create test files
await handler.write_file(sample_file_content, ".", "file1.txt")
await handler.write_file(sample_file_content, ".", "file2.txt")
await handler.make_directory("subdir")
files = await handler.list_files(".")
assert "file1.txt" in files
assert "file2.txt" in files
assert "subdir" not in files # Directories should not be listed
async def test_list_files_nonexistent(self, handler: FSHandler):
"""Test listing files in nonexistent directory"""
with pytest.raises(FileNotFoundError, match="Directory not found"):
await handler.list_files("nonexistent")
async def test_file_exists(self, handler: FSHandler, sample_file_content):
"""Test file existence check"""
assert not await handler.file_exists("test_file.txt")
await handler.write_file(sample_file_content, ".", "test_file.txt")
assert await handler.file_exists("test_file.txt")
async def test_file_exists_directory(self, handler: FSHandler):
"""Test file existence check on directory"""
await handler.make_directory("test_dir")
assert not await handler.file_exists(
"test_dir"
) # Should return False for directories
async def test_get_file_size(self, handler: FSHandler, sample_file_content):
"""Test file size retrieval"""
await handler.write_file(sample_file_content, ".", "test_file.txt")
size = await handler.get_file_size("test_file.txt")
assert size == len(sample_file_content)
async def test_get_file_size_nonexistent(self, handler: FSHandler):
"""Test file size retrieval for nonexistent file"""
with pytest.raises(FileNotFoundError, match="File not found"):
await handler.get_file_size("nonexistent.txt")
async def test_async_concurrency(self, handler: FSHandler):
"""Test async concurrency of file operations"""
async def write_file(filename, content):
await handler.write_file(content, ".", filename)
# Test concurrent file writes using asyncio
tasks = []
for i in range(10):
content = f"Content {i}".encode()
task = write_file(f"file_{i}.txt", content)
tasks.append(task)
# Wait for all to complete
await asyncio.gather(*tasks)
# Verify all files were written correctly
for i in range(10):
assert await handler.file_exists(f"file_{i}.txt")
content = await handler.read_file(f"file_{i}.txt")
assert content == f"Content {i}".encode()
async def test_empty_path_validation(self, handler: FSHandler):
"""Test validation of empty paths"""
with pytest.raises(ValueError, match="cannot be empty"):
await handler.read_file("")
with pytest.raises(ValueError, match="cannot be empty"):
await handler.file_exists("")
with pytest.raises(ValueError, match="cannot be empty"):
await handler.get_file_size("")
async def test_atomic_write_rollback(self, handler: FSHandler):
"""Test atomic write rollback on failure"""
# Mock a failure during file write
def failing_move(*args, **kwargs):
raise OSError("Simulated failure")
with patch("shutil.move", side_effect=failing_move):
with pytest.raises(OSError, match="Simulated failure"):
await handler.write_file(b"test content", ".", "test_file.txt")
# Verify no temporary files are left behind
temp_files = [f for f in await handler.list_files(".") if f.startswith(".tmp_")]
assert len(temp_files) == 0
# Verify the target file was not created
assert not await handler.file_exists("test_file.txt")
async def test_concurrent_directory_operations(self, handler: FSHandler):
"""Test concurrent directory operations"""
async def create_and_remove_dir(dir_name):
await handler.make_directory(dir_name)
await handler.remove_directory(dir_name)
# Test concurrent directory operations using asyncio
tasks = []
for i in range(5):
task = create_and_remove_dir(f"test_dir_{i}")
tasks.append(task)
# Wait for all to complete
await asyncio.gather(*tasks)
# Verify all directories were cleaned up
dirs = await handler.list_directories(".")
for i in range(5):
assert f"test_dir_{i}" not in dirs