Use best-match algorithm for more providers

This commit is contained in:
Georges-Antoine Assi
2025-08-07 16:57:29 -04:00
parent 3f98ef6b00
commit 1a2944806b
5 changed files with 232 additions and 197 deletions

View File

@@ -3,6 +3,7 @@ import json
import os
import re
import unicodedata
from difflib import SequenceMatcher
from functools import lru_cache
from itertools import batched
from typing import Final, NotRequired, TypedDict
@@ -92,6 +93,29 @@ def _normalize_search_term(
return name.strip()
WORD_TOKEN_PATTERN = re.compile(r"\b\w+\b")
def wagner_fischer_distance(s1: str, s2: str) -> int:
    """Return the Levenshtein edit distance between *s1* and *s2*.

    Implements the Wagner-Fischer dynamic-programming scheme while keeping
    only one row of the DP table in memory at a time.
    """
    # Keep the shorter string on the row dimension to minimise memory.
    if len(s1) < len(s2):
        s1, s2 = s2, s1
    if not s2:
        return len(s1)

    row = list(range(len(s2) + 1))
    for i, ch1 in enumerate(s1, start=1):
        next_row = [i]
        for j, ch2 in enumerate(s2, start=1):
            cost_insert = row[j] + 1
            cost_delete = next_row[j - 1] + 1
            cost_subst = row[j - 1] + (ch1 != ch2)
            next_row.append(min(cost_insert, cost_delete, cost_subst))
        row = next_row
    return row[-1]
class MetadataHandler:
def __init__(self):
# Initialize cache data lazily when the handler is first instantiated
@@ -109,6 +133,115 @@ class MetadataHandler:
) -> str:
return _normalize_search_term(name, remove_articles, remove_punctuation)
def calculate_text_similarity(
    self,
    normalized_search_term: str,
    game_name: str,
    remove_articles: bool = True,
    remove_punctuation: bool = True,
) -> float:
    """Score how closely *game_name* matches an already-normalized search term.

    Blends three weighted metrics: Jaccard word-token overlap (0.5),
    difflib sequence similarity (0.3) and a length-normalized
    Wagner-Fischer edit distance (0.2).

    Args:
        normalized_search_term: Search term, already normalized.
        game_name: Candidate game name; normalized here before comparing.
        remove_articles: Forwarded to ``normalize_search_term``.
        remove_punctuation: Forwarded to ``normalize_search_term``.

    Returns:
        A similarity score in [0, 1], where 1.0 is a perfect match.
    """
    candidate = self.normalize_search_term(
        game_name,
        remove_articles=remove_articles,
        remove_punctuation=remove_punctuation,
    )

    # Exact normalized match: short-circuit before computing any metric.
    if normalized_search_term == candidate:
        return 1.0

    # Jaccard ratio over word tokens.
    term_tokens = set(WORD_TOKEN_PATTERN.findall(normalized_search_term.lower()))
    cand_tokens = set(WORD_TOKEN_PATTERN.findall(candidate.lower()))
    if term_tokens and cand_tokens:
        jaccard = len(term_tokens & cand_tokens) / len(term_tokens | cand_tokens)
    else:
        jaccard = 0.0

    # Character-sequence similarity (more informative on longer strings).
    seq_ratio = SequenceMatcher(None, normalized_search_term, candidate).ratio()

    # Edit distance scaled into [0, 1] by the longer string's length.
    longest = max(len(normalized_search_term), len(candidate))
    edit_ratio = (
        1 - wagner_fischer_distance(normalized_search_term, candidate) / longest
        if longest > 0
        else 1.0
    )

    # Token overlap dominates, since game titles differ mostly in word choice.
    return jaccard * 0.5 + seq_ratio * 0.3 + edit_ratio * 0.2
