use redis to store netplay data

This commit is contained in:
Georges-Antoine Assi
2025-12-07 12:02:35 -05:00
parent f5676d7cfe
commit 09dbb2e244
8 changed files with 119 additions and 75 deletions

View File

@@ -4,6 +4,7 @@ from fastapi import HTTPException, Request, status
from decorators.auth import protected_route
from handler.auth.constants import Scope
from handler.netplay_handler import NetplayRoom, netplay_handler
from utils.router import APIRouter
router = APIRouter(
@@ -12,41 +13,6 @@ router = APIRouter(
)
class NetplayPlayerInfo(TypedDict):
socketId: str
player_name: str
userid: str | None
playerId: str | None
class NetplayRoom(TypedDict):
owner: str
players: Dict[str, NetplayPlayerInfo]
peers: list[str]
room_name: str
game_id: str
domain: Optional[str]
password: Optional[str]
max_players: int
netplay_rooms: dict[str, NetplayRoom] = {}
# # Background cleanup task to delete empty rooms periodically
# async def cleanup_empty_rooms_loop():
# while True:
# await asyncio.sleep(60)
# to_delete = [sid for sid, r in rooms.items() if len(r.get("players", {})) == 0]
# for sid in to_delete:
# del rooms[sid]
# # Start cleanup task on app startup
# @app.on_event("startup")
# async def startup_event():
# asyncio.create_task(cleanup_empty_rooms_loop())
DEFAULT_MAX_PLAYERS = 4
@@ -77,13 +43,16 @@ class RoomsResponse(TypedDict):
@protected_route(router.get, "/list", [Scope.ASSETS_READ])
def get_rooms(request: Request, game_id: Optional[str]) -> Dict[str, RoomsResponse]:
async def get_rooms(
request: Request, game_id: Optional[str]
) -> Dict[str, RoomsResponse]:
if not game_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Missing game_id query parameter",
)
netplay_rooms = await netplay_handler.get_all()
open_rooms: Dict[str, RoomsResponse] = {}
for session_id, room in netplay_rooms.items():
if not _is_room_open(room, game_id):

View File

