From 05a83854f97417b1a02d0f03eaedda351413f698 Mon Sep 17 00:00:00 2001 From: _ <> Date: Tue, 3 Nov 2020 16:00:50 +0000 Subject: [PATCH 1/7] Running for real. Lots of todos added --- Dockerfile | 2 +- todo.md | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 52b3318..a53934e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,7 +8,7 @@ RUN apt-get update \ # Make sure the dependencies are all cached so we won't hammer crates.io ADD old-git.tar.gz . -RUN git checkout 16984ddcd3c9cdc04b2c4c3625eb83176c1b2dda \ +RUN git checkout 690f07dab67111a75fe190f014c8c23ef1753598 \ && git reset --hard \ && cargo check diff --git a/todo.md b/todo.md index 9987cc8..f76f183 100644 --- a/todo.md +++ b/todo.md @@ -1,3 +1,9 @@ +- Add file size in directory listing +- Large text files not streaming as expected? +- Allow spaces in server names +- Make file_server_root mandatory +- Deny unused HTTP methods for endpoints +- Hide ptth_server.toml from file server - ETag cache - Server-side hash? - Log / audit log? From a94741261f8b3d8839d1b7638a376d0842dd669c Mon Sep 17 00:00:00 2001 From: _ <_@_> Date: Mon, 2 Nov 2020 19:41:36 -0600 Subject: [PATCH 2/7] :recycle: Remove some unused code --- src/bin/ptth_server.rs | 8 +---- src/lib.rs | 6 +--- src/relay/mod.rs | 2 -- src/relay/watcher.rs | 79 ------------------------------------------ src/server/mod.rs | 9 +---- 5 files changed, 3 insertions(+), 101 deletions(-) delete mode 100644 src/relay/watcher.rs diff --git a/src/bin/ptth_server.rs b/src/bin/ptth_server.rs index 3d972c1..1577ae0 100644 --- a/src/bin/ptth_server.rs +++ b/src/bin/ptth_server.rs @@ -27,11 +27,5 @@ async fn main () -> Result <(), Box > { toml::from_str (&config_s).expect (&format! ("Can't parse {:?} as TOML", config_file_path)) }; - let opt = Opt::from_args (); - - let opt = ptth::server::Opt { - - }; - - ptth::server::main (config_file, opt).await + ptth::server::main (config_file).await } diff --git a/src/lib.rs b/src/lib.rs index 7d53291..d610009 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -114,11 +114,7 @@ mod tests { file_server_root: None, }; spawn (async move { - let opt = server::Opt { - - }; - - server::main (config_file, opt).await.unwrap (); + server::main (config_file).await.unwrap (); }); tokio::time::delay_for (std::time::Duration::from_millis (500)).await; diff --git a/src/relay/mod.rs b/src/relay/mod.rs index cb84e4f..30844b3 100644 --- a/src/relay/mod.rs +++ b/src/relay/mod.rs @@ -1,5 +1,3 @@ -pub mod watcher; - use std::{ error::Error, collections::*, diff --git a/src/relay/watcher.rs b/src/relay/watcher.rs deleted file mode 100644 index 69820ed..0000000 --- a/src/relay/watcher.rs +++ /dev/null @@ -1,79 +0,0 @@ -// Watchers on the wall was the last good episode of Game of Thrones - -use std::{ - collections::*, - sync::Arc, - time::Duration, -}; - -use futures::channel::oneshot; -use tokio::{ - sync::Mutex, - time::delay_for, -}; - -pub struct Watchers { - pub senders: HashMap >, -} - -impl Default for Watchers { - fn default () -> Self { - Self { - senders: Default::default (), - } - } -} - -impl Watchers { - pub fn add_watcher_with_id (&mut self, s: oneshot::Sender , id: String) { - self.senders.insert (id, s); - } - - pub fn remove_watcher (&mut self, id: &str) { - self.senders.remove (id); - } - - pub fn wake_one (&mut self, msg: T, id: &str) -> bool { - //println! ("wake_one {}", id); - - if let Some (waiter) = self.senders.remove (id) { - waiter.send (msg).ok (); - true - } - else { - false - } - } - - pub fn num_watchers (&self) -> usize { - self.senders.len () - } - - pub async fn long_poll (that: Arc >, id: String) -> Option { - //println! ("long_poll {}", id); - - let (s, r) = oneshot::channel (); - let timeout = Duration::from_secs (5); - - let id_2 = id.clone (); - { - let mut that = that.lock ().await; - that.add_watcher_with_id (s, id_2); - //println! ("Added server {}", id); - } - - tokio::spawn (async move { - delay_for (timeout).await; - let mut that = that.lock ().await; - that.remove_watcher (&id); - //println! ("Removed server {}", id); - }); - - if let Ok (message) = r.await { - Some (message) - } - else { - None - } - } -} diff --git a/src/server/mod.rs b/src/server/mod.rs index dfba1df..d94c4bc 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -24,7 +24,6 @@ pub mod file_server; struct ServerState { config: Config, - opt: Opt, handlebars: Handlebars <'static>, client: Client, } @@ -109,12 +108,7 @@ pub struct Config { pub file_server_root: Option , } -#[derive (Clone)] -pub struct Opt { - -} - -pub async fn main (config_file: ConfigFile, opt: Opt) +pub async fn main (config_file: ConfigFile) -> Result <(), Box > { use std::convert::TryInto; @@ -140,7 +134,6 @@ pub async fn main (config_file: ConfigFile, opt: Opt) relay_url: config_file.relay_url, file_server_root: config_file.file_server_root, }, - opt, handlebars, client, }); From 7e697df35a341f66d05a890fe5a8e15747b94711 Mon Sep 17 00:00:00 2001 From: _ <_@_> Date: Thu, 5 Nov 2020 21:35:55 -0600 Subject: [PATCH 3/7] :bug: Fix graceful shutdown of relay and server in end_to_end test --- Cargo.toml | 3 ++ src/bin/ptth_relay.rs | 8 ++--- src/bin/ptth_server.rs | 10 +++--- src/lib.rs | 48 +++++++++++++++++++++++------ src/relay/mod.rs | 70 ++++++++++++++++++++++++++++++++++-------- src/server/mod.rs | 31 ++++++++++++++++++- 6 files changed, 139 insertions(+), 31 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 04faa6b..6972530 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,5 +29,8 @@ rmp-serde = "0.14.4" serde = {version = "1.0.117", features = ["derive"]} structopt = "0.3.20" tokio = { version = "0.2.22", features = ["full"] } +tracing = "0.1.21" +tracing-futures = "0.2.4" +tracing-subscriber = "0.2.15" toml = "0.5.7" ulid = "0.4.1" diff --git a/src/bin/ptth_relay.rs b/src/bin/ptth_relay.rs index 4c57502..4f5ad9a 100644 --- a/src/bin/ptth_relay.rs +++ b/src/bin/ptth_relay.rs @@ -16,13 +16,13 @@ async fn main () -> Result <(), Box > { let config_file = { let config_file_path = "config/ptth_relay.toml"; - let mut f = File::open (config_file_path).expect (&format! ("Can't open {:?}", config_file_path)); + let mut f = File::open (config_file_path).unwrap_or_else (|_| panic! ("Can't open {:?}", config_file_path)); let mut buffer = vec! [0u8; 4096]; - let bytes_read = f.read (&mut buffer).expect (&format! ("Can't read {:?}", config_file_path)); + let bytes_read = f.read (&mut buffer).unwrap_or_else (|_| panic! ("Can't read {:?}", config_file_path)); buffer.truncate (bytes_read); - let config_s = String::from_utf8 (buffer).expect (&format! ("Can't parse {:?} as UTF-8", config_file_path)); - toml::from_str (&config_s).expect (&format! ("Can't parse {:?} as TOML", config_file_path)) + let config_s = String::from_utf8 (buffer).unwrap_or_else (|_| panic! ("Can't parse {:?} as UTF-8", config_file_path)); + toml::from_str (&config_s).unwrap_or_else (|_| panic! ("Can't parse {:?} as TOML", config_file_path)) }; eprintln! ("ptth_relay Git version: {:?}", ptth::git_version::GIT_VERSION); diff --git a/src/bin/ptth_server.rs b/src/bin/ptth_server.rs index 1577ae0..1ec88a0 100644 --- a/src/bin/ptth_server.rs +++ b/src/bin/ptth_server.rs @@ -18,14 +18,14 @@ async fn main () -> Result <(), Box > { let config_file = { let config_file_path = "config/ptth_server.toml"; - let mut f = std::fs::File::open (config_file_path).expect (&format! ("Can't open {:?}", config_file_path)); + let mut f = std::fs::File::open (config_file_path).unwrap_or_else (|_| panic! ("Can't open {:?}", config_file_path)); let mut buffer = vec! [0u8; 4096]; - let bytes_read = f.read (&mut buffer).expect (&format! ("Can't read {:?}", config_file_path)); + let bytes_read = f.read (&mut buffer).unwrap_or_else (|_| panic! ("Can't read {:?}", config_file_path)); buffer.truncate (bytes_read); - let config_s = String::from_utf8 (buffer).expect (&format! ("Can't parse {:?} as UTF-8", config_file_path)); - toml::from_str (&config_s).expect (&format! ("Can't parse {:?} as TOML", config_file_path)) + let config_s = String::from_utf8 (buffer).unwrap_or_else (|_| panic! ("Can't parse {:?} as UTF-8", config_file_path)); + toml::from_str (&config_s).unwrap_or_else (|_| panic! ("Can't parse {:?} as TOML", config_file_path)) }; - ptth::server::main (config_file).await + ptth::server::main (config_file, None).await } diff --git a/src/lib.rs b/src/lib.rs index d610009..a4ceb98 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -41,12 +41,21 @@ pub fn password_is_bad (mut password: String) -> bool { #[cfg (test)] mod tests { use std::{ - sync::Arc, + sync::{ + Arc, + atomic::{ + AtomicBool, + Ordering, + }, + }, + time::Duration, }; use tokio::{ runtime::Runtime, spawn, + sync::oneshot, + time::delay_for, }; use super::{ @@ -58,7 +67,7 @@ mod tests { fn check_bad_passwords () { use crate::password_is_bad; - for pw in vec! [ + for pw in &[ "password", "pAsSwOrD", "secret", @@ -80,6 +89,9 @@ mod tests { fn end_to_end () { use maplit::*; use reqwest::Client; + use tracing::{info}; + + tracing_subscriber::fmt::init (); let mut rt = Runtime::new ().unwrap (); @@ -99,8 +111,9 @@ mod tests { let relay_state = Arc::new (relay::RelayState::from (&config_file)); let relay_state_2 = relay_state.clone (); - spawn (async move { - relay::run_relay (relay_state_2, None).await.unwrap (); + let (stop_relay_tx, stop_relay_rx) = oneshot::channel (); + let task_relay = spawn (async move { + relay::run_relay (relay_state_2, Some (stop_relay_rx)).await.unwrap (); }); assert! (relay_state.list_servers ().await.is_empty ()); @@ -113,18 +126,22 @@ mod tests { relay_url: "http://127.0.0.1:4000/7ZSFUKGV".into (), file_server_root: None, }; - spawn (async move { - server::main (config_file).await.unwrap (); - }); + let stop_server_atomic = Arc::new (AtomicBool::from (false)); + let task_server = { + let stop_server_atomic = stop_server_atomic.clone (); + spawn (async move { + server::main (config_file, Some (stop_server_atomic)).await.unwrap (); + }) + }; - tokio::time::delay_for (std::time::Duration::from_millis (500)).await; + delay_for (Duration::from_millis (500)).await; assert_eq! (relay_state.list_servers ().await, vec! [ server_name.to_string (), ]); let client = Client::builder () - .timeout (std::time::Duration::from_secs (2)) + .timeout (Duration::from_secs (2)) .build ().unwrap (); let resp = client.get (&format! ("{}/frontend/relay_up_check", relay_url)) @@ -156,6 +173,19 @@ mod tests { .send ().await.unwrap (); assert_eq! (resp.status (), reqwest::StatusCode::NOT_FOUND); + + info! ("Shutting down end-to-end test"); + + stop_server_atomic.store (true, Ordering::Relaxed); + stop_relay_tx.send (()).unwrap (); + + info! ("Sent stop messages"); + + task_relay.await.unwrap (); + info! ("Relay stopped"); + + task_server.await.unwrap (); + info! ("Server stopped"); }); } } diff --git a/src/relay/mod.rs b/src/relay/mod.rs index 30844b3..18a7127 100644 --- a/src/relay/mod.rs +++ b/src/relay/mod.rs @@ -10,7 +10,6 @@ use std::{ }; use dashmap::DashMap; -use futures::channel::oneshot; use handlebars::Handlebars; use hyper::{ Body, @@ -26,7 +25,10 @@ use serde::{ Serialize, }; use tokio::{ - sync::Mutex, + sync::{ + Mutex, + oneshot, + }, }; use crate::{ @@ -183,17 +185,23 @@ async fn handle_http_listen ( if let Some (ParkedClients (v)) = request_rendezvous.remove (&watcher_code) { + // 1 or more clients were parked - Make the server + // handle them immediately + return status_reply (StatusCode::OK, rmp_serde::to_vec (&v).unwrap ()); } request_rendezvous.insert (watcher_code, ParkedServer (tx)); } - let one_req = vec! [ - rx.await.unwrap (), - ]; + // No clients were parked - make the server long-poll - return status_reply (StatusCode::OK, rmp_serde::to_vec (&one_req).unwrap ()); + let one_req = match rx.await { + Ok (r) => r, + Err (_) => return status_reply (StatusCode::SERVICE_UNAVAILABLE, "Server is shutting down, try again soon"), + }; + + status_reply (StatusCode::OK, rmp_serde::to_vec (&vec! [one_req]).unwrap ()) } async fn handle_http_response ( @@ -385,11 +393,13 @@ pub fn load_templates () Ok (handlebars) } +use tracing::info; + pub async fn run_relay ( state: Arc , - shutdown_oneshot: Option > + shutdown_oneshot: Option > ) - -> Result <(), Box > +-> Result <(), Box > { let addr = SocketAddr::from (( [0, 0, 0, 0], @@ -406,7 +416,7 @@ pub async fn run_relay ( } } - eprintln! ("Loaded {} server tripcodes", state.config.server_tripcodes.len ()); + info! ("Loaded {} server tripcodes", state.config.server_tripcodes.len ()); let make_svc = make_service_fn (|_conn| { let state = state.clone (); @@ -424,16 +434,52 @@ pub async fn run_relay ( .serve (make_svc); match shutdown_oneshot { - Some (rx) => server.with_graceful_shutdown (async { - rx.await.ok (); - }).await?, + Some (rx) => { + info! ("Configured for graceful shutdown"); + server.with_graceful_shutdown (async { + rx.await.ok (); + + state.response_rendezvous.clear (); + + let mut request_rendezvoux = state.request_rendezvous.lock ().await; + request_rendezvoux.clear (); + + info! ("Received graceful shutdown"); + }).await? + }, None => server.await?, }; + info! ("Exiting"); Ok (()) } #[cfg (test)] mod tests { + use std::time::Duration; + use tokio::{ + runtime::Runtime, + spawn, + sync::oneshot, + time::delay_for, + }; + + #[test] + fn so_crazy_it_might_work () { + let mut rt = Runtime::new ().unwrap (); + + rt.block_on (async { + let (tx, rx) = oneshot::channel (); + + let task_1 = spawn (async move { + delay_for (Duration::from_secs (1)).await; + tx.send (()).unwrap (); + }); + + rx.await.unwrap (); + + task_1.await.unwrap (); + }); + } } diff --git a/src/server/mod.rs b/src/server/mod.rs index d94c4bc..5538d06 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -108,7 +108,17 @@ pub struct Config { pub file_server_root: Option , } -pub async fn main (config_file: ConfigFile) +use std::sync::atomic::{ + AtomicBool, + Ordering, +}; + +use tracing::info; + +pub async fn main ( + config_file: ConfigFile, + shutdown_atomic: Option > +) -> Result <(), Box > { use std::convert::TryInto; @@ -126,6 +136,7 @@ pub async fn main (config_file: ConfigFile) let client = Client::builder () .default_headers (headers) + .timeout (Duration::from_secs (30)) .build ().unwrap (); let handlebars = file_server::load_templates ()?; @@ -141,10 +152,24 @@ pub async fn main (config_file: ConfigFile) let mut backoff_delay = 0; loop { + if let Some (a) = &shutdown_atomic { + if a.load (Ordering::Relaxed) { + break; + } + } + if backoff_delay > 0 { delay_for (Duration::from_millis (backoff_delay)).await; } + if let Some (a) = &shutdown_atomic { + if a.load (Ordering::Relaxed) { + break; + } + } + + info! ("http_listen"); + let req_req = state.client.get (&format! ("{}/http_listen/{}", state.config.relay_url, config_file.name)); let err_backoff_delay = std::cmp::min (30_000, backoff_delay * 2 + 500); @@ -177,4 +202,8 @@ pub async fn main (config_file: ConfigFile) handle_req_resp (state, req_resp).await; }); } + + info! ("Exiting"); + + Ok (()) } From 3a50424a352c57c39e69c562211bdfff0f385ecc Mon Sep 17 00:00:00 2001 From: _ <_@_> Date: Thu, 5 Nov 2020 21:44:32 -0600 Subject: [PATCH 4/7] :recycle: Remove Option<> --- src/bin/ptth_relay.rs | 2 +- src/bin/ptth_server.rs | 2 +- src/lib.rs | 4 ++-- src/relay/mod.rs | 29 ++++++++++++----------------- src/server/mod.rs | 2 +- 5 files changed, 17 insertions(+), 22 deletions(-) diff --git a/src/bin/ptth_relay.rs b/src/bin/ptth_relay.rs index 4f5ad9a..45683c7 100644 --- a/src/bin/ptth_relay.rs +++ b/src/bin/ptth_relay.rs @@ -50,6 +50,6 @@ async fn main () -> Result <(), Box > { relay::run_relay ( Arc::new (RelayState::from (&config_file)), - Some (rx) + rx ).await } diff --git a/src/bin/ptth_server.rs b/src/bin/ptth_server.rs index 1ec88a0..63370de 100644 --- a/src/bin/ptth_server.rs +++ b/src/bin/ptth_server.rs @@ -27,5 +27,5 @@ async fn main () -> Result <(), Box > { toml::from_str (&config_s).unwrap_or_else (|_| panic! ("Can't parse {:?} as TOML", config_file_path)) }; - ptth::server::main (config_file, None).await + ptth::server::run_server (config_file, None).await } diff --git a/src/lib.rs b/src/lib.rs index a4ceb98..4350775 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -113,7 +113,7 @@ mod tests { let relay_state_2 = relay_state.clone (); let (stop_relay_tx, stop_relay_rx) = oneshot::channel (); let task_relay = spawn (async move { - relay::run_relay (relay_state_2, Some (stop_relay_rx)).await.unwrap (); + relay::run_relay (relay_state_2, stop_relay_rx).await.unwrap (); }); assert! (relay_state.list_servers ().await.is_empty ()); @@ -130,7 +130,7 @@ mod tests { let task_server = { let stop_server_atomic = stop_server_atomic.clone (); spawn (async move { - server::main (config_file, Some (stop_server_atomic)).await.unwrap (); + server::run_server (config_file, Some (stop_server_atomic)).await.unwrap (); }) }; diff --git a/src/relay/mod.rs b/src/relay/mod.rs index 18a7127..ef95378 100644 --- a/src/relay/mod.rs +++ b/src/relay/mod.rs @@ -397,7 +397,7 @@ use tracing::info; pub async fn run_relay ( state: Arc , - shutdown_oneshot: Option > + shutdown_oneshot: oneshot::Receiver <()> ) -> Result <(), Box > { @@ -433,22 +433,17 @@ pub async fn run_relay ( let server = Server::bind (&addr) .serve (make_svc); - match shutdown_oneshot { - Some (rx) => { - info! ("Configured for graceful shutdown"); - server.with_graceful_shutdown (async { - rx.await.ok (); - - state.response_rendezvous.clear (); - - let mut request_rendezvoux = state.request_rendezvous.lock ().await; - request_rendezvoux.clear (); - - info! ("Received graceful shutdown"); - }).await? - }, - None => server.await?, - }; + info! ("Configured for graceful shutdown"); + server.with_graceful_shutdown (async { + shutdown_oneshot.await.ok (); + + state.response_rendezvous.clear (); + + let mut request_rendezvoux = state.request_rendezvous.lock ().await; + request_rendezvoux.clear (); + + info! ("Received graceful shutdown"); + }).await?; info! ("Exiting"); Ok (()) diff --git a/src/server/mod.rs b/src/server/mod.rs index 5538d06..38dd1f5 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -115,7 +115,7 @@ use std::sync::atomic::{ use tracing::info; -pub async fn main ( +pub async fn run_server ( config_file: ConfigFile, shutdown_atomic: Option > ) From 26b4c335c6c8c34f226f84974f77162105fba748 Mon Sep 17 00:00:00 2001 From: _ <_@_> Date: Thu, 5 Nov 2020 22:18:47 -0600 Subject: [PATCH 5/7] :recycle: --- src/bin/load_toml.rs | 25 +++++++++++++++++++++++++ src/bin/ptth_relay.rs | 16 +++------------- src/bin/ptth_server.rs | 21 +++++++-------------- src/relay/mod.rs | 2 +- 4 files changed, 36 insertions(+), 28 deletions(-) create mode 100644 src/bin/load_toml.rs diff --git a/src/bin/load_toml.rs b/src/bin/load_toml.rs new file mode 100644 index 0000000..acbb64d --- /dev/null +++ b/src/bin/load_toml.rs @@ -0,0 +1,25 @@ +use std::{ + fmt::Debug, + fs::File, + io::Read, + path::Path, +}; + +use serde::de::DeserializeOwned; + +pub fn load < + T: DeserializeOwned, + P: AsRef + Debug +> ( + config_file_path: P +) -> T { + let mut f = File::open (&config_file_path).unwrap_or_else (|_| panic! ("Can't open {:?}", config_file_path)); + let mut buffer = vec! [0u8; 4096]; + let bytes_read = f.read (&mut buffer).unwrap_or_else (|_| panic! ("Can't read {:?}", config_file_path)); + buffer.truncate (bytes_read); + + { + let config_s = String::from_utf8 (buffer).unwrap_or_else (|_| panic! ("Can't parse {:?} as UTF-8", config_file_path)); + toml::from_str (&config_s).unwrap_or_else (|_| panic! ("Can't parse {:?} as TOML", config_file_path)) + } +} diff --git a/src/bin/ptth_relay.rs b/src/bin/ptth_relay.rs index 45683c7..92f851e 100644 --- a/src/bin/ptth_relay.rs +++ b/src/bin/ptth_relay.rs @@ -6,24 +6,14 @@ use std::{ use tokio::sync::oneshot; +mod load_toml; + use ptth::relay; use ptth::relay::RelayState; #[tokio::main] async fn main () -> Result <(), Box > { - use std::io::Read; - - let config_file = { - let config_file_path = "config/ptth_relay.toml"; - - let mut f = File::open (config_file_path).unwrap_or_else (|_| panic! ("Can't open {:?}", config_file_path)); - let mut buffer = vec! [0u8; 4096]; - let bytes_read = f.read (&mut buffer).unwrap_or_else (|_| panic! ("Can't read {:?}", config_file_path)); - buffer.truncate (bytes_read); - - let config_s = String::from_utf8 (buffer).unwrap_or_else (|_| panic! ("Can't parse {:?} as UTF-8", config_file_path)); - toml::from_str (&config_s).unwrap_or_else (|_| panic! ("Can't parse {:?} as TOML", config_file_path)) - }; + let config_file = load_toml::load ("config/ptth_relay.toml"); eprintln! ("ptth_relay Git version: {:?}", ptth::git_version::GIT_VERSION); diff --git a/src/bin/ptth_server.rs b/src/bin/ptth_server.rs index 63370de..fa98799 100644 --- a/src/bin/ptth_server.rs +++ b/src/bin/ptth_server.rs @@ -5,6 +5,8 @@ use std::{ use structopt::StructOpt; +mod load_toml; + #[derive (Debug, StructOpt)] struct Opt { #[structopt (long)] @@ -13,19 +15,10 @@ struct Opt { #[tokio::main] async fn main () -> Result <(), Box > { - use std::io::Read; + let config_file = load_toml::load ("config/ptth_server.toml"); - let config_file = { - let config_file_path = "config/ptth_server.toml"; - - let mut f = std::fs::File::open (config_file_path).unwrap_or_else (|_| panic! ("Can't open {:?}", config_file_path)); - let mut buffer = vec! [0u8; 4096]; - let bytes_read = f.read (&mut buffer).unwrap_or_else (|_| panic! ("Can't read {:?}", config_file_path)); - buffer.truncate (bytes_read); - - let config_s = String::from_utf8 (buffer).unwrap_or_else (|_| panic! ("Can't parse {:?} as UTF-8", config_file_path)); - toml::from_str (&config_s).unwrap_or_else (|_| panic! ("Can't parse {:?} as TOML", config_file_path)) - }; - - ptth::server::run_server (config_file, None).await + ptth::server::run_server ( + config_file, + None + ).await } diff --git a/src/relay/mod.rs b/src/relay/mod.rs index ef95378..d615a80 100644 --- a/src/relay/mod.rs +++ b/src/relay/mod.rs @@ -21,7 +21,7 @@ use hyper::{ }; use hyper::service::{make_service_fn, service_fn}; use serde::{ - Deserialize, + Deserialize, Serialize, }; use tokio::{ From e9d335eec1f44f5e2c6991c6a7fb47972ffb9632 Mon Sep 17 00:00:00 2001 From: _ <_@_> Date: Thu, 5 Nov 2020 23:03:33 -0600 Subject: [PATCH 6/7] :recycle: Graceful shutdown is now a oneshot for both relays and servers --- src/bin/load_toml.rs | 2 +- src/bin/ptth_relay.rs | 1 - src/bin/ptth_server.rs | 26 ++++++++++++++++++++++++- src/lib.rs | 16 ++++++++-------- src/server/mod.rs | 43 +++++++++++++++++++++--------------------- 5 files changed, 56 insertions(+), 32 deletions(-) diff --git a/src/bin/load_toml.rs b/src/bin/load_toml.rs index acbb64d..99acc89 100644 --- a/src/bin/load_toml.rs +++ b/src/bin/load_toml.rs @@ -20,6 +20,6 @@ pub fn load < { let config_s = String::from_utf8 (buffer).unwrap_or_else (|_| panic! ("Can't parse {:?} as UTF-8", config_file_path)); - toml::from_str (&config_s).unwrap_or_else (|_| panic! ("Can't parse {:?} as TOML", config_file_path)) + toml::from_str (&config_s).unwrap_or_else (|e| panic! ("Can't parse {:?} as TOML: {}", config_file_path, e)) } } diff --git a/src/bin/ptth_relay.rs b/src/bin/ptth_relay.rs index 92f851e..a0ab66c 100644 --- a/src/bin/ptth_relay.rs +++ b/src/bin/ptth_relay.rs @@ -1,6 +1,5 @@ use std::{ error::Error, - fs::File, sync::Arc, }; diff --git a/src/bin/ptth_server.rs b/src/bin/ptth_server.rs index fa98799..495c4b6 100644 --- a/src/bin/ptth_server.rs +++ b/src/bin/ptth_server.rs @@ -4,6 +4,7 @@ use std::{ }; use structopt::StructOpt; +use tokio::sync::oneshot; mod load_toml; @@ -15,10 +16,33 @@ struct Opt { #[tokio::main] async fn main () -> Result <(), Box > { + tracing_subscriber::fmt::init (); + let config_file = load_toml::load ("config/ptth_server.toml"); + let rx = { + let (tx, rx) = oneshot::channel::<()> (); + + // I have to put the tx into a Cell here so that if Ctrl-C gets + // called multiple times, we won't send multiple shutdowns to the + // oneshot channel. (Which would be a compile error) + + let tx = Some (tx); + let tx = std::cell::Cell::new (tx); + + ctrlc::set_handler (move ||{ + let tx = tx.replace (None); + + if let Some (tx) = tx { + tx.send (()).unwrap (); + } + }).expect ("Error setting Ctrl-C handler"); + + rx + }; + ptth::server::run_server ( config_file, - None + rx ).await } diff --git a/src/lib.rs b/src/lib.rs index 4350775..82724e3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,10 +43,6 @@ mod tests { use std::{ sync::{ Arc, - atomic::{ - AtomicBool, - Ordering, - }, }, time::Duration, }; @@ -91,6 +87,10 @@ mod tests { use reqwest::Client; use tracing::{info}; + // This should be the first line of the `tracing` + // crate documentation. Their docs are awful, but you + // didn't hear it from me. + tracing_subscriber::fmt::init (); let mut rt = Runtime::new ().unwrap (); @@ -126,11 +126,11 @@ mod tests { relay_url: "http://127.0.0.1:4000/7ZSFUKGV".into (), file_server_root: None, }; - let stop_server_atomic = Arc::new (AtomicBool::from (false)); + + let (stop_server_tx, stop_server_rx) = oneshot::channel (); let task_server = { - let stop_server_atomic = stop_server_atomic.clone (); spawn (async move { - server::run_server (config_file, Some (stop_server_atomic)).await.unwrap (); + server::run_server (config_file, stop_server_rx).await.unwrap (); }) }; @@ -176,7 +176,7 @@ mod tests { info! ("Shutting down end-to-end test"); - stop_server_atomic.store (true, Ordering::Relaxed); + stop_server_tx.send (()).unwrap (); stop_relay_tx.send (()).unwrap (); info! ("Sent stop messages"); diff --git a/src/server/mod.rs b/src/server/mod.rs index 38dd1f5..e946eb2 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -5,6 +5,7 @@ use std::{ time::Duration, }; +use futures::FutureExt; use handlebars::Handlebars; use hyper::{ StatusCode, @@ -12,8 +13,10 @@ use hyper::{ use reqwest::Client; use serde::Deserialize; use tokio::{ + sync::oneshot, time::delay_for, }; +use tracing::info; use crate::{ http_serde, @@ -108,16 +111,9 @@ pub struct Config { pub file_server_root: Option , } -use std::sync::atomic::{ - AtomicBool, - Ordering, -}; - -use tracing::info; - pub async fn run_server ( config_file: ConfigFile, - shutdown_atomic: Option > + shutdown_oneshot: oneshot::Receiver <()> ) -> Result <(), Box > { @@ -150,31 +146,36 @@ pub async fn run_server ( }); let mut backoff_delay = 0; + let mut shutdown_oneshot = shutdown_oneshot.fuse (); loop { - if let Some (a) = &shutdown_atomic { - if a.load (Ordering::Relaxed) { - break; - } - } - if backoff_delay > 0 { - delay_for (Duration::from_millis (backoff_delay)).await; - } - - if let Some (a) = &shutdown_atomic { - if a.load (Ordering::Relaxed) { + let mut delay = delay_for (Duration::from_millis (backoff_delay)).fuse (); + + if futures::select! ( + _ = delay => false, + _ = shutdown_oneshot => true, + ) { + info! ("Received graceful shutdown"); break; } } info! ("http_listen"); - let req_req = state.client.get (&format! ("{}/http_listen/{}", state.config.relay_url, config_file.name)); + let req_req = state.client.get (&format! ("{}/http_listen/{}", state.config.relay_url, config_file.name)).send (); let err_backoff_delay = std::cmp::min (30_000, backoff_delay * 2 + 500); - let req_resp = match req_req.send ().await { + let req_req = futures::select! { + r = req_req.fuse () => r, + _ = shutdown_oneshot => { + info! ("Received graceful shutdown"); + break; + }, + }; + + let req_resp = match req_req { Err (e) => { eprintln! ("Err: {:?}", e); backoff_delay = err_backoff_delay; From 80e8183af5be991ecfdb40c328eb7430d3b08c57 Mon Sep 17 00:00:00 2001 From: _ <_@_> Date: Thu, 5 Nov 2020 23:11:07 -0600 Subject: [PATCH 7/7] :bug: Turns out I had the modules all wrong. This one works good --- src/bin/ptth_relay.rs | 29 ++--------------------------- src/bin/ptth_server.rs | 28 ++-------------------------- src/graceful_shutdown.rs | 23 +++++++++++++++++++++++ src/lib.rs | 2 ++ src/{bin => }/load_toml.rs | 0 5 files changed, 29 insertions(+), 53 deletions(-) create mode 100644 src/graceful_shutdown.rs rename src/{bin => }/load_toml.rs (100%) diff --git a/src/bin/ptth_relay.rs b/src/bin/ptth_relay.rs index a0ab66c..dd3ebd2 100644 --- a/src/bin/ptth_relay.rs +++ b/src/bin/ptth_relay.rs @@ -3,42 +3,17 @@ use std::{ sync::Arc, }; -use tokio::sync::oneshot; - -mod load_toml; - use ptth::relay; use ptth::relay::RelayState; #[tokio::main] async fn main () -> Result <(), Box > { - let config_file = load_toml::load ("config/ptth_relay.toml"); + let config_file = ptth::load_toml::load ("config/ptth_relay.toml"); eprintln! ("ptth_relay Git version: {:?}", ptth::git_version::GIT_VERSION); - let rx = { - let (tx, rx) = oneshot::channel::<()> (); - - // I have to put the tx into a Cell here so that if Ctrl-C gets - // called multiple times, we won't send multiple shutdowns to the - // oneshot channel. (Which would be a compile error) - - let tx = Some (tx); - let tx = std::cell::Cell::new (tx); - - ctrlc::set_handler (move ||{ - let tx = tx.replace (None); - - if let Some (tx) = tx { - tx.send (()).unwrap (); - } - }).expect ("Error setting Ctrl-C handler"); - - rx - }; - relay::run_relay ( Arc::new (RelayState::from (&config_file)), - rx + ptth::graceful_shutdown::init () ).await } diff --git a/src/bin/ptth_server.rs b/src/bin/ptth_server.rs index 495c4b6..790dc97 100644 --- a/src/bin/ptth_server.rs +++ b/src/bin/ptth_server.rs @@ -4,9 +4,6 @@ use std::{ }; use structopt::StructOpt; -use tokio::sync::oneshot; - -mod load_toml; #[derive (Debug, StructOpt)] struct Opt { @@ -18,31 +15,10 @@ struct Opt { async fn main () -> Result <(), Box > { tracing_subscriber::fmt::init (); - let config_file = load_toml::load ("config/ptth_server.toml"); - - let rx = { - let (tx, rx) = oneshot::channel::<()> (); - - // I have to put the tx into a Cell here so that if Ctrl-C gets - // called multiple times, we won't send multiple shutdowns to the - // oneshot channel. (Which would be a compile error) - - let tx = Some (tx); - let tx = std::cell::Cell::new (tx); - - ctrlc::set_handler (move ||{ - let tx = tx.replace (None); - - if let Some (tx) = tx { - tx.send (()).unwrap (); - } - }).expect ("Error setting Ctrl-C handler"); - - rx - }; + let config_file = ptth::load_toml::load ("config/ptth_server.toml"); ptth::server::run_server ( config_file, - rx + ptth::graceful_shutdown::init () ).await } diff --git a/src/graceful_shutdown.rs b/src/graceful_shutdown.rs new file mode 100644 index 0000000..63de1b3 --- /dev/null +++ b/src/graceful_shutdown.rs @@ -0,0 +1,23 @@ +use std::cell::Cell; +use tokio::sync::oneshot; + +pub fn init () -> oneshot::Receiver <()> { + let (tx, rx) = oneshot::channel::<()> (); + + // I have to put the tx into a Cell here so that if Ctrl-C gets + // called multiple times, we won't send multiple shutdowns to the + // oneshot channel. (Which would be a compile error) + + let tx = Some (tx); + let tx = Cell::new (tx); + + ctrlc::set_handler (move ||{ + let tx = tx.replace (None); + + if let Some (tx) = tx { + tx.send (()).unwrap (); + } + }).expect ("Error setting Ctrl-C handler"); + + rx +} diff --git a/src/lib.rs b/src/lib.rs index 82724e3..16d239c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,8 @@ pub const PTTH_MAGIC_HEADER: &str = "X-PTTH-2LJYXWC4"; // test stuff like spawn them both in the same process pub mod git_version; +pub mod graceful_shutdown; +pub mod load_toml; pub mod relay; pub mod server; diff --git a/src/bin/load_toml.rs b/src/load_toml.rs similarity index 100% rename from src/bin/load_toml.rs rename to src/load_toml.rs