def find_best_match(
    self,
    normalized_search_term: str,
    game_names: list[str],
    min_similarity_score: float = 0.75,
    remove_articles: bool = True,
    remove_punctuation: bool = True,
) -> tuple[str | None, float]:
    """Pick the candidate from *game_names* most similar to the search term.

    Args:
        normalized_search_term: Search term, already normalized.
        game_names: Candidate game names to score.
        min_similarity_score: Best score must reach this threshold to count.
        remove_articles: Forwarded to the similarity calculation.
        remove_punctuation: Forwarded to the similarity calculation.

    Returns:
        ``(best_name, score)`` when the top score clears the threshold,
        otherwise ``(None, 0.0)``.
    """
    if not game_names:
        return None, 0.0

    top_name: str | None = None
    top_score = 0.0
    for candidate in game_names:
        candidate_score = self.calculate_text_similarity(
            normalized_search_term,
            candidate,
            remove_articles=remove_articles,
            remove_punctuation=remove_punctuation,
        )
        if candidate_score > top_score:
            top_name, top_score = candidate, candidate_score
            # A perfect match cannot be beaten; stop scanning.
            if top_score == 1.0:
                break

    if top_score >= min_similarity_score:
        return top_name, top_score
    return None, 0.0
async def _ps2_opl_format(self, match: re.Match[str], search_term: str) -> str:
serial_code = match.group(1)
index_entry = await async_cache.hget(PS2_OPL_KEY, serial_code)

View File

@@ -223,6 +223,7 @@ class IGDBHandler(MetadataHandler):
"Client-ID": IGDB_CLIENT_ID,
"Accept": "application/json",
}
self.min_similarity_score: Final = 0.75
@staticmethod
def check_twitch_token(func):
@@ -332,40 +333,31 @@ class IGDBHandler(MetadataHandler):
else:
game_type_filter = ""
def is_exact_match(rom: dict, search_term: str) -> bool:
search_term_lower = search_term.lower()
if rom["slug"].lower() == search_term_lower:
return True
# Check both the ROM name and alternative names for an exact match.
rom_names = [rom["name"]] + [
alternative_name["name"]
for alternative_name in rom.get("alternative_names", [])
]
return any(
(
rom_name.lower() == search_term_lower
or self.normalize_search_term(rom_name) == search_term
)
for rom_name in rom_names
)
log.debug("Searching in games endpoint with game_type %s", game_type_filter)
roms = await self._request(
self.games_endpoint,
data=f'search "{uc(search_term)}"; fields {",".join(self.games_fields)}; where platforms=[{platform_igdb_id}] {game_type_filter};',
)
for rom in roms:
# Return early if an exact match is found.
if is_exact_match(rom, search_term):
return rom
games_by_name = {game["name"]: game for game in roms}
best_match, best_score = self.find_best_match(
search_term,
list(games_by_name.keys()),
min_similarity_score=self.min_similarity_score,
remove_punctuation=False,
)
if best_match:
log.debug(
f"Found match for '{search_term}' -> '{best_match}' (score: {best_score:.3f})"
)
return games_by_name[best_match]
log.debug("Searching expanded in search endpoint")
roms_expanded = await self._request(
self.search_endpoint,
data=f'fields {",".join(self.search_fields)}; where game.platforms=[{platform_igdb_id}] & (name ~ *"{search_term}"* | alternative_name ~ *"{search_term}"*);',
)
if roms_expanded:
log.debug(
"Searching expanded in games endpoint for expanded game %s",
@@ -375,14 +367,23 @@ class IGDBHandler(MetadataHandler):
self.games_endpoint,
f'fields {",".join(self.games_fields)}; where id={roms_expanded[0]["game"]["id"]};',
)
for rom in extra_roms:
# Return early if an exact match is found.
if is_exact_match(rom, search_term):
return rom
games_by_name = {game["name"]: game for game in extra_roms}
best_match, best_score = self.find_best_match(
search_term,
list(games_by_name.keys()),
min_similarity_score=self.min_similarity_score,
remove_punctuation=False,
)
if best_match:
log.debug(
f"Found match for '{search_term}' -> '{best_match}' (score: {best_score:.3f})"
)
return games_by_name[best_match]
roms.extend(extra_roms)
return roms[0] if roms else None
return None
# @check_twitch_token
# async def get_platforms(self) -> None:

View File

