use std::{ collections::HashMap, convert::TryFrom, }; use chrono::{DateTime, Utc}; use dashmap::DashMap; use tokio::sync::{ Mutex, RwLock, oneshot, watch, }; use crate::{ Body, Config, RelayError, ShuttingDownError, }; use ptth_core::http_serde; /* Here's what we need to handle: When a request comes in: - Park the client in response_rendezvous - Look up the server ID in request_rendezvous - If a server is parked, unpark it and send the request - Otherwise, queue the request When a server comes to listen: - Look up the server ID in request_rendezvous - Either return all pending requests, or park the server When a server comes to respond: - Look up the parked client in response_rendezvous - Unpark the client and begin streaming So we need these lookups to be fast: - Server IDs, where (1 server) or (0 or many clients) can be parked - Request IDs, where 1 client is parked */ pub enum RequestRendezvous { ParkedClients (Vec ), ParkedServer (oneshot::Sender >), } type ResponseRendezvous = oneshot::Sender >; #[derive (Clone)] pub struct ServerStatus { pub last_seen: DateTime , } impl Default for ServerStatus { fn default () -> Self { Self { last_seen: Utc::now (), } } } #[derive (Clone)] pub struct RejectedServer { pub name: String, pub tripcode: blake3::Hash, pub seen: DateTime , } pub struct RelayState { pub config: RwLock , // Key: Server ID pub request_rendezvous: Mutex >, pub server_status: Mutex >, // Key: Request ID pub response_rendezvous: RwLock >, pub shutdown_watch_tx: watch::Sender , pub shutdown_watch_rx: watch::Receiver , // List of recently rejected server names (Used to approve servers) pub unregistered_servers: RwLock >, } impl TryFrom for RelayState { type Error = RelayError; fn try_from (config: Config) -> Result { let (shutdown_watch_tx, shutdown_watch_rx) = watch::channel (false); Ok (Self { config: config.into (), request_rendezvous: Default::default (), server_status: Default::default (), response_rendezvous: Default::default (), shutdown_watch_tx, shutdown_watch_rx, unregistered_servers: Default::default (), }) } } impl RelayState { pub async fn list_servers (&self) -> Vec { self.request_rendezvous.lock ().await.iter () .map (|(k, _)| (*k).clone ()) .collect () } }