♻️ Graceful shutdown is now a oneshot for both relays and servers

main
_ 2020-11-05 23:03:33 -06:00 committed by _
parent 50393e60a0
commit d8010cf33c
5 changed files with 56 additions and 32 deletions

View File

@ -20,6 +20,6 @@ pub fn load <
{
let config_s = String::from_utf8 (buffer).unwrap_or_else (|_| panic! ("Can't parse {:?} as UTF-8", config_file_path));
toml::from_str (&config_s).unwrap_or_else (|_| panic! ("Can't parse {:?} as TOML", config_file_path))
toml::from_str (&config_s).unwrap_or_else (|e| panic! ("Can't parse {:?} as TOML: {}", config_file_path, e))
}
}

View File

@ -1,6 +1,5 @@
use std::{
error::Error,
fs::File,
sync::Arc,
};

View File

@ -4,6 +4,7 @@ use std::{
};
use structopt::StructOpt;
use tokio::sync::oneshot;
mod load_toml;
@ -15,10 +16,33 @@ struct Opt {
#[tokio::main]
async fn main () -> Result <(), Box <dyn Error>> {
tracing_subscriber::fmt::init ();
let config_file = load_toml::load ("config/ptth_server.toml");
let rx = {
let (tx, rx) = oneshot::channel::<()> ();
// I have to put the tx into a Cell here so that if Ctrl-C gets
// called multiple times, we won't send multiple shutdowns to the
// oneshot channel. (Which would be a compile error)
let tx = Some (tx);
let tx = std::cell::Cell::new (tx);
ctrlc::set_handler (move ||{
let tx = tx.replace (None);
if let Some (tx) = tx {
tx.send (()).unwrap ();
}
}).expect ("Error setting Ctrl-C handler");
rx
};
ptth::server::run_server (
config_file,
None
rx
).await
}

View File

@ -43,10 +43,6 @@ mod tests {
use std::{
sync::{
Arc,
atomic::{
AtomicBool,
Ordering,
},
},
time::Duration,
};
@ -91,6 +87,10 @@ mod tests {
use reqwest::Client;
use tracing::{info};
// This should be the first line of the `tracing`
// crate documentation. Their docs are awful, but you
// didn't hear it from me.
tracing_subscriber::fmt::init ();
let mut rt = Runtime::new ().unwrap ();
@ -126,11 +126,11 @@ mod tests {
relay_url: "http://127.0.0.1:4000/7ZSFUKGV".into (),
file_server_root: None,
};
let stop_server_atomic = Arc::new (AtomicBool::from (false));
let (stop_server_tx, stop_server_rx) = oneshot::channel ();
let task_server = {
let stop_server_atomic = stop_server_atomic.clone ();
spawn (async move {
server::run_server (config_file, Some (stop_server_atomic)).await.unwrap ();
server::run_server (config_file, stop_server_rx).await.unwrap ();
})
};
@ -176,7 +176,7 @@ mod tests {
info! ("Shutting down end-to-end test");
stop_server_atomic.store (true, Ordering::Relaxed);
stop_server_tx.send (()).unwrap ();
stop_relay_tx.send (()).unwrap ();
info! ("Sent stop messages");

View File

@ -5,6 +5,7 @@ use std::{
time::Duration,
};
use futures::FutureExt;
use handlebars::Handlebars;
use hyper::{
StatusCode,
@ -12,8 +13,10 @@ use hyper::{
use reqwest::Client;
use serde::Deserialize;
use tokio::{
sync::oneshot,
time::delay_for,
};
use tracing::info;
use crate::{
http_serde,
@ -117,16 +120,9 @@ pub struct Config {
pub file_server_root: Option <PathBuf>,
}
use std::sync::atomic::{
AtomicBool,
Ordering,
};
use tracing::info;
pub async fn run_server (
config_file: ConfigFile,
shutdown_atomic: Option <Arc <AtomicBool>>
shutdown_oneshot: oneshot::Receiver <()>
)
-> Result <(), Box <dyn Error>>
{
@ -159,31 +155,36 @@ pub async fn run_server (
});
let mut backoff_delay = 0;
let mut shutdown_oneshot = shutdown_oneshot.fuse ();
loop {
if let Some (a) = &shutdown_atomic {
if a.load (Ordering::Relaxed) {
break;
}
}
if backoff_delay > 0 {
delay_for (Duration::from_millis (backoff_delay)).await;
}
if let Some (a) = &shutdown_atomic {
if a.load (Ordering::Relaxed) {
let mut delay = delay_for (Duration::from_millis (backoff_delay)).fuse ();
if futures::select! (
_ = delay => false,
_ = shutdown_oneshot => true,
) {
info! ("Received graceful shutdown");
break;
}
}
info! ("http_listen");
let req_req = state.client.get (&format! ("{}/http_listen/{}", state.config.relay_url, config_file.name));
let req_req = state.client.get (&format! ("{}/http_listen/{}", state.config.relay_url, config_file.name)).send ();
let err_backoff_delay = std::cmp::min (30_000, backoff_delay * 2 + 500);
let req_resp = match req_req.send ().await {
let req_req = futures::select! {
r = req_req.fuse () => r,
_ = shutdown_oneshot => {
info! ("Received graceful shutdown");
break;
},
};
let req_resp = match req_req {
Err (e) => {
eprintln! ("Err: {:?}", e);
backoff_delay = err_backoff_delay;