changed to ORM database models with sqlalchemy

This commit is contained in:
zurdi zurdo
2023-03-22 13:36:29 +01:00
parent 08f0e73e61
commit 45c2dfb30a
11 changed files with 114 additions and 185 deletions

View File

@@ -1,5 +1,5 @@
requests==2.28.2
fastapi==0.92.0
uvicorn==0.20.0
mariadb==1.0.11 # works in python base image
# mariadb==1.1.6 # works in ubuntu base image
mariadb==1.1.6
SQLAlchemy==2.0.7

View File

@@ -18,14 +18,15 @@ DEFAULT_PATH_COVER_S: str = f"/assets/emulation/resources/default/cover_s.png"
# IGDB
CLIENT_ID=os.getenv('CLIENT_ID')
CLIENT_SECRET=os.getenv('CLIENT_SECRET')
CLIENT_ID: str = os.getenv('CLIENT_ID')
CLIENT_SECRET: str = os.getenv('CLIENT_SECRET')
# STEAMGRIDDB
STEAMGRIDDB_API_KEY=os.getenv('STEAMGRIDDB_API_KEY')
STEAMGRIDDB_API_KEY: str = os.getenv('STEAMGRIDDB_API_KEY')
# DB
DB_HOST=os.getenv('DB_HOST')
DB_PORT=int(os.getenv('DB_PORT'))
DB_ROOT_PASSWD=os.getenv('DB_ROOT_PASSWD')
DB_USER=os.getenv('DB_USER')
DB_PASSWD=os.getenv('DB_PASSWD')
DB_HOST: str = os.getenv('DB_HOST')
DB_PORT: int = int(os.getenv('DB_PORT'))
DB_ROOT_PASSWD: str = os.getenv('DB_ROOT_PASSWD')
DB_USER: str = os.getenv('DB_USER')
DB_PASSWD: str = os.getenv('DB_PASSWD')
DB_NAME: str = 'romm'

View File

@@ -1,29 +0,0 @@
from dataclasses import dataclass
from config.config import DEFAULT_PATH_LOGO
@dataclass
class Platform:
igdb_id: str = ""
sgdb_id: str = ""
slug: str = ""
name: str = ""
path_logo: str = DEFAULT_PATH_LOGO
@dataclass
class Rom:
r_igdb_id: str = ""
r_sgdb_id: str = ""
p_igdb_id: str = ""
p_sgdb_id: str = ""
filename_no_ext: str = ""
filename: str = ""
name: str = ""
r_slug: str = ""
summary: str = ""
p_slug: str = ""
path_cover_l: str = ""
path_cover_s: str = ""
has_cover: int = 0

View File