@@ -6,6 +6,7 @@ import pydash
from adapters.services.mobygames import MobyGamesService
from adapters.services.mobygames_types import MobyGame
from config import MOBYGAMES_API_KEY
from logger.logger import log
from unidecode import unidecode as uc
from .base_hander import (
@@ -21,6 +22,8 @@ from .base_hander import UniversalPlatformSlug as UPS
# Used to display the Mobygames API status in the frontend
MOBY_API_ENABLED: Final = bool(MOBYGAMES_API_KEY)
SEARCH_TERM_SPLIT_PATTERN = re.compile(r"[\:\-\/]")
PS1_MOBY_ID: Final = 6
PS2_MOBY_ID: Final = 7
PSP_MOBY_ID: Final = 46
@@ -74,6 +77,7 @@ def extract_metadata_from_moby_rom(rom: MobyGame) -> MobyMetadata:
class MobyGamesHandler(MetadataHandler):
def __init__(self) -> None:
# Client wrapping the MobyGames HTTP API.
self.moby_service = MobyGamesService()
# Threshold passed to find_best_match; candidates scoring below it are rejected.
self.min_similarity_score: Final = 0.75
async def _search_rom(
self, search_term: str, platform_moby_id: int
@@ -88,16 +92,20 @@ class MobyGamesHandler(MetadataHandler):
if not roms:
return None
# Find an exact match.
search_term_casefold = search_term.casefold()
for rom in roms:
if (
rom["title"].casefold() == search_term_casefold
or self.normalize_search_term(rom["title"]) == search_term
):
return rom
games_by_name = {game["title"]: game for game in roms}
best_match, best_score = self.find_best_match(
search_term,
list(games_by_name.keys()),
min_similarity_score=self.min_similarity_score,
remove_punctuation=False,
)
if best_match:
log.debug(
f"Found match for '{search_term}' -> '{best_match}' (score: {best_score:.3f})"
)
return games_by_name[best_match]
return roms[0]
return None
def get_platform(self, slug: str) -> MobyGamesPlatform:
if slug not in MOBYGAMES_PLATFORM_LIST:
@@ -179,30 +187,17 @@ class MobyGamesHandler(MetadataHandler):
search_term = await self._mame_format(search_term)
fallback_rom = MobyGamesRom(moby_id=None, name=search_term)
normalized_search_term = self.normalize_search_term(search_term)
normalized_search_term = self.normalize_search_term(
search_term, remove_punctuation=False
)
res = await self._search_rom(normalized_search_term, platform_moby_id)
# Moby API doesn't handle some special characters well
if not res and (
": " in search_term or " - " in search_term or "/" in search_term
):
if ":" in search_term:
terms = [
s.strip() for s in search_term.split(":") if len(s.strip()) > 2
]
elif " - " in search_term:
terms = [
s.strip() for s in search_term.split(" - ") if len(s.strip()) > 2
]
else:
terms = [
s.strip() for s in search_term.split("/") if len(s.strip()) > 2
]
for i in range(len(terms) - 1, -1, -1):
res = await self._search_rom(terms[i], platform_moby_id)
if res:
break
terms = re.split(SEARCH_TERM_SPLIT_PATTERN, search_term)
res = await self._search_rom(terms[-1], platform_moby_id)
if not res:
return fallback_rom

View File

@@ -1,6 +1,4 @@
import asyncio
import re
from difflib import SequenceMatcher
from typing import Final, NotRequired, TypedDict
from adapters.services.steamgriddb import SteamGridDBService
@@ -10,29 +8,6 @@ from logger.logger import log
from .base_hander import MetadataHandler
WORD_TOKEN_PATTERN = re.compile(r"\b\w+\b")
def levenshtein_distance(s1: str, s2: str) -> int:
    """Return the minimum number of single-character edits turning *s1* into *s2*.

    Single-row dynamic-programming (Wagner-Fischer) formulation.
    """
    # Recurse once so the DP row tracks the shorter string.
    if len(s1) < len(s2):
        return levenshtein_distance(s2, s1)
    if not s2:
        return len(s1)

    prev = list(range(len(s2) + 1))
    for i, a in enumerate(s1):
        curr = [i + 1]
        for j, b in enumerate(s2):
            # min(insert, delete, substitute) for cell (i+1, j+1).
            curr.append(min(prev[j + 1] + 1, curr[j] + 1, prev[j] + (a != b)))
        prev = curr
    return prev[-1]
# Used to display the Mobygames API status in the frontend
STEAMGRIDDB_API_ENABLED: Final = bool(STEAMGRIDDB_API_KEY)
@@ -56,57 +31,8 @@ class SGDBRom(TypedDict):
class SGDBBaseHandler(MetadataHandler):
def __init__(self) -> None:
# Client wrapping the SteamGridDB HTTP API.
self.sgdb_service = SteamGridDBService()
# NOTE(review): this diff hunk interleaves the removed per-metric thresholds
# below with the added combined threshold; only min_similarity_score survives
# in the new version.
self.max_levenshtein_distance: Final = 2
self.min_sequence_ratio: Final = 0.85
self.min_token_overlap_ratio: Final = 0.7
# Combined similarity threshold used by find_best_match.
self.min_similarity_score: Final = 0.75
def _calculate_title_similarity(
    self, search_normalized: str, game_name: str
) -> float:
    """
    Score how well *game_name* matches a normalized search term.

    Blends token overlap (weight 0.5), difflib sequence similarity (0.3)
    and a length-normalized Levenshtein distance (0.2) into one score in
    [0, 1], where 1.0 is a perfect match.
    """
    candidate = self.normalize_search_term(game_name, remove_articles=False)

    # Short-circuit on an exact normalized match.
    if search_normalized == candidate:
        return 1.0

    # Jaccard overlap of word tokens.
    left = set(WORD_TOKEN_PATTERN.findall(search_normalized.lower()))
    right = set(WORD_TOKEN_PATTERN.findall(candidate.lower()))
    overlap = len(left & right) / len(left | right) if left and right else 0.0

    # Whole-string character-sequence similarity.
    seq = SequenceMatcher(None, search_normalized, candidate).ratio()

    # Edit distance scaled by the longer string's length.
    longest = max(len(search_normalized), len(candidate))
    lev = (
        1 - levenshtein_distance(search_normalized, candidate) / longest
        if longest > 0
        else 1.0
    )

    # Token overlap is weighted highest for game titles.
    return overlap * 0.5 + seq * 0.3 + lev * 0.2
async def get_details(self, search_term: str) -> list[SGDBResult]:
if not STEAMGRIDDB_API_ENABLED:
return []
@@ -135,46 +61,34 @@ class SGDBBaseHandler(MetadataHandler):
log.debug(f"Could not find '{search_term}' on SteamGridDB")
continue
game_scores = []
for game in games:
similarity_score = self._calculate_title_similarity(
search_term, game["name"]
games_by_name = {game["name"]: game for game in games}
best_match, best_score = self.find_best_match(
search_term,
list(games_by_name.keys()),
min_similarity_score=self.min_similarity_score,
remove_articles=False,
)
if best_match:
game_details = await self._get_game_covers(
game_id=games_by_name[best_match]["id"],
game_name=games_by_name[best_match]["name"],
types=(SGDBType.STATIC,),
is_nsfw=False,
is_humor=False,
is_epilepsy=False,
)
game_scores.append((game, similarity_score))
# A perfect match is found, no need to check other games
if similarity_score == 1.0:
break
# Sort by similarity score (descending) to get the best match first
game_scores.sort(key=lambda x: x[1], reverse=True)
# Try the best matches within the threshold
for game, score in game_scores:
if score >= self.min_similarity_score:
game_details = await self._get_game_covers(
game_id=game["id"],
game_name=game["name"],
types=(SGDBType.STATIC,),
is_nsfw=False,
is_humor=False,
is_epilepsy=False,
first_resource = next(
(res for res in game_details["resources"] if res["url"]), None
)
if first_resource:
log.debug(
f"Found match for '{search_term}' -> '{best_match}' (score: {best_score:.3f})"
)
first_resource = next(
(res for res in game_details["resources"] if res["url"]), None
return SGDBRom(
sgdb_id=games_by_name[best_match]["id"],
url_cover=first_resource["url"],
)
if first_resource:
log.debug(
f"Found match for '{search_term}' -> '{game['name']}' (score: {score:.3f})"
)
return SGDBRom(
sgdb_id=game["id"], url_cover=first_resource["url"]
)
else:
# If the best match is below threshold, don't try others
break
log.debug(f"No good match found for '{', '.join(game_names)}' on SteamGridDB")
return SGDBRom(sgdb_id=None)

View File

@@ -8,6 +8,7 @@ import pydash
from adapters.services.screenscraper import ScreenScraperService
from adapters.services.screenscraper_types import SSGame, SSGameDate
from config import SCREENSCRAPER_PASSWORD, SCREENSCRAPER_USER
from logger.logger import log
from unidecode import unidecode as uc
from .base_hander import (
@@ -25,6 +26,8 @@ SS_API_ENABLED: Final = bool(SCREENSCRAPER_USER) and bool(SCREENSCRAPER_PASSWORD
SS_DEV_ID: Final = base64.b64decode("enVyZGkxNQ==").decode()
SS_DEV_PASSWORD: Final = base64.b64decode("eFRKd29PRmpPUUc=").decode()
SEARCH_TERM_SPLIT_PATTERN = re.compile(r"[\:\-\/]")
PS1_SS_ID: Final = 57
PS2_SS_ID: Final = 58
PSP_SS_ID: Final = 61
@@ -275,30 +278,32 @@ def extract_metadata_from_ss_rom(rom: SSGame) -> SSMetadata:
class SSHandler(MetadataHandler):
def __init__(self) -> None:
# Client wrapping the ScreenScraper HTTP API.
self.ss_service = ScreenScraperService()
# Threshold passed to find_best_match; candidates scoring below it are rejected.
self.min_similarity_score: Final = 0.75
async def _search_rom(self, search_term: str, platform_ss_id: int) -> SSGame | None:
if not platform_ss_id:
return None
def is_exact_match(rom: SSGame, search_term: str) -> bool:
rom_names = [name.get("text", "").lower() for name in rom.get("noms", [])]
return any(
(
rom_name.lower() == search_term.lower()
or self.normalize_search_term(rom_name) == search_term
)
for rom_name in rom_names
)
roms = await self.ss_service.search_games(
term=quote(uc(search_term), safe="/ "),
system_id=platform_ss_id,
)
for rom in roms:
if is_exact_match(rom, search_term):
return rom
games_by_name = {
name["text"]: rom for rom in roms for name in rom.get("noms", [])
}
best_match, best_score = self.find_best_match(
search_term,
list(games_by_name.keys()),
min_similarity_score=self.min_similarity_score,
remove_punctuation=False,
)
if best_match:
log.debug(
f"Found match for '{search_term}' -> '{best_match}' (score: {best_score:.3f})"
)
return games_by_name[best_match]
return roms[0] if roms else None
@@ -387,29 +392,16 @@ class SSHandler(MetadataHandler):
normalized_search_term = self.normalize_search_term(
search_term, remove_punctuation=False
)
res = await self._search_rom(normalized_search_term, platform_ss_id)
res = await self._search_rom(
normalized_search_term.replace(": ", " - "), platform_ss_id
)
# SS API doesn't handle some special characters well
if not res and (
": " in search_term or " - " in search_term or "/" in search_term
):
if ": " in search_term:
terms = [
s.strip() for s in search_term.split(":") if len(s.strip()) > 2
]
elif " - " in search_term:
terms = [
s.strip() for s in search_term.split(" - ") if len(s.strip()) > 2
]
else:
terms = [
s.strip() for s in search_term.split("/") if len(s.strip()) > 2
]
for i in range(len(terms) - 1, -1, -1):
res = await self._search_rom(terms[i], platform_ss_id)
if res:
break
terms = re.split(SEARCH_TERM_SPLIT_PATTERN, search_term)
res = await self._search_rom(terms[-1], platform_ss_id)
if not res or not res.get("id"):
return fallback_rom