use std::{ collections::HashMap, convert::TryFrom, time::Instant, }; use chrono::{DateTime, Utc}; use dashmap::DashMap; use tokio::sync::{ Mutex, RwLock, oneshot, watch, }; use crate::{ Body, Config, RelayError, ShuttingDownError, }; use ptth_core::http_serde; /* Here's what we need to handle: When a request comes in: - Park the client in response_rendezvous - Look up the server ID in request_rendezvous - If a server is parked, unpark it and send the request - Otherwise, queue the request When a server comes to listen: - Look up the server ID in request_rendezvous - Either return all pending requests, or park the server When a server comes to respond: - Look up the parked client in response_rendezvous - Unpark the client and begin streaming So we need these lookups to be fast: - Server IDs, where (1 server) or (0 or many clients) can be parked - Request IDs, where 1 client is parked */ pub enum RequestRendezvous { ParkedClients (Vec ), ParkedServer (oneshot::Sender >), } type ResponseRendezvous = oneshot::Sender >; #[derive (Clone)] pub struct ServerStatus { pub last_seen: DateTime , } impl Default for ServerStatus { fn default () -> Self { Self { last_seen: Utc::now (), } } } pub struct Relay { pub config: RwLock , // Key: Server ID pub request_rendezvous: Mutex >, pub server_status: Mutex >, // Key: Request ID pub response_rendezvous: RwLock >, pub shutdown_watch_tx: watch::Sender , pub shutdown_watch_rx: watch::Receiver , // List of recently rejected server names (Used to approve servers) pub unregistered_servers: BoundedVec , // Memory backend for audit logging // TODO: Add file / database / network server logging backend pub audit_log: BoundedVec , } #[derive (Clone)] pub struct RejectedServer { pub name: String, pub tripcode: blake3::Hash, pub seen: DateTime , } #[derive (Clone, Debug)] pub struct AuditEvent { time_monotonic: Instant, time_utc: DateTime , data: AuditData, } #[derive (Clone, Debug)] pub enum AuditData { RelayStart, WebClientGet { req_id: String, server_name: String, }, } impl AuditEvent { pub fn new (data: AuditData) -> Self { Self { time_monotonic: Instant::now (), time_utc: Utc::now (), data, } } } pub struct BoundedVec { bound: usize, v: RwLock >, } impl BoundedVec { pub fn new (bound: usize) -> Self { Self { bound, v: Default::default (), } } pub async fn to_vec (&self) -> Vec { let guard = self.v.read ().await; (*guard).clone () } // Not mut because we have a RwLock // One of the only problems with Rust is that // 'mut' doesn't _really_ -mean 'mutable' once you're // multi-threading or using async pub async fn push (&self, x: T) { let mut guard = self.v.write ().await; guard.push (x); while guard.len () > self.bound { guard.remove (0); } } } impl TryFrom for Relay { type Error = RelayError; fn try_from (config: Config) -> Result { let (shutdown_watch_tx, shutdown_watch_rx) = watch::channel (false); Ok (Self { config: config.into (), request_rendezvous: Default::default (), server_status: Default::default (), response_rendezvous: Default::default (), shutdown_watch_tx, shutdown_watch_rx, unregistered_servers: BoundedVec::new (20), audit_log: BoundedVec::new (256), }) } } impl Relay { pub async fn list_servers (&self) -> Vec { self.request_rendezvous.lock ().await.iter () .map (|(k, _)| (*k).clone ()) .collect () } }