diff --git a/src/bin/ptth_file_server.rs b/src/bin/ptth_file_server.rs index aec5855..c36bf84 100644 --- a/src/bin/ptth_file_server.rs +++ b/src/bin/ptth_file_server.rs @@ -45,8 +45,6 @@ pub struct Config { struct ServerState <'a> { config: Config, handlebars: handlebars::Handlebars <'a>, - - shutdown_watch_rx: watch::Receiver , } fn status_reply > (status: StatusCode, b: B) @@ -76,19 +74,12 @@ async fn handle_all (req: Request , state: Arc >) .as_ref () .unwrap_or (&default_root); - let mut shutdown_watch_rx = state.shutdown_watch_rx.clone (); - if shutdown_watch_rx.recv ().await != Some (false) { - error! ("Can't serve, I'm shutting down"); - panic! ("Can't serve, I'm shutting down"); - } - let ptth_resp = file_server::serve_all ( &state.handlebars, file_server_root, ptth_req.method, &ptth_req.uri, - &ptth_req.headers, - Some (shutdown_watch_rx) + &ptth_req.headers ).await; let mut resp = Response::builder () @@ -129,15 +120,11 @@ async fn main () -> Result <(), Box > { let handlebars = file_server::load_templates ()?; - let (shutdown_watch_tx, shutdown_watch_rx) = watch::channel (false); - let state = Arc::new (ServerState { config: Config { file_server_root: config_file.file_server_root, }, handlebars, - - shutdown_watch_rx, }); let make_svc = make_service_fn (|_conn| { @@ -161,13 +148,15 @@ async fn main () -> Result <(), Box > { shutdown_oneshot.await.ok (); info! ("Received graceful shutdown"); - shutdown_watch_tx.broadcast (true).unwrap (); + // Kick off a timer for a forced shutdown force_shutdown_tx.send (()).unwrap (); }); let force_shutdown_fut = async move { force_shutdown_rx.await.unwrap (); - delay_for (Duration::from_secs (5)).await; + let timeout = 10; + info! ("Forced shutdown in {} seconds", timeout); + delay_for (Duration::from_secs (timeout)).await; error! ("Forcing shutdown"); }; diff --git a/src/server/file_server.rs b/src/server/file_server.rs index 4155ebe..fd7e922 100644 --- a/src/server/file_server.rs +++ b/src/server/file_server.rs @@ -141,13 +141,12 @@ async fn serve_dir ( resp } -#[instrument (level = "debug", skip (f, cancel_rx))] +#[instrument (level = "debug", skip (f))] async fn serve_file ( mut f: File, should_send_body: bool, range_start: Option , - range_end: Option , - mut cancel_rx: Option > + range_end: Option ) -> http_serde::Response { let (tx, rx) = channel (1); let body = if should_send_body { @@ -172,7 +171,6 @@ async fn serve_file ( if should_send_body { tokio::spawn (async move { - { //println! ("Opening file {:?}", path); let mut tx = tx; @@ -191,20 +189,7 @@ async fn serve_file ( break; } - let send_fut = tx.send (Ok::<_, Infallible> (buffer)); - - let send_result = match &mut cancel_rx { - Some (cancel_rx) => futures::select! { - x = send_fut.fuse () => x, - _ = cancel_rx.recv ().fuse () => { - error! ("Cancelled"); - break; - }, - }, - None => send_fut.await, - }; - - if send_result.is_err () { + if tx.send (Ok::<_, Infallible> (buffer)).await.is_err () { warn! ("Cancelling file stream (Sent {} out of {} bytes)", bytes_sent, end - start); break; } @@ -220,8 +205,6 @@ async fn serve_file ( //delay_for (Duration::from_millis (50)).await; } - } - debug! ("Exited stream scope"); }); } @@ -267,8 +250,7 @@ pub async fn serve_all ( root: &Path, method: http_serde::Method, uri: &str, - headers: &HashMap >, - cancel_rx: Option > + headers: &HashMap > ) -> http_serde::Response { @@ -317,8 +299,7 @@ pub async fn serve_all ( file, should_send_body, range_start, - range_end, - cancel_rx + range_end ).await } else { diff --git a/src/server/mod.rs b/src/server/mod.rs index 86a1bc3..4c74381 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -76,8 +76,7 @@ async fn handle_req_resp <'a> ( file_server_root, parts.method, uri, - &parts.headers, - None + &parts.headers ).await } else {