@@ -1,123 +1,32 @@
import os
import sys
from dataclasses import asdict
from fastapi import HTTPException
import mariadb as db
from config.config import DB_HOST, DB_PORT, DB_USER, DB_PASSWD
from models.base import Session, engine, BaseModel
from models.platform import Platform
from models.rom import Rom
from logger.logger import log
class DBHandler:
class DBHandler:
def __init__(self) -> None:
self.DATABASE: str = 'romm'
self.PLATFORM_TABLE: str = 'platform'
self.ROM_TABLE: str = 'rom'
try:
self.conn = db.connect(
host=DB_HOST,
port=DB_PORT,
user=DB_USER,
password=DB_PASSWD,
database=self.DATABASE,
autocommit=True
)
self.conn.auto_reconnect = True
except db.Error as e:
log.error(f"error connecting to the database: {e}")
sys.exit(1)
self.cur = self.conn.cursor()
def regenerate_platform_table(self, igdb_id: str, sgdb_id: str, slug: str, name: str, path_logo: str) -> None:
try:
self.cur.execute(f"""
create table if not exists {self.PLATFORM_TABLE}
({igdb_id} varchar(20), {sgdb_id} varchar(20),
{slug} varchar(50), {name} varchar(100),
{path_logo} varchar(500))
""")
log.info(f"{self.PLATFORM_TABLE} table created")
except Exception as e:
log.error(f"{self.PLATFORM_TABLE} table can't be created: {e}")
raise HTTPException(status_code=500, detail=f"Can't create platform table. {e}")
try:
self.cur.execute(f"truncate {self.PLATFORM_TABLE}")
log.info(f"{self.PLATFORM_TABLE} table truncated")
except Exception as e:
log.error(f"{self.PLATFORM_TABLE} table can't be truncated: {e}")
raise HTTPException(status_code=500, detail=f"Can't truncate platform table. {e}")
def write_platforms(self, platforms: list) -> None:
values: list = [{k: str(v) for k, v in asdict(p).items()} for p in platforms]
try:
self.cur.executemany(f"insert into {self.PLATFORM_TABLE} (igdb_id, sgdb_id, slug, name, path_logo) \
values (%(igdb_id)s, %(sgdb_id)s, %(slug)s, %(name)s, %(path_logo)s)", values)
log.info(f"{self.PLATFORM_TABLE} table populated")
except Exception as e:
log.error(f"{self.PLATFORM_TABLE} can't be populated: {e}")
raise HTTPException(status_code=500, detail=f"Can't write in platform table. {e}")
def get_platforms(self) -> list:
try:
self.cur.execute(f"select igdb_id, sgdb_id, slug, name, path_logo from {self.PLATFORM_TABLE} order by name asc")
log.info(f"platforms details fetch from {self.PLATFORM_TABLE}")
except Exception as e:
log.error(f"platforms details can't be fetch from {self.PLATFORM_TABLE}")
raise HTTPException(status_code=500, detail=f"Can't read platform table. {e}")
return self.cur
def regenerate_rom_table(self, r_igdb_id: str, r_sgdb_id: str, p_igdb_id: str, p_sgdb_id: str, filename_no_ext: str,
filename: str, name: str, r_slug: str, summary: str, p_slug: str, path_cover_s: str, path_cover_l: str, has_cover: bool) -> None:
try:
self.cur.execute(f"""
create table if not exists {self.ROM_TABLE}
({r_igdb_id} varchar(20), {r_sgdb_id} varchar(20),
{p_igdb_id} varchar(20), {p_sgdb_id} varchar(20),
{filename_no_ext} varchar(200), {filename} varchar(200), {name} varchar(100),
{r_slug} varchar(100), {summary} text, {p_slug} varchar(100),
{path_cover_s} varchar(500), {path_cover_l} varchar(500), {has_cover} BOOLEAN)
""")
log.info(f"{self.ROM_TABLE} table created")
except Exception as e:
log.error(f"{self.ROM_TABLE} table can't be created: {e}")
raise HTTPException(status_code=500, detail=f"Can't create rom table. {e}")
try:
self.cur.execute(f"truncate {self.ROM_TABLE}")
log.info(f"{self.ROM_TABLE} table truncated")
except Exception as e:
log.error(f"{self.ROM_TABLE} table can't be truncated: {e}")
raise HTTPException(status_code=500, detail=f"Can't truncate rom table. {e}")
def write_roms(self, roms: list) -> None:
values: list = [{k: str(v) for k, v in asdict(p).items()} for p in roms]
try:
self.cur.executemany(
f"insert into {self.ROM_TABLE} \
(r_igdb_id, r_sgdb_id, p_igdb_id, p_sgdb_id, filename_no_ext, filename, name, r_slug, summary, p_slug, path_cover_s, path_cover_l, has_cover) \
values (%(r_igdb_id)s, %(r_sgdb_id)s, %(p_igdb_id)s, %(p_sgdb_id)s, %(filename_no_ext)s, %(filename)s, %(name)s, %(r_slug)s, %(summary)s, %(p_slug)s, %(path_cover_s)s, %(path_cover_l)s, %(has_cover)s)", values)
log.info(f"{self.ROM_TABLE} table populated")
except Exception as e:
log.error(f"{self.ROM_TABLE} can't be populated: {e}")
raise HTTPException(status_code=500, detail=f"Can't write in platform table. {e}")
BaseModel.metadata.create_all(engine)
self.session = Session()
def get_roms(self, p_slug: str) -> list:
try:
self.cur.execute(f"select r_igdb_id, r_sgdb_id, p_igdb_id, p_sgdb_id, filename_no_ext, filename, name, r_slug, summary, p_slug, path_cover_s, path_cover_l, has_cover from {self.ROM_TABLE} where p_slug = '{p_slug}' order by name asc")
log.info(f"platforms details fetch from {self.ROM_TABLE}")
except Exception as e:
log.error(f"platforms details can't be fetch from {self.ROM_TABLE}")
raise HTTPException(status_code=500, detail=f"Can't read rom table. {e}")
return self.cur
def add_platform(self, **kargs):
self.session.merge(Platform(**kargs))
def get_platforms(self):
return self.session.query(Platform).all()
def close_conn(self) -> None:
log.info("closing database connection")
self.conn.close()
def add_rom(self, **kargs):
self.session.merge(Rom(**kargs))
def get_roms(self, p_slug):
return self.session.query(Rom).filter(Rom.p_slug == p_slug).all()
def commit(self):
self.session.commit()

