mod auth; mod db; mod hand_shake; mod http; mod lobby; mod message_relay; use crate::auth::AuthBackend; use crate::hand_shake::{ ClientServerSpecificData, DisconnectData, inform_client_of_connection, init_and_connect, shutdown_connection, }; use crate::lobby::{AppState, reload_config}; use crate::message_relay::{handle_client_logic, handle_server_logic}; use axum::Router; use axum::extract::ws::{Message, WebSocket}; use axum::extract::{State, WebSocketUpgrade}; use axum::response::IntoResponse; use axum::routing::get; use axum_login::{AuthManagerLayerBuilder, AuthSession}; use bytes::Bytes; use futures_util::SinkExt; use futures_util::stream::StreamExt; use std::sync::Arc; use std::time::Duration; use time::Duration as TimeDuration; use tokio::sync::Mutex; use axum::http::{HeaderName, Method}; use tower_http::cors::{AllowOrigin, CorsLayer}; use tower_http::services::{ServeDir, ServeFile}; use tower_sessions::{Expiry, SessionManagerLayer}; use tower_sessions::MemoryStore; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; #[tokio::main] /// Activates error tracing, spawns a watch dog task to eliminate eventual dead rooms, then it sets up the roting system to serve the /// web sockets and listen for the pages enlist and reload. The server listens on port 8080. async fn main() { tracing_subscriber::registry() .with( tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| format!("{}=trace", env!("CARGO_CRATE_NAME")).into()), ) .with( tracing_subscriber::fmt::layer() .with_file(true) .with_line_number(true) .with_target(true) // Modul-Path (e.g. relay_server::processing_module) .with_thread_ids(true) // Thread-ID (helpful for Tokio) .with_thread_names(true), // Thread-Name ) .init(); let database_url = std::env::var("DATABASE_URL") .unwrap_or_else(|_| "postgresql://trictrac:trictrac@127.0.0.1:5432/trictrac".to_string()); let pool = db::init_db(&database_url).await; let session_store = MemoryStore::default(); let session_layer = SessionManagerLayer::new(session_store) .with_secure(false) .with_expiry(Expiry::OnInactivity(TimeDuration::days(30))); let auth_backend = AuthBackend::new(pool.clone()); let auth_layer = AuthManagerLayerBuilder::new(auth_backend, session_layer).build(); let app_state = Arc::new(AppState::new(pool)); let watchdog_state = app_state.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1200)); // 20 Min loop { interval.tick().await; cleanup_dead_rooms(&watchdog_state).await; } }); let initial = reload_config(&app_state).await; if let Err(message) = initial { tracing::error!(message, "Initial load error."); panic!("Initial load error: {}", message); } let cors = CorsLayer::new() .allow_origin(AllowOrigin::list([ "http://localhost:9091".parse().unwrap(), // tic-tac-toe dev server "http://localhost:9092".parse().unwrap(), // portal dev server ])) .allow_methods([Method::GET, Method::POST, Method::OPTIONS]) .allow_headers([ HeaderName::from_static("content-type"), HeaderName::from_static("cookie"), ]) .allow_credentials(true); let app = Router::new() .route("/reload", get(reload_handler)) .route("/enlist", get(enlist_handler)) .route("/ws", get(websocket_handler)) .merge(http::router()) .nest_service("/portal", ServeDir::new("portal").not_found_service(ServeFile::new("portal/index.html"))) .with_state(app_state) .layer(auth_layer) .layer(cors) .fallback_service(ServeDir::new(".").not_found_service(ServeFile::new("index.html"))); let listener = tokio::net::TcpListener::bind("127.0.0.1:8080") .await .unwrap(); axum::serve(listener, app).await.unwrap(); } /// Runs over all rooms and checks if they are diconnected from the server. /// If so, it cleans them up. This is a fallback solution things should be handled internally otherwise. async fn cleanup_dead_rooms(state: &Arc) { let mut rooms = state.rooms.lock().await; rooms.retain(|room_id, room| { // Keep rooms where the host is actively connected. // Rooms with host_connected = false are in the grace period — the // grace-period task spawned by shutdown_connection owns their cleanup. let is_alive = room.host_connected && !room.to_host_sender.is_closed(); if !is_alive { tracing::info!("Removing dead room: {}", room_id); } is_alive }); } /// Generates a list with the current rooms, the amount of players and info if this is a dead room. async fn enlist_handler(State(state): State>) -> String { let rooms = state.rooms.lock().await; rooms .iter() .map(|(name, room)| { format!( "Room: {:<30} Variation: {:03} Players: {:03} is alive: {}", name, room.rule_variation, room.amount_of_players, !room.to_host_sender.is_closed() ) }) .collect::>() .join("\n") } /// Forces the reload of the config file and lists the content. This enables the adding of new games /// without restarting the service. async fn reload_handler(State(state): State>) -> String { let res = reload_config(&state).await; match res { Ok(_) => state .configs .read() .await .iter() .map(|(key, players)| { format!("Game: {:<40} Maximum Amount of Players: {}", key, players) }) .collect::>() .join("\n"), Err(e) => { format!("Config reload failed: {}", e) } } } /// This function gets immediately called and upgrades the web response to a web socket. async fn websocket_handler( ws: WebSocketUpgrade, auth_session: AuthSession, State(state): State>, ) -> impl IntoResponse { let user_id = auth_session.user.map(|u| u.id); ws.on_upgrade(move |socket| websocket(socket, state, user_id)) } /// Does the whole handling from start to finish: Handshake -> Handling of logic depending on if we are connected to /// the server or client -> Shut down processing. async fn websocket(stream: WebSocket, state: Arc, user_id: Option) { // By splitting, we can send and receive at the same time. let (mut sender, mut receiver) = stream.split(); let handshake_result = init_and_connect(&mut sender, &mut receiver, state.clone(), user_id).await; if handshake_result.is_none() { // We quit here, as the handshake did not work out. return; } let base_data = handshake_result.unwrap(); let disconnect_data = DisconnectData::from(&base_data); let success = inform_client_of_connection(&mut sender, &base_data).await; let wrapped_sender = Arc::new(Mutex::new(sender)); // Ping-Task to keep alive. let ping_sender = wrapped_sender.clone(); let ping_task = tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(30)); interval.tick().await; // Skip first tick. loop { interval.tick().await; let mut s = ping_sender.lock().await; if s.send(Message::Ping(Bytes::new())).await.is_err() { break; } } }); let mut error_message = "Connection to server lost"; if success { match base_data.specific_data { ClientServerSpecificData::Server(internal_receiver, internal_sender) => { error_message = handle_server_logic( wrapped_sender.clone(), receiver, internal_receiver, internal_sender, ) .await; } ClientServerSpecificData::Client(internal_receiver, internal_sender) => { error_message = handle_client_logic( wrapped_sender.clone(), receiver, internal_receiver, internal_sender, base_data.player_id, ) .await; } } } ping_task.abort(); shutdown_connection(wrapped_sender, disconnect_data, state, error_message).await; }