diff --git a/backend/endpoints/netplay.py b/backend/endpoints/netplay.py index 8ae28a48d..fac2b3b73 100644 --- a/backend/endpoints/netplay.py +++ b/backend/endpoints/netplay.py @@ -4,6 +4,7 @@ from fastapi import HTTPException, Request, status from decorators.auth import protected_route from handler.auth.constants import Scope +from handler.netplay_handler import NetplayRoom, netplay_handler from utils.router import APIRouter router = APIRouter( @@ -12,41 +13,6 @@ router = APIRouter( ) -class NetplayPlayerInfo(TypedDict): - socketId: str - player_name: str - userid: str | None - playerId: str | None - - -class NetplayRoom(TypedDict): - owner: str - players: Dict[str, NetplayPlayerInfo] - peers: list[str] - room_name: str - game_id: str - domain: Optional[str] - password: Optional[str] - max_players: int - - -netplay_rooms: dict[str, NetplayRoom] = {} - -# # Background cleanup task to delete empty rooms periodically -# async def cleanup_empty_rooms_loop(): -# while True: -# await asyncio.sleep(60) -# to_delete = [sid for sid, r in rooms.items() if len(r.get("players", {})) == 0] -# for sid in to_delete: -# del rooms[sid] - - -# # Start cleanup task on app startup -# @app.on_event("startup") -# async def startup_event(): -# asyncio.create_task(cleanup_empty_rooms_loop()) - - DEFAULT_MAX_PLAYERS = 4 @@ -77,13 +43,16 @@ class RoomsResponse(TypedDict): @protected_route(router.get, "/list", [Scope.ASSETS_READ]) -def get_rooms(request: Request, game_id: Optional[str]) -> Dict[str, RoomsResponse]: +async def get_rooms( + request: Request, game_id: Optional[str] +) -> Dict[str, RoomsResponse]: if not game_id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Missing game_id query parameter", ) + netplay_rooms = await netplay_handler.get_all() open_rooms: Dict[str, RoomsResponse] = {} for session_id, room in netplay_rooms.items(): if not _is_room_open(room, game_id): diff --git a/backend/endpoints/sockets/netplay.py b/backend/endpoints/sockets/netplay.py index 3fde2655f..0490b86c9 100644 --- a/backend/endpoints/sockets/netplay.py +++ b/backend/endpoints/sockets/netplay.py @@ -1,11 +1,7 @@ from typing import Any, NotRequired, TypedDict -from endpoints.netplay import ( - DEFAULT_MAX_PLAYERS, - NetplayPlayerInfo, - NetplayRoom, - netplay_rooms, -) +from endpoints.netplay import DEFAULT_MAX_PLAYERS +from handler.netplay_handler import NetplayPlayerInfo, NetplayRoom, netplay_handler from handler.socket_handler import netplay_socket_handler @@ -35,10 +31,10 @@ async def open_room(sid: str, data: RoomData): if not session_id or not player_id: return "Invalid data: sessionId and playerId required" - if session_id in netplay_rooms: + if await netplay_handler.get(session_id): return "Room already exists" - netplay_rooms[session_id] = NetplayRoom( + new_room = NetplayRoom( owner=sid, players={ player_id: NetplayPlayerInfo( @@ -55,6 +51,7 @@ async def open_room(sid: str, data: RoomData): password=extra_data.get("room_password", None), max_players=data.get("maxPlayers") or DEFAULT_MAX_PLAYERS, ) + await netplay_handler.set(session_id, new_room) await netplay_socket_handler.socket_server.enter_room(sid, session_id) await netplay_socket_handler.socket_server.save_session( @@ -65,7 +62,7 @@ async def open_room(sid: str, data: RoomData): }, ) await netplay_socket_handler.socket_server.emit( - "users-updated", netplay_rooms[session_id]["players"], room=session_id + "users-updated", new_room["players"], room=session_id ) @@ -79,10 +76,10 @@ async def join_room(sid: str, data: RoomData): if not session_id or not player_id: return "Invalid data: sessionId and playerId required" - if session_id not in netplay_rooms: + current_room = await netplay_handler.get(session_id) + if not current_room: return "Room not found" - current_room = netplay_rooms[session_id] if current_room["password"] and current_room["password"] != extra_data.get( "room_password" ): @@ -97,6 +94,7 @@ async def join_room(sid: str, data: RoomData): userid=extra_data.get("userid"), playerId=extra_data.get("playerId"), ) + await netplay_handler.set(session_id, current_room) await netplay_socket_handler.socket_server.enter_room(sid, session_id) await netplay_socket_handler.socket_server.save_session( @@ -114,35 +112,37 @@ async def join_room(sid: str, data: RoomData): async def _handle_leave(sid: str, session_id: str, player_id: str): - current_room = netplay_rooms[session_id] + current_room = await netplay_handler.get(session_id) + if not current_room: + return + current_room["players"].pop(player_id, None) + await netplay_handler.set(session_id, current_room) await netplay_socket_handler.socket_server.emit( "users-updated", current_room["players"], room=session_id ) if not current_room["players"]: - netplay_rooms.pop(session_id, None) + await netplay_handler.delete(session_id) elif sid == current_room["owner"]: remaining = list(current_room["players"].keys()) if remaining: current_room["owner"] = current_room["players"][remaining[0]]["socketId"] + await netplay_handler.set(session_id, current_room) await netplay_socket_handler.socket_server.emit( "users-updated", current_room["players"], room=session_id ) @netplay_socket_handler.socket_server.on("leave-room") # type: ignore -async def leave_room(sid: str, data: RoomData): - extra_data = data["extra"] +async def leave_room(sid: str): + stored_session = await netplay_socket_handler.socket_server.get_session(sid) + session_id = stored_session.get("session_id") + player_id = stored_session.get("player_id") - session_id = extra_data["sessionid"] - player_id = extra_data["userid"] or extra_data["playerId"] - - if not session_id or not player_id: - return - - await _handle_leave(sid, session_id, player_id) - await netplay_socket_handler.socket_server.leave_room(sid, session_id) + if session_id and player_id: + await _handle_leave(sid, session_id, player_id) + await netplay_socket_handler.socket_server.leave_room(sid, session_id) class WebRTCSignalData(TypedDict, total=False): @@ -222,5 +222,5 @@ async def disconnect(sid: str): session_id = stored_session.get("session_id") player_id = stored_session.get("player_id") - if session_id: + if session_id and player_id: await _handle_leave(sid, session_id, player_id) diff --git a/backend/handler/netplay_handler.py b/backend/handler/netplay_handler.py new file mode 100644 index 000000000..89e89bae3 --- /dev/null +++ b/backend/handler/netplay_handler.py @@ -0,0 +1,50 @@ +import json +from typing import Optional, TypedDict + +from handler.redis_handler import async_cache + + +class NetplayPlayerInfo(TypedDict): + socketId: str + player_name: str + userid: str | None + playerId: str | None + + +class NetplayRoom(TypedDict): + owner: str + players: dict[str, NetplayPlayerInfo] + peers: list[str] + room_name: str + game_id: str + domain: Optional[str] + password: Optional[str] + max_players: int + + +class NetplayHandler: + """A class to handle netplay rooms in Redis.""" + + def __init__(self): + self.hash_name = "netplay:rooms" + + async def get(self, room_id: str) -> NetplayRoom | None: + """Get a room from Redis.""" + room = await async_cache.hget(self.hash_name, room_id) + return json.loads(room) if room else None + + def set(self, room_id: str, room_data: NetplayRoom): + """Set a room in Redis.""" + return async_cache.hset(self.hash_name, room_id, json.dumps(room_data)) + + def delete(self, room_id: str): + """Delete a room from Redis.""" + return async_cache.hdel(self.hash_name, room_id) + + async def get_all(self) -> dict[str, NetplayRoom]: + """Get all rooms from Redis.""" + rooms = await async_cache.hgetall(self.hash_name) + return {room_id: json.loads(room_data) for room_id, room_data in rooms.items()} + + +netplay_handler = NetplayHandler() diff --git a/backend/handler/socket_handler.py b/backend/handler/socket_handler.py index 39e07ef8d..9b5d5ebe8 100644 --- a/backend/handler/socket_handler.py +++ b/backend/handler/socket_handler.py @@ -5,7 +5,7 @@ from utils import json_module class SocketHandler: - def __init__(self, path: str = "/ws/socket.io") -> None: + def __init__(self, path: str) -> None: self.socket_server = socketio.AsyncServer( cors_allowed_origins="*", async_mode="asgi", @@ -22,5 +22,5 @@ class SocketHandler: self.socket_app = socketio.ASGIApp(self.socket_server, socketio_path=path) -socket_handler = SocketHandler() +socket_handler = SocketHandler(path="/ws/socket.io") netplay_socket_handler = SocketHandler(path="/netplay/socket.io") diff --git a/backend/startup.py b/backend/startup.py index ca1a7db51..6a66e3342 100644 --- a/backend/startup.py +++ b/backend/startup.py @@ -24,6 +24,7 @@ from handler.metadata.base_handler import ( from handler.redis_handler import async_cache from logger.logger import log from models.firmware import FIRMWARE_FIXTURES_DIR, KNOWN_BIOS_KEY +from tasks.scheduled.cleanup_netplay import cleanup_netplay_task from tasks.scheduled.convert_images_to_webp import convert_images_to_webp_task from tasks.scheduled.scan_library import scan_library_task from tasks.scheduled.sync_retroachievements_progress import ( @@ -46,6 +47,8 @@ async def main() -> None: log.info("Running startup tasks") # Initialize scheduled tasks + cleanup_netplay_task.init() + if ENABLE_SCHEDULED_RESCAN: log.info("Starting scheduled rescan") scan_library_task.init() diff --git a/backend/tasks/scheduled/cleanup_netplay.py b/backend/tasks/scheduled/cleanup_netplay.py new file mode 100644 index 000000000..1a8222426 --- /dev/null +++ b/backend/tasks/scheduled/cleanup_netplay.py @@ -0,0 +1,33 @@ +from handler.netplay_handler import netplay_handler +from logger.logger import log +from tasks.tasks import PeriodicTask, TaskType + + +class CleanupNetplayTask(PeriodicTask): + def __init__(self): + super().__init__( + title="Scheduled netplay cleanup", + description="Cleans up empty netplay rooms", + task_type=TaskType.CLEANUP, + enabled=True, + manual_run=False, + cron_string="*/5 * * * *", # Every 5 minutes + func="tasks.scheduled.cleanup_netplay.cleanup_netplay_task.run", + ) + + async def run(self) -> None: + if not self.enabled: + self.unschedule() + return + + netplay_rooms = await netplay_handler.get_all() + rooms_to_delete = [ + sid for sid, r in netplay_rooms.items() if len(r.get("players", {})) == 0 + ] + if rooms_to_delete: + log.info(f"Cleaning up {len(rooms_to_delete)} empty netplay rooms") + for sid in rooms_to_delete: + await netplay_handler.delete(sid) + + +cleanup_netplay_task = CleanupNetplayTask() diff --git a/docker/nginx/templates/default.conf.template b/docker/nginx/templates/default.conf.template index 3b637c611..b7031a721 100644 --- a/docker/nginx/templates/default.conf.template +++ b/docker/nginx/templates/default.conf.template @@ -57,13 +57,7 @@ server { proxy_request_buffering off; proxy_buffering off; } - location /ws { - proxy_pass http://wsgi_server; - proxy_http_version 1.1; - proxy_set_header Upgrade $http_upgrade; - proxy_set_header Connection "upgrade"; - } - location /netplay { + location ~ ^/(ws|netplay) { proxy_pass http://wsgi_server; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; diff --git a/frontend/vite.config.js b/frontend/vite.config.js index bb76a36dc..47aade6cb 100644 --- a/frontend/vite.config.js +++ b/frontend/vite.config.js @@ -123,12 +123,7 @@ export default defineConfig(({ mode }) => { changeOrigin: false, secure: false, }, - "/ws": { - target: `http://127.0.0.1:${backendPort}`, - changeOrigin: false, - ws: true, - }, - "/netplay": { + "^/(?:ws|netplay)": { target: `http://127.0.0.1:${backendPort}`, changeOrigin: false, ws: true,