View File

@@ -1,5 +1,3 @@
import os
import requests
from config.config import STEAMGRIDDB_API_KEY

View File

@@ -1,5 +1,3 @@
from dataclasses import asdict
from fastapi import FastAPI
import uvicorn
@@ -8,7 +6,6 @@ from handler.igdb_handler import IGDBHandler
from handler.sgdb_handler import SGDBHandler
from handler.db_handler import DBHandler
from config.config import PORT, HOST
from data.data import Platform, Rom
from utils import fs, fastapi
@@ -20,16 +17,22 @@ sgdbh: SGDBHandler = SGDBHandler()
dbh: DBHandler = DBHandler()
@app.get("/platforms/{slug}/roms")
async def platforms(slug):
@app.patch("/platforms/{p_slug}/roms/{filename}")
async def editRom(p_slug: str, filename: str):
"""Edits rom details"""
return {'msg': 'WIP'}
@app.get("/platforms/{p_slug}/roms")
async def platforms(p_slug: str):
"""Returns roms data of the desired platform"""
return {'data': [Rom(*r) for r in dbh.get_roms(slug)]}
return {'data': dbh.get_roms(p_slug)}
@app.get("/platforms")
async def platforms():
"""Returns platforms data"""
return {'data': [Platform(*p) for p in dbh.get_platforms()]}
return {'data': dbh.get_platforms()}
@app.get("/scan")
@@ -38,44 +41,47 @@ async def scan(overwrite: bool=False):
log.info("scaning...")
platforms: list = []
roms: list = []
fs.store_default_resources(overwrite)
for p_slug in fs.get_platforms():
platform: dict = {}
log.info(f"Getting {p_slug} details")
p_igdb_id, p_name, url_logo = igdbh.get_platform_details(p_slug)
p_sgdb_id: str = ""
details: list = [p_igdb_id, p_sgdb_id, p_slug, p_name]
platform['slug'] = p_slug
platform['igdb_id'] = p_igdb_id
platform['name'] = p_name
# TODO: refactor logo details logic
#TODO: refactor logo details logic
if (overwrite or not fs.p_logo_exists(p_slug)) and url_logo:
fs.store_p_logo(p_slug, url_logo)
if fs.p_logo_exists(p_slug):
details.append(fs.get_p_logo_path(p_slug))
platform['path_logo']: str = fs.get_p_path_logo(p_slug)
platforms.append(Platform(*details))
dbh.add_platform(**platform)
for filename in fs.get_roms(p_slug):
rom: dict = {}
log.info(f"Getting {filename} details")
r_igdb_id, filename_no_ext, r_slug, r_name, summary, url_cover = igdbh.get_rom_details(filename, p_igdb_id)
r_sgdb_id: str = ""
cover_path_s, cover_path_l, has_cover = fs.get_cover_details(overwrite, p_slug, filename_no_ext, url_cover)
details: list = [r_igdb_id, r_sgdb_id, p_igdb_id, p_sgdb_id,
filename_no_ext, filename, r_name, r_slug, summary, p_slug,
cover_path_s, cover_path_l, has_cover]
roms.append(Rom(*details))
path_cover_s, path_cover_l, has_cover = fs.get_cover_details(overwrite, p_slug, filename_no_ext, url_cover)
rom['filename'] = filename
rom['filename_no_ext'] = filename_no_ext
rom['r_igdb_id'] = r_igdb_id
rom['p_igdb_id'] = p_igdb_id
rom['name'] = r_name
rom['r_slug'] = r_slug
rom['p_slug'] = p_slug
rom['summary'] = summary
rom['path_cover_s'] = path_cover_s
rom['path_cover_l'] = path_cover_l
rom['has_cover'] = has_cover
dbh.regenerate_platform_table(*asdict(Platform()).keys())
dbh.write_platforms(platforms)
dbh.regenerate_rom_table(*asdict(Rom()).keys())
dbh.write_roms(roms)
dbh.add_rom(**rom)
dbh.commit()
return {'msg': 'success'}
if __name__ == '__main__':
uvicorn.run("main:app", host=HOST, port=PORT, reload=True)
dbh.close_conn()