@@ -1,11 +1,7 @@
from typing import Any, NotRequired, TypedDict
from endpoints.netplay import (
DEFAULT_MAX_PLAYERS,
NetplayPlayerInfo,
NetplayRoom,
netplay_rooms,
)
from endpoints.netplay import DEFAULT_MAX_PLAYERS
from handler.netplay_handler import NetplayPlayerInfo, NetplayRoom, netplay_handler
from handler.socket_handler import netplay_socket_handler
@@ -35,10 +31,10 @@ async def open_room(sid: str, data: RoomData):
if not session_id or not player_id:
return "Invalid data: sessionId and playerId required"
if session_id in netplay_rooms:
if await netplay_handler.get(session_id):
return "Room already exists"
netplay_rooms[session_id] = NetplayRoom(
new_room = NetplayRoom(
owner=sid,
players={
player_id: NetplayPlayerInfo(
@@ -55,6 +51,7 @@ async def open_room(sid: str, data: RoomData):
password=extra_data.get("room_password", None),
max_players=data.get("maxPlayers") or DEFAULT_MAX_PLAYERS,
)
await netplay_handler.set(session_id, new_room)
await netplay_socket_handler.socket_server.enter_room(sid, session_id)
await netplay_socket_handler.socket_server.save_session(
@@ -65,7 +62,7 @@ async def open_room(sid: str, data: RoomData):
},
)
await netplay_socket_handler.socket_server.emit(
"users-updated", netplay_rooms[session_id]["players"], room=session_id
"users-updated", new_room["players"], room=session_id
)
@@ -79,10 +76,10 @@ async def join_room(sid: str, data: RoomData):
if not session_id or not player_id:
return "Invalid data: sessionId and playerId required"
if session_id not in netplay_rooms:
current_room = await netplay_handler.get(session_id)
if not current_room:
return "Room not found"
current_room = netplay_rooms[session_id]
if current_room["password"] and current_room["password"] != extra_data.get(
"room_password"
):
@@ -97,6 +94,7 @@ async def join_room(sid: str, data: RoomData):
userid=extra_data.get("userid"),
playerId=extra_data.get("playerId"),
)
await netplay_handler.set(session_id, current_room)
await netplay_socket_handler.socket_server.enter_room(sid, session_id)
await netplay_socket_handler.socket_server.save_session(
@@ -114,35 +112,37 @@ async def join_room(sid: str, data: RoomData):
async def _handle_leave(sid: str, session_id: str, player_id: str):
current_room = netplay_rooms[session_id]
current_room = await netplay_handler.get(session_id)
if not current_room:
return
current_room["players"].pop(player_id, None)
await netplay_handler.set(session_id, current_room)
await netplay_socket_handler.socket_server.emit(
"users-updated", current_room["players"], room=session_id
)
if not current_room["players"]:
netplay_rooms.pop(session_id, None)
await netplay_handler.delete(session_id)
elif sid == current_room["owner"]:
remaining = list(current_room["players"].keys())
if remaining:
current_room["owner"] = current_room["players"][remaining[0]]["socketId"]
await netplay_handler.set(session_id, current_room)
await netplay_socket_handler.socket_server.emit(
"users-updated", current_room["players"], room=session_id
)
@netplay_socket_handler.socket_server.on("leave-room") # type: ignore
async def leave_room(sid: str, data: RoomData):
extra_data = data["extra"]
async def leave_room(sid: str):
stored_session = await netplay_socket_handler.socket_server.get_session(sid)
session_id = stored_session.get("session_id")
player_id = stored_session.get("player_id")
session_id = extra_data["sessionid"]
player_id = extra_data["userid"] or extra_data["playerId"]
if not session_id or not player_id:
return
await _handle_leave(sid, session_id, player_id)
await netplay_socket_handler.socket_server.leave_room(sid, session_id)
if session_id and player_id:
await _handle_leave(sid, session_id, player_id)
await netplay_socket_handler.socket_server.leave_room(sid, session_id)
class WebRTCSignalData(TypedDict, total=False):
@@ -222,5 +222,5 @@ async def disconnect(sid: str):
session_id = stored_session.get("session_id")
player_id = stored_session.get("player_id")
if session_id:
if session_id and player_id:
await _handle_leave(sid, session_id, player_id)

View File

@@ -0,0 +1,50 @@
import json
from typing import Optional, TypedDict
from handler.redis_handler import async_cache
class NetplayPlayerInfo(TypedDict):
socketId: str
player_name: str
userid: str | None
playerId: str | None
class NetplayRoom(TypedDict):
owner: str
players: dict[str, NetplayPlayerInfo]
peers: list[str]
room_name: str
game_id: str
domain: Optional[str]
password: Optional[str]
max_players: int
class NetplayHandler:
"""A class to handle netplay rooms in Redis."""
def __init__(self):
self.hash_name = "netplay:rooms"
async def get(self, room_id: str) -> NetplayRoom | None:
"""Get a room from Redis."""
room = await async_cache.hget(self.hash_name, room_id)
return json.loads(room) if room else None
def set(self, room_id: str, room_data: NetplayRoom):
"""Set a room in Redis."""
return async_cache.hset(self.hash_name, room_id, json.dumps(room_data))
def delete(self, room_id: str):
"""Delete a room from Redis."""
return async_cache.hdel(self.hash_name, room_id)
async def get_all(self) -> dict[str, NetplayRoom]:
"""Get all rooms from Redis."""
rooms = await async_cache.hgetall(self.hash_name)
return {room_id: json.loads(room_data) for room_id, room_data in rooms.items()}
netplay_handler = NetplayHandler()

View File

@@ -5,7 +5,7 @@ from utils import json_module
class SocketHandler:
def __init__(self, path: str = "/ws/socket.io") -> None:
def __init__(self, path: str) -> None:
self.socket_server = socketio.AsyncServer(
cors_allowed_origins="*",
async_mode="asgi",
@@ -22,5 +22,5 @@ class SocketHandler:
self.socket_app = socketio.ASGIApp(self.socket_server, socketio_path=path)
socket_handler = SocketHandler()
socket_handler = SocketHandler(path="/ws/socket.io")
netplay_socket_handler = SocketHandler(path="/netplay/socket.io")

View File

@@ -24,6 +24,7 @@ from handler.metadata.base_handler import (
from handler.redis_handler import async_cache
from logger.logger import log
from models.firmware import FIRMWARE_FIXTURES_DIR, KNOWN_BIOS_KEY
from tasks.scheduled.cleanup_netplay import cleanup_netplay_task
from tasks.scheduled.convert_images_to_webp import convert_images_to_webp_task
from tasks.scheduled.scan_library import scan_library_task
from tasks.scheduled.sync_retroachievements_progress import (
@@ -46,6 +47,8 @@ async def main() -> None:
log.info("Running startup tasks")
# Initialize scheduled tasks
cleanup_netplay_task.init()
if ENABLE_SCHEDULED_RESCAN:
log.info("Starting scheduled rescan")
scan_library_task.init()

View File

@@ -0,0 +1,33 @@
from handler.netplay_handler import netplay_handler
from logger.logger import log
from tasks.tasks import PeriodicTask, TaskType
class CleanupNetplayTask(PeriodicTask):
def __init__(self):
super().__init__(
title="Scheduled netplay cleanup",
description="Cleans up empty netplay rooms",
task_type=TaskType.CLEANUP,
enabled=True,
manual_run=False,
cron_string="*/5 * * * *", # Every 5 minutes
func="tasks.scheduled.cleanup_netplay.cleanup_netplay_task.run",
)
async def run(self) -> None:
if not self.enabled:
self.unschedule()
return
netplay_rooms = await netplay_handler.get_all()
rooms_to_delete = [
sid for sid, r in netplay_rooms.items() if len(r.get("players", {})) == 0
]
if rooms_to_delete:
log.info(f"Cleaning up {len(rooms_to_delete)} empty netplay rooms")
for sid in rooms_to_delete:
await netplay_handler.delete(sid)
cleanup_netplay_task = CleanupNetplayTask()

View File

@@ -57,13 +57,7 @@ server {
proxy_request_buffering off;
proxy_buffering off;
}
location /ws {
proxy_pass http://wsgi_server;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
location /netplay {
location ~ ^/(ws|netplay) {
proxy_pass http://wsgi_server;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;

View File

@@ -123,12 +123,7 @@ export default defineConfig(({ mode }) => {
changeOrigin: false,
secure: false,
},
"/ws": {
target: `http://127.0.0.1:${backendPort}`,
changeOrigin: false,
ws: true,
},
"/netplay": {
"^/(?:ws|netplay)": {
target: `http://127.0.0.1:${backendPort}`,
changeOrigin: false,
ws: true,