From 5f947ed73cafd320433a3f8cd2f7f69873e3e004 Mon Sep 17 00:00:00 2001 From: _ <> Date: Sun, 13 Dec 2020 03:29:54 +0000 Subject: [PATCH] :shirt: refactor: Extract relay_state module --- crates/ptth_relay/src/lib.rs | 96 +------------------ crates/ptth_relay/src/relay_state.rs | 116 +++++++++++++++++++++++ crates/ptth_relay/src/server_endpoint.rs | 2 + crates/ptth_relay/src/tests.rs | 11 +++ todo.md | 1 - 5 files changed, 132 insertions(+), 94 deletions(-) create mode 100644 crates/ptth_relay/src/relay_state.rs diff --git a/crates/ptth_relay/src/lib.rs b/crates/ptth_relay/src/lib.rs index 999e5d9..1c1c7fa 100644 --- a/crates/ptth_relay/src/lib.rs +++ b/crates/ptth_relay/src/lib.rs @@ -17,7 +17,6 @@ use std::{ borrow::Cow, collections::HashMap, - convert::TryFrom, iter::FromIterator, net::SocketAddr, path::{Path, PathBuf}, @@ -46,10 +45,7 @@ use serde::{ }; use tokio::{ sync::{ - Mutex, oneshot, - RwLock, - watch, }, }; @@ -63,101 +59,15 @@ pub mod config; pub mod errors; pub mod git_version; pub mod key_validity; +mod relay_state; mod server_endpoint; pub use config::Config; pub use errors::*; +pub use relay_state::RelayState; -/* - -Here's what we need to handle: - -When a request comes in: - -- Park the client in response_rendezvous -- Look up the server ID in request_rendezvous -- If a server is parked, unpark it and send the request -- Otherwise, queue the request - -When a server comes to listen: - -- Look up the server ID in request_rendezvous -- Either return all pending requests, or park the server - -When a server comes to respond: - -- Look up the parked client in response_rendezvous -- Unpark the client and begin streaming - -So we need these lookups to be fast: - -- Server IDs, where (1 server) or (0 or many clients) -can be parked -- Request IDs, where 1 client is parked - -*/ - -enum RequestRendezvous { - ParkedClients (Vec ), - ParkedServer (oneshot::Sender >), -} - -type ResponseRendezvous = oneshot::Sender >; - -#[derive (Clone)] -pub struct ServerStatus { - last_seen: DateTime , -} - -impl Default for ServerStatus { - fn default () -> Self { - Self { - last_seen: Utc::now (), - } - } -} - -pub struct RelayState { - config: RwLock , - handlebars: Arc >, - - // Key: Server ID - request_rendezvous: Mutex >, - server_status: Mutex >, - - // Key: Request ID - response_rendezvous: RwLock >, - - shutdown_watch_tx: watch::Sender , - shutdown_watch_rx: watch::Receiver , -} - -impl TryFrom for RelayState { - type Error = RelayError; - - fn try_from (config: Config) -> Result { - let (shutdown_watch_tx, shutdown_watch_rx) = watch::channel (false); - - Ok (Self { - config: config.into (), - handlebars: Arc::new (load_templates (&PathBuf::new ())?), - request_rendezvous: Default::default (), - server_status: Default::default (), - response_rendezvous: Default::default (), - shutdown_watch_tx, - shutdown_watch_rx, - }) - } -} - -impl RelayState { - pub async fn list_servers (&self) -> Vec { - self.request_rendezvous.lock ().await.iter () - .map (|(k, _)| (*k).clone ()) - .collect () - } -} +use relay_state::*; fn ok_reply > (b: B) -> Result , http::Error> diff --git a/crates/ptth_relay/src/relay_state.rs b/crates/ptth_relay/src/relay_state.rs new file mode 100644 index 0000000..6b7f0c1 --- /dev/null +++ b/crates/ptth_relay/src/relay_state.rs @@ -0,0 +1,116 @@ +use std::{ + collections::HashMap, + convert::TryFrom, + path::{PathBuf}, + sync::Arc, +}; + +use chrono::{DateTime, Utc}; +use dashmap::DashMap; +use handlebars::Handlebars; +use tokio::sync::{ + Mutex, + RwLock, + oneshot, + watch, +}; + +use crate::{ + Body, + Config, + RelayError, + ShuttingDownError, + load_templates, +}; + +use ptth_core::http_serde; + +/* + +Here's what we need to handle: + +When a request comes in: + +- Park the client in response_rendezvous +- Look up the server ID in request_rendezvous +- If a server is parked, unpark it and send the request +- Otherwise, queue the request + +When a server comes to listen: + +- Look up the server ID in request_rendezvous +- Either return all pending requests, or park the server + +When a server comes to respond: + +- Look up the parked client in response_rendezvous +- Unpark the client and begin streaming + +So we need these lookups to be fast: + +- Server IDs, where (1 server) or (0 or many clients) +can be parked +- Request IDs, where 1 client is parked + +*/ + +pub enum RequestRendezvous { + ParkedClients (Vec ), + ParkedServer (oneshot::Sender >), +} + +type ResponseRendezvous = oneshot::Sender >; + +#[derive (Clone)] +pub struct ServerStatus { + pub last_seen: DateTime , +} + +impl Default for ServerStatus { + fn default () -> Self { + Self { + last_seen: Utc::now (), + } + } +} + +pub struct RelayState { + pub config: RwLock , + pub handlebars: Arc >, + + // Key: Server ID + pub request_rendezvous: Mutex >, + pub server_status: Mutex >, + + // Key: Request ID + pub response_rendezvous: RwLock >, + + pub shutdown_watch_tx: watch::Sender , + pub shutdown_watch_rx: watch::Receiver , +} + +impl TryFrom for RelayState { + type Error = RelayError; + + fn try_from (config: Config) -> Result { + let (shutdown_watch_tx, shutdown_watch_rx) = watch::channel (false); + + Ok (Self { + config: config.into (), + handlebars: Arc::new (load_templates (&PathBuf::new ())?), + request_rendezvous: Default::default (), + server_status: Default::default (), + response_rendezvous: Default::default (), + shutdown_watch_tx, + shutdown_watch_rx, + }) + } +} + +impl RelayState { + pub async fn list_servers (&self) -> Vec { + self.request_rendezvous.lock ().await.iter () + .map (|(k, _)| (*k).clone ()) + .collect () + } +} diff --git a/crates/ptth_relay/src/server_endpoint.rs b/crates/ptth_relay/src/server_endpoint.rs index c6e9795..f907552 100644 --- a/crates/ptth_relay/src/server_endpoint.rs +++ b/crates/ptth_relay/src/server_endpoint.rs @@ -75,6 +75,8 @@ pub async fn handle_listen ( // End of early returns { + // TODO: Move into relay_state.rs + let mut server_status = state.server_status.lock ().await; let mut status = server_status.entry (watcher_code.clone ()).or_insert_with (Default::default); diff --git a/crates/ptth_relay/src/tests.rs b/crates/ptth_relay/src/tests.rs index 6b80c37..b40cc1a 100644 --- a/crates/ptth_relay/src/tests.rs +++ b/crates/ptth_relay/src/tests.rs @@ -1,3 +1,5 @@ +use tokio::runtime::Runtime; + use super::*; #[test] @@ -23,3 +25,12 @@ fn test_pretty_print_last_seen () { assert_eq! (actual, expected); } } + +#[test] +fn scraper_endpoints () { + let mut rt = Runtime::new ().expect ("Can't create runtime for testing"); + + rt.block_on (async { + + }); +} diff --git a/todo.md b/todo.md index da42c81..7f9fcf4 100644 --- a/todo.md +++ b/todo.md @@ -1,7 +1,6 @@ Interesting issues will get a unique ID with `dd if=/dev/urandom bs=5 count=1 | base32` -- Report server version in HTML - [YNQAQKJS](issues/2020-12Dec/auth-route-YNQAQKJS.md) Open new auth route for spiders / scrapers - Track / Estimate bandwidth per server? - EOTPXGR3 Remote `tail -f` (_Complicated_) (Maybe use chunked encoding or something?)