♻️ Graceful shutdown is now a oneshot for both relays and servers

main
_ 2020-11-05 23:03:33 -06:00
parent 26b4c335c6
commit e9d335eec1
5 changed files with 56 additions and 32 deletions

View File

@ -20,6 +20,6 @@ pub fn load <
{ {
let config_s = String::from_utf8 (buffer).unwrap_or_else (|_| panic! ("Can't parse {:?} as UTF-8", config_file_path)); let config_s = String::from_utf8 (buffer).unwrap_or_else (|_| panic! ("Can't parse {:?} as UTF-8", config_file_path));
toml::from_str (&config_s).unwrap_or_else (|_| panic! ("Can't parse {:?} as TOML", config_file_path)) toml::from_str (&config_s).unwrap_or_else (|e| panic! ("Can't parse {:?} as TOML: {}", config_file_path, e))
} }
} }

View File

@ -1,6 +1,5 @@
use std::{ use std::{
error::Error, error::Error,
fs::File,
sync::Arc, sync::Arc,
}; };

View File

@ -4,6 +4,7 @@ use std::{
}; };
use structopt::StructOpt; use structopt::StructOpt;
use tokio::sync::oneshot;
mod load_toml; mod load_toml;
@ -15,10 +16,33 @@ struct Opt {
#[tokio::main] #[tokio::main]
async fn main () -> Result <(), Box <dyn Error>> { async fn main () -> Result <(), Box <dyn Error>> {
tracing_subscriber::fmt::init ();
let config_file = load_toml::load ("config/ptth_server.toml"); let config_file = load_toml::load ("config/ptth_server.toml");
let rx = {
let (tx, rx) = oneshot::channel::<()> ();
// I have to put the tx into a Cell here so that if Ctrl-C gets
// called multiple times, we won't send multiple shutdowns to the
// oneshot channel. (Which would be a compile error)
let tx = Some (tx);
let tx = std::cell::Cell::new (tx);
ctrlc::set_handler (move ||{
let tx = tx.replace (None);
if let Some (tx) = tx {
tx.send (()).unwrap ();
}
}).expect ("Error setting Ctrl-C handler");
rx
};
ptth::server::run_server ( ptth::server::run_server (
config_file, config_file,
None rx
).await ).await
} }

View File

@ -43,10 +43,6 @@ mod tests {
use std::{ use std::{
sync::{ sync::{
Arc, Arc,
atomic::{
AtomicBool,
Ordering,
},
}, },
time::Duration, time::Duration,
}; };
@ -91,6 +87,10 @@ mod tests {
use reqwest::Client; use reqwest::Client;
use tracing::{info}; use tracing::{info};
// This should be the first line of the `tracing`
// crate documentation. Their docs are awful, but you
// didn't hear it from me.
tracing_subscriber::fmt::init (); tracing_subscriber::fmt::init ();
let mut rt = Runtime::new ().unwrap (); let mut rt = Runtime::new ().unwrap ();
@ -126,11 +126,11 @@ mod tests {
relay_url: "http://127.0.0.1:4000/7ZSFUKGV".into (), relay_url: "http://127.0.0.1:4000/7ZSFUKGV".into (),
file_server_root: None, file_server_root: None,
}; };
let stop_server_atomic = Arc::new (AtomicBool::from (false));
let (stop_server_tx, stop_server_rx) = oneshot::channel ();
let task_server = { let task_server = {
let stop_server_atomic = stop_server_atomic.clone ();
spawn (async move { spawn (async move {
server::run_server (config_file, Some (stop_server_atomic)).await.unwrap (); server::run_server (config_file, stop_server_rx).await.unwrap ();
}) })
}; };
@ -176,7 +176,7 @@ mod tests {
info! ("Shutting down end-to-end test"); info! ("Shutting down end-to-end test");
stop_server_atomic.store (true, Ordering::Relaxed); stop_server_tx.send (()).unwrap ();
stop_relay_tx.send (()).unwrap (); stop_relay_tx.send (()).unwrap ();
info! ("Sent stop messages"); info! ("Sent stop messages");

View File

@ -5,6 +5,7 @@ use std::{
time::Duration, time::Duration,
}; };
use futures::FutureExt;
use handlebars::Handlebars; use handlebars::Handlebars;
use hyper::{ use hyper::{
StatusCode, StatusCode,
@ -12,8 +13,10 @@ use hyper::{
use reqwest::Client; use reqwest::Client;
use serde::Deserialize; use serde::Deserialize;
use tokio::{ use tokio::{
sync::oneshot,
time::delay_for, time::delay_for,
}; };
use tracing::info;
use crate::{ use crate::{
http_serde, http_serde,
@ -108,16 +111,9 @@ pub struct Config {
pub file_server_root: Option <PathBuf>, pub file_server_root: Option <PathBuf>,
} }
use std::sync::atomic::{
AtomicBool,
Ordering,
};
use tracing::info;
pub async fn run_server ( pub async fn run_server (
config_file: ConfigFile, config_file: ConfigFile,
shutdown_atomic: Option <Arc <AtomicBool>> shutdown_oneshot: oneshot::Receiver <()>
) )
-> Result <(), Box <dyn Error>> -> Result <(), Box <dyn Error>>
{ {
@ -150,31 +146,36 @@ pub async fn run_server (
}); });
let mut backoff_delay = 0; let mut backoff_delay = 0;
let mut shutdown_oneshot = shutdown_oneshot.fuse ();
loop { loop {
if let Some (a) = &shutdown_atomic {
if a.load (Ordering::Relaxed) {
break;
}
}
if backoff_delay > 0 { if backoff_delay > 0 {
delay_for (Duration::from_millis (backoff_delay)).await; let mut delay = delay_for (Duration::from_millis (backoff_delay)).fuse ();
}
if let Some (a) = &shutdown_atomic { if futures::select! (
if a.load (Ordering::Relaxed) { _ = delay => false,
_ = shutdown_oneshot => true,
) {
info! ("Received graceful shutdown");
break; break;
} }
} }
info! ("http_listen"); info! ("http_listen");
let req_req = state.client.get (&format! ("{}/http_listen/{}", state.config.relay_url, config_file.name)); let req_req = state.client.get (&format! ("{}/http_listen/{}", state.config.relay_url, config_file.name)).send ();
let err_backoff_delay = std::cmp::min (30_000, backoff_delay * 2 + 500); let err_backoff_delay = std::cmp::min (30_000, backoff_delay * 2 + 500);
let req_resp = match req_req.send ().await { let req_req = futures::select! {
r = req_req.fuse () => r,
_ = shutdown_oneshot => {
info! ("Received graceful shutdown");
break;
},
};
let req_resp = match req_req {
Err (e) => { Err (e) => {
eprintln! ("Err: {:?}", e); eprintln! ("Err: {:?}", e);
backoff_delay = err_backoff_delay; backoff_delay = err_backoff_delay;