mod auth;
mod db;
mod hand_shake;
mod http;
mod lobby;
mod message_relay;

use crate::auth::AuthBackend;
use crate::hand_shake::{
    ClientServerSpecificData, DisconnectData, inform_client_of_connection, init_and_connect,
    shutdown_connection,
};
use crate::lobby::{AppState, reload_config};
use crate::message_relay::{handle_client_logic, handle_server_logic};
use axum::Router;
use axum::extract::ws::{Message, WebSocket};
use axum::extract::{State, WebSocketUpgrade};
use axum::http::{HeaderName, Method};
use axum::response::IntoResponse;
use axum::routing::get;
use axum_login::{AuthManagerLayerBuilder, AuthSession};
use bytes::Bytes;
use futures_util::SinkExt;
use futures_util::stream::StreamExt;
use std::sync::Arc;
use std::time::Duration;
use time::Duration as TimeDuration;
use tokio::sync::Mutex;
use tower_http::cors::{AllowOrigin, CorsLayer};
use tower_http::services::{ServeDir, ServeFile};
use tower_sessions::MemoryStore;
use tower_sessions::{Expiry, SessionManagerLayer};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

/// Activates error tracing, spawns a watchdog task that cleans up any dead rooms left behind, then sets up
/// the routing system that serves the web sockets and the enlist and reload endpoints. The server listens
/// on port 8080.
#[tokio::main]
async fn main() {
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| format!("{}=trace", env!("CARGO_CRATE_NAME")).into()),
        )
        .with(
            tracing_subscriber::fmt::layer()
                .with_file(true)
                .with_line_number(true)
                .with_target(true) // Module path (e.g. relay_server::processing_module)
                .with_thread_ids(true) // Thread ID (helpful for Tokio)
                .with_thread_names(true), // Thread name
        )
        .init();
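
    // Note: EnvFilter::try_from_default_env() reads the standard RUST_LOG environment
    // variable, so e.g. RUST_LOG=relay_server=debug can narrow logging at runtime
    // without a rebuild; otherwise the crate-wide trace filter above is used.
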
    let database_url = std::env::var("DATABASE_URL")
        .unwrap_or_else(|_| "postgresql://trictrac:trictrac@127.0.0.1:5432/trictrac".to_string());
    let pool = db::init_db(&database_url).await;

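    // Sessions live in a MemoryStore, i.e. purely in process memory, so they do not
    // survive a restart. with_secure(false) allows the cookie over plain HTTP (local
    // development), and the 30-day expiry is sliding: it is reset by activity.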
    let session_store = MemoryStore::default();
    let session_layer = SessionManagerLayer::new(session_store)
        .with_secure(false)
        .with_expiry(Expiry::OnInactivity(TimeDuration::days(30)));

    let auth_backend = AuthBackend::new(pool.clone());
    let auth_layer = AuthManagerLayerBuilder::new(auth_backend, session_layer).build();

    let app_state = Arc::new(AppState::new(pool));
    let watchdog_state = app_state.clone();
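    // Fallback cleanup loop. Tokio's interval completes its first tick immediately, so
    // one cleanup pass runs right at startup and then every 20 minutes thereafter.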
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1200)); // 20 min
        loop {
            interval.tick().await;
            cleanup_dead_rooms(&watchdog_state).await;
        }
    });

    let initial = reload_config(&app_state).await;
    if let Err(message) = initial {
        tracing::error!(message, "Initial load error.");
        panic!("Initial load error: {}", message);
    }

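    // CORS: because credentials (the session cookie) are allowed, browsers reject a
    // wildcard origin, so the dev front-end origin has to be listed explicitly.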
    let cors = CorsLayer::new()
        .allow_origin(AllowOrigin::list([
            "http://localhost:9091".parse().unwrap(), // unified web dev server
        ]))
        .allow_methods([Method::GET, Method::POST, Method::OPTIONS])
        .allow_headers([
            HeaderName::from_static("content-type"),
            HeaderName::from_static("cookie"),
        ])
        .allow_credentials(true);

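    // Router::layer applies to the routes registered before the call, so both the auth
    // and CORS layers cover /reload, /enlist, /ws and the merged http router; the
    // static-file fallback is added afterwards.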
    let app = Router::new()
        .route("/reload", get(reload_handler))
        .route("/enlist", get(enlist_handler))
        .route("/ws", get(websocket_handler))
        .merge(http::router())
        .with_state(app_state)
        .layer(auth_layer)
        .layer(cors)
        .fallback_service(ServeDir::new(".").not_found_service(ServeFile::new("index.html")));

    let listener = tokio::net::TcpListener::bind("127.0.0.1:8080")
        .await
        .unwrap();

    axum::serve(listener, app).await.unwrap();
}

/// Iterates over all rooms and checks whether they have been disconnected from the server.
/// If so, it cleans them up. This is a fallback; normally rooms are cleaned up internally.
async fn cleanup_dead_rooms(state: &Arc<AppState>) {
    let mut rooms = state.rooms.lock().await;
    rooms.retain(|room_id, room| {
        // Keep rooms where the host is actively connected.
        // Rooms with host_connected = false are in the grace period — the
        // grace-period task spawned by shutdown_connection owns their cleanup.
        let is_alive = room.host_connected && !room.to_host_sender.is_closed();
        if !is_alive {
            tracing::info!("Removing dead room: {}", room_id);
        }
        is_alive
    });
}

/// Lists the current rooms with their rule variation, player count, and whether the room is still alive.
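/// Each line follows the form `Room: <name, padded to 30> Variation: <nnn> Players: <nnn> is alive: <bool>`
/// and the endpoint can be queried with e.g. `curl http://127.0.0.1:8080/enlist` (assuming the default bind address).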
async fn enlist_handler(State(state): State<Arc<AppState>>) -> String {
    let rooms = state.rooms.lock().await;
    rooms
        .iter()
        .map(|(name, room)| {
            format!(
                "Room: {:<30} Variation: {:03} Players: {:03} is alive: {}",
                name,
                room.rule_variation,
                room.amount_of_players,
                !room.to_host_sender.is_closed()
            )
        })
        .collect::<Vec<_>>()
        .join("\n")
}

/// Forces a reload of the config file and lists its content. This allows adding new games
/// without restarting the service.
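/// Each line of the response follows the form `Game: <name, padded to 40> Maximum Amount of Players: <n>`,
/// and the endpoint can be triggered with e.g. `curl http://127.0.0.1:8080/reload` (assuming the default bind address).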
async fn reload_handler(State(state): State<Arc<AppState>>) -> String {
    let res = reload_config(&state).await;
    match res {
        Ok(_) => state
            .configs
            .read()
            .await
            .iter()
            .map(|(key, players)| {
                format!("Game: {:<40} Maximum Amount of Players: {}", key, players)
            })
            .collect::<Vec<_>>()
            .join("\n"),
        Err(e) => {
            format!("Config reload failed: {}", e)
        }
    }
}

/// Handles the initial HTTP request and upgrades it to a WebSocket connection.
async fn websocket_handler(
    ws: WebSocketUpgrade,
    auth_session: AuthSession<AuthBackend>,
    State(state): State<Arc<AppState>>,
) -> impl IntoResponse {
    let user_id = auth_session.user.map(|u| u.id);
    ws.on_upgrade(move |socket| websocket(socket, state, user_id))
}

/// Handles the full connection lifecycle: handshake, then the server- or client-side relay
/// logic depending on which end connected, and finally shutdown processing.
async fn websocket(stream: WebSocket, state: Arc<AppState>, user_id: Option<i64>) {
    // By splitting, we can send and receive at the same time.
    let (mut sender, mut receiver) = stream.split();

    let handshake_result =
        init_and_connect(&mut sender, &mut receiver, state.clone(), user_id).await;
    if handshake_result.is_none() {
        // We quit here, as the handshake did not work out.
        return;
    }
    let base_data = handshake_result.unwrap();

    let disconnect_data = DisconnectData::from(&base_data);
    let success = inform_client_of_connection(&mut sender, &base_data).await;
    let wrapped_sender = Arc::new(Mutex::new(sender));

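    // The write half is shared behind Arc<Mutex<...>> so the keep-alive ping task and
    // the relay logic below can both send on the same socket.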
    // Ping task to keep the connection alive.
    let ping_sender = wrapped_sender.clone();
    let ping_task = tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(30));
        interval.tick().await; // Skip the first, immediate tick.
        loop {
            interval.tick().await;
            let mut s = ping_sender.lock().await;
            if s.send(Message::Ping(Bytes::new())).await.is_err() {
                break;
            }
        }
    });

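    // Run the server- or client-side relay logic, handing the socket receiver and the
    // internal channel endpoints obtained from the handshake to the matching handler.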
    let mut error_message = "Connection to server lost";
    if success {
        match base_data.specific_data {
            ClientServerSpecificData::Server(internal_receiver, internal_sender) => {
                error_message = handle_server_logic(
                    wrapped_sender.clone(),
                    receiver,
                    internal_receiver,
                    internal_sender,
                )
                .await;
            }
            ClientServerSpecificData::Client(internal_receiver, internal_sender) => {
                error_message = handle_client_logic(
                    wrapped_sender.clone(),
                    receiver,
                    internal_receiver,
                    internal_sender,
                    base_data.player_id,
                )
                .await;
            }
        }
    }

    ping_task.abort();
    shutdown_connection(wrapped_sender, disconnect_data, state, error_message).await;
}