From 9e134d55aa99697cfe656ac7276da87c249c74e3 Mon Sep 17 00:00:00 2001 From: _ <> Date: Sat, 7 Nov 2020 02:26:34 +0000 Subject: [PATCH] :tada: Add forced shutdown fallback to graceful_shutdown module --- src/bin/ptth_file_server.rs | 23 ++-------- src/graceful_shutdown.rs | 85 +++++++++++++++++++++++++++++++++++-- 2 files changed, 85 insertions(+), 23 deletions(-) diff --git a/src/bin/ptth_file_server.rs b/src/bin/ptth_file_server.rs index c36bf84..7834934 100644 --- a/src/bin/ptth_file_server.rs +++ b/src/bin/ptth_file_server.rs @@ -139,32 +139,15 @@ async fn main () -> Result <(), Box > { } }); - let shutdown_oneshot = ptth::graceful_shutdown::init (); - let (force_shutdown_tx, force_shutdown_rx) = oneshot::channel (); + let (shutdown_rx, forced_shutdown) = ptth::graceful_shutdown::init_with_force (); let server = Server::bind (&addr) .serve (make_svc) .with_graceful_shutdown (async move { - shutdown_oneshot.await.ok (); - info! ("Received graceful shutdown"); - - // Kick off a timer for a forced shutdown - force_shutdown_tx.send (()).unwrap (); + shutdown_rx.await.ok (); }); - let force_shutdown_fut = async move { - force_shutdown_rx.await.unwrap (); - let timeout = 10; - info! ("Forced shutdown in {} seconds", timeout); - delay_for (Duration::from_secs (timeout)).await; - - error! ("Forcing shutdown"); - }; - - futures::select! { - x = server.fuse () => x?, - _ = force_shutdown_fut.fuse () => (), - }; + forced_shutdown.wrap_server (server).await??; Ok (()) } diff --git a/src/graceful_shutdown.rs b/src/graceful_shutdown.rs index 63de1b3..15cb518 100644 --- a/src/graceful_shutdown.rs +++ b/src/graceful_shutdown.rs @@ -1,5 +1,14 @@ -use std::cell::Cell; -use tokio::sync::oneshot; +use std::{ + cell::Cell, + time::Duration, +}; + +use futures::prelude::*; +use tokio::{ + sync::oneshot, + time::delay_for, +}; +use tracing::{debug, error, info, trace, warn}; pub fn init () -> oneshot::Receiver <()> { let (tx, rx) = oneshot::channel::<()> (); @@ -11,7 +20,7 @@ pub fn init () -> oneshot::Receiver <()> { let tx = Some (tx); let tx = Cell::new (tx); - ctrlc::set_handler (move ||{ + ctrlc::set_handler (move || { let tx = tx.replace (None); if let Some (tx) = tx { @@ -21,3 +30,73 @@ pub fn init () -> oneshot::Receiver <()> { rx } + +#[derive (Debug)] +pub enum ShutdownError { + ForcedShutdown, +} + +use std::{ + error, + fmt, +}; + +impl fmt::Display for ShutdownError { + fn fmt (&self, f: &mut fmt::Formatter <'_>) -> fmt::Result { + use ShutdownError::*; + + let desc = match self { + ForcedShutdown => "Shutdown was forced after a timeout", + }; + + write! (f, "{}", desc) + } +} + +impl error::Error for ShutdownError { + +} + +pub struct ForcedShutdown { + rx: oneshot::Receiver <()>, + tx: oneshot::Sender <()>, +} + +impl ForcedShutdown { + pub async fn wrap_server < + T, + F: Future + > ( + self, + server: F + ) -> Result { + let fut = async move { + self.rx.await.unwrap (); + self.tx.send (()).unwrap (); + let timeout = 10; + debug! ("Starting graceful shutdown. Forcing shutdown in {} seconds", timeout); + delay_for (Duration::from_secs (timeout)).await; + + error! ("Forcing shutdown"); + }; + + futures::select! { + x = server.fuse () => { + info! ("Shut down gracefully"); + Ok (x) + }, + _ = fut.fuse () => Err (ShutdownError::ForcedShutdown), + } + } +} + +pub fn init_with_force () -> (oneshot::Receiver <()>, ForcedShutdown) { + let (tx, rx) = oneshot::channel (); + + let f = ForcedShutdown { + rx: init (), + tx, + }; + + (rx, f) +}