🎉 Add forced shutdown fallback to graceful_shutdown module

main
_ 2020-11-07 02:26:34 +00:00
parent f02e12aecc
commit 9e134d55aa
2 changed files with 85 additions and 23 deletions

View File

@ -139,32 +139,15 @@ async fn main () -> Result <(), Box <dyn Error>> {
}
});
let shutdown_oneshot = ptth::graceful_shutdown::init ();
let (force_shutdown_tx, force_shutdown_rx) = oneshot::channel ();
let (shutdown_rx, forced_shutdown) = ptth::graceful_shutdown::init_with_force ();
let server = Server::bind (&addr)
.serve (make_svc)
.with_graceful_shutdown (async move {
shutdown_oneshot.await.ok ();
info! ("Received graceful shutdown");
// Kick off a timer for a forced shutdown
force_shutdown_tx.send (()).unwrap ();
shutdown_rx.await.ok ();
});
let force_shutdown_fut = async move {
force_shutdown_rx.await.unwrap ();
let timeout = 10;
info! ("Forced shutdown in {} seconds", timeout);
delay_for (Duration::from_secs (timeout)).await;
error! ("Forcing shutdown");
};
futures::select! {
x = server.fuse () => x?,
_ = force_shutdown_fut.fuse () => (),
};
forced_shutdown.wrap_server (server).await??;
Ok (())
}

View File

@ -1,5 +1,14 @@
use std::cell::Cell;
use tokio::sync::oneshot;
use std::{
cell::Cell,
time::Duration,
};
use futures::prelude::*;
use tokio::{
sync::oneshot,
time::delay_for,
};
use tracing::{debug, error, info, trace, warn};
pub fn init () -> oneshot::Receiver <()> {
let (tx, rx) = oneshot::channel::<()> ();
@ -11,7 +20,7 @@ pub fn init () -> oneshot::Receiver <()> {
let tx = Some (tx);
let tx = Cell::new (tx);
ctrlc::set_handler (move ||{
ctrlc::set_handler (move || {
let tx = tx.replace (None);
if let Some (tx) = tx {
@ -21,3 +30,73 @@ pub fn init () -> oneshot::Receiver <()> {
rx
}
#[derive (Debug)]
pub enum ShutdownError {
ForcedShutdown,
}
use std::{
error,
fmt,
};
impl fmt::Display for ShutdownError {
fn fmt (&self, f: &mut fmt::Formatter <'_>) -> fmt::Result {
use ShutdownError::*;
let desc = match self {
ForcedShutdown => "Shutdown was forced after a timeout",
};
write! (f, "{}", desc)
}
}
impl error::Error for ShutdownError {
}
pub struct ForcedShutdown {
rx: oneshot::Receiver <()>,
tx: oneshot::Sender <()>,
}
impl ForcedShutdown {
pub async fn wrap_server <
T,
F: Future <Output = T>
> (
self,
server: F
) -> Result <T, ShutdownError> {
let fut = async move {
self.rx.await.unwrap ();
self.tx.send (()).unwrap ();
let timeout = 10;
debug! ("Starting graceful shutdown. Forcing shutdown in {} seconds", timeout);
delay_for (Duration::from_secs (timeout)).await;
error! ("Forcing shutdown");
};
futures::select! {
x = server.fuse () => {
info! ("Shut down gracefully");
Ok (x)
},
_ = fut.fuse () => Err (ShutdownError::ForcedShutdown),
}
}
}
pub fn init_with_force () -> (oneshot::Receiver <()>, ForcedShutdown) {
let (tx, rx) = oneshot::channel ();
let f = ForcedShutdown {
rx: init (),
tx,
};
(rx, f)
}