From d8010cf33c5d6ff1980e84bd40adf65908fba4f8 Mon Sep 17 00:00:00 2001 From: _ <_@_> Date: Thu, 5 Nov 2020 23:03:33 -0600 Subject: [PATCH] :recycle: Graceful shutdown is now a oneshot for both relays and servers --- src/bin/load_toml.rs | 2 +- src/bin/ptth_relay.rs | 1 - src/bin/ptth_server.rs | 26 ++++++++++++++++++++++++- src/lib.rs | 16 ++++++++-------- src/server/mod.rs | 43 +++++++++++++++++++++--------------------- 5 files changed, 56 insertions(+), 32 deletions(-) diff --git a/src/bin/load_toml.rs b/src/bin/load_toml.rs index acbb64d..99acc89 100644 --- a/src/bin/load_toml.rs +++ b/src/bin/load_toml.rs @@ -20,6 +20,6 @@ pub fn load < { let config_s = String::from_utf8 (buffer).unwrap_or_else (|_| panic! ("Can't parse {:?} as UTF-8", config_file_path)); - toml::from_str (&config_s).unwrap_or_else (|_| panic! ("Can't parse {:?} as TOML", config_file_path)) + toml::from_str (&config_s).unwrap_or_else (|e| panic! ("Can't parse {:?} as TOML: {}", config_file_path, e)) } } diff --git a/src/bin/ptth_relay.rs b/src/bin/ptth_relay.rs index 92f851e..a0ab66c 100644 --- a/src/bin/ptth_relay.rs +++ b/src/bin/ptth_relay.rs @@ -1,6 +1,5 @@ use std::{ error::Error, - fs::File, sync::Arc, }; diff --git a/src/bin/ptth_server.rs b/src/bin/ptth_server.rs index fa98799..495c4b6 100644 --- a/src/bin/ptth_server.rs +++ b/src/bin/ptth_server.rs @@ -4,6 +4,7 @@ use std::{ }; use structopt::StructOpt; +use tokio::sync::oneshot; mod load_toml; @@ -15,10 +16,33 @@ struct Opt { #[tokio::main] async fn main () -> Result <(), Box > { + tracing_subscriber::fmt::init (); + let config_file = load_toml::load ("config/ptth_server.toml"); + let rx = { + let (tx, rx) = oneshot::channel::<()> (); + + // I have to put the tx into a Cell here so that if Ctrl-C gets + // called multiple times, we won't send multiple shutdowns to the + // oneshot channel. (Which would be a compile error) + + let tx = Some (tx); + let tx = std::cell::Cell::new (tx); + + ctrlc::set_handler (move ||{ + let tx = tx.replace (None); + + if let Some (tx) = tx { + tx.send (()).unwrap (); + } + }).expect ("Error setting Ctrl-C handler"); + + rx + }; + ptth::server::run_server ( config_file, - None + rx ).await } diff --git a/src/lib.rs b/src/lib.rs index 4350775..82724e3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,10 +43,6 @@ mod tests { use std::{ sync::{ Arc, - atomic::{ - AtomicBool, - Ordering, - }, }, time::Duration, }; @@ -91,6 +87,10 @@ mod tests { use reqwest::Client; use tracing::{info}; + // This should be the first line of the `tracing` + // crate documentation. Their docs are awful, but you + // didn't hear it from me. + tracing_subscriber::fmt::init (); let mut rt = Runtime::new ().unwrap (); @@ -126,11 +126,11 @@ mod tests { relay_url: "http://127.0.0.1:4000/7ZSFUKGV".into (), file_server_root: None, }; - let stop_server_atomic = Arc::new (AtomicBool::from (false)); + + let (stop_server_tx, stop_server_rx) = oneshot::channel (); let task_server = { - let stop_server_atomic = stop_server_atomic.clone (); spawn (async move { - server::run_server (config_file, Some (stop_server_atomic)).await.unwrap (); + server::run_server (config_file, stop_server_rx).await.unwrap (); }) }; @@ -176,7 +176,7 @@ mod tests { info! ("Shutting down end-to-end test"); - stop_server_atomic.store (true, Ordering::Relaxed); + stop_server_tx.send (()).unwrap (); stop_relay_tx.send (()).unwrap (); info! ("Sent stop messages"); diff --git a/src/server/mod.rs b/src/server/mod.rs index 6cce392..901b485 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -5,6 +5,7 @@ use std::{ time::Duration, }; +use futures::FutureExt; use handlebars::Handlebars; use hyper::{ StatusCode, @@ -12,8 +13,10 @@ use hyper::{ use reqwest::Client; use serde::Deserialize; use tokio::{ + sync::oneshot, time::delay_for, }; +use tracing::info; use crate::{ http_serde, @@ -117,16 +120,9 @@ pub struct Config { pub file_server_root: Option , } -use std::sync::atomic::{ - AtomicBool, - Ordering, -}; - -use tracing::info; - pub async fn run_server ( config_file: ConfigFile, - shutdown_atomic: Option > + shutdown_oneshot: oneshot::Receiver <()> ) -> Result <(), Box > { @@ -159,31 +155,36 @@ pub async fn run_server ( }); let mut backoff_delay = 0; + let mut shutdown_oneshot = shutdown_oneshot.fuse (); loop { - if let Some (a) = &shutdown_atomic { - if a.load (Ordering::Relaxed) { - break; - } - } - if backoff_delay > 0 { - delay_for (Duration::from_millis (backoff_delay)).await; - } - - if let Some (a) = &shutdown_atomic { - if a.load (Ordering::Relaxed) { + let mut delay = delay_for (Duration::from_millis (backoff_delay)).fuse (); + + if futures::select! ( + _ = delay => false, + _ = shutdown_oneshot => true, + ) { + info! ("Received graceful shutdown"); break; } } info! ("http_listen"); - let req_req = state.client.get (&format! ("{}/http_listen/{}", state.config.relay_url, config_file.name)); + let req_req = state.client.get (&format! ("{}/http_listen/{}", state.config.relay_url, config_file.name)).send (); let err_backoff_delay = std::cmp::min (30_000, backoff_delay * 2 + 500); - let req_resp = match req_req.send ().await { + let req_req = futures::select! { + r = req_req.fuse () => r, + _ = shutdown_oneshot => { + info! ("Received graceful shutdown"); + break; + }, + }; + + let req_resp = match req_req { Err (e) => { eprintln! ("Err: {:?}", e); backoff_delay = err_backoff_delay;