View File

@@ -0,0 +1,10 @@
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from config.config import DB_HOST, DB_PORT, DB_USER, DB_PASSWD, DB_NAME
engine = create_engine(f"mariadb+mariadbconnector://{DB_USER}:{DB_PASSWD}@{DB_HOST}:{DB_PORT}/{DB_NAME}")
Session = sessionmaker(bind=engine)
BaseModel = declarative_base()

View File

@@ -0,0 +1,13 @@
from sqlalchemy import Column, String
from config.config import DEFAULT_PATH_LOGO
from models.base import BaseModel
class Platform(BaseModel):
__tablename__ = 'platforms'
igdb_id = Column(String(length=50), default="")
sgdb_id = Column(String(length=50), default="")
slug = Column(String(length=100), primary_key=True)
name = Column(String(length=200), default="")
path_logo = Column(String(length=500), default=DEFAULT_PATH_LOGO)

21
backend/src/models/rom.py Normal file
View File

@@ -0,0 +1,21 @@
from sqlalchemy import Column, String, Text, Boolean
from config.config import DEFAULT_PATH_COVER_S, DEFAULT_PATH_COVER_L
from models.base import BaseModel
class Rom(BaseModel):
__tablename__ = 'roms'
filename = Column(String(length=200), primary_key=True)
filename_no_ext = Column(String(length=200), default="")
r_igdb_id = Column(String(length=50), default="")
p_igdb_id = Column(String(length=50), default="")
r_sgdb_id = Column(String(length=50), default="")
p_sgdb_id = Column(String(length=50), default="")
name = Column(String(length=100), default="")
r_slug = Column(String(length=100), default="")
p_slug = Column(String(length=100), default="")
summary = Column(Text, default="")
path_cover_s = Column(String(length=500), default=DEFAULT_PATH_COVER_S)
path_cover_l = Column(String(length=500), default=DEFAULT_PATH_COVER_L)
has_cover = Column(Boolean, default=False)

View File

@@ -37,7 +37,7 @@ def p_logo_exists(slug: str) -> bool:
return True if os.path.exists(logo_path) else False
def get_p_logo_path(slug: str) -> str:
def get_p_path_logo(slug: str) -> str:
"""Returns platform logo filesystem path
Args:
@@ -72,7 +72,7 @@ def get_platforms() -> list:
Automatically discards the default directory.
"""
try:
platforms: list = list(os.walk(EMULATION_BASE_PATH))[0][1]
platforms: list[str] = list(os.walk(EMULATION_BASE_PATH))[0][1]
if 'resources' in platforms: platforms.remove('resources')
log.info(f"filesystem platforms found: {platforms}")
return platforms
@@ -154,7 +154,7 @@ def get_roms(p_slug) -> list:
Automatically discards the default directory.
"""
try:
roms: list = []
roms: list[str] = []
roms = list(os.walk(f"{EMULATION_BASE_PATH}/{p_slug}/roms/"))[0][2]
log.info(f"filesystem roms found for {p_slug}: {roms}")
except IndexError: