🚧 Guess it's a bug in hyper.

You can't gracefully shutdown while a client is connected.
main
_ 2020-11-07 01:34:58 +00:00
parent 32798e8250
commit 75177cec80
3 changed files with 79 additions and 14 deletions

View File

@ -1,11 +1,13 @@
use std::{ use std::{
convert::Infallible, convert::Infallible,
error::Error, error::Error,
net::SocketAddr,
path::PathBuf, path::PathBuf,
sync::Arc, sync::Arc,
net::SocketAddr, time::Duration,
}; };
use futures::FutureExt;
use hyper::{ use hyper::{
Body, Body,
Request, Request,
@ -18,8 +20,15 @@ use hyper::{
StatusCode, StatusCode,
}; };
use serde::Deserialize; use serde::Deserialize;
use tokio::{
sync::{
oneshot,
watch,
},
time::delay_for,
};
use tracing::{ use tracing::{
debug, info, trace, warn, debug, error, info, trace, warn,
}; };
use ptth::{ use ptth::{
@ -36,6 +45,8 @@ pub struct Config {
struct ServerState <'a> { struct ServerState <'a> {
config: Config, config: Config,
handlebars: handlebars::Handlebars <'a>, handlebars: handlebars::Handlebars <'a>,
shutdown_watch_rx: watch::Receiver <bool>,
} }
fn status_reply <B: Into <Body>> (status: StatusCode, b: B) fn status_reply <B: Into <Body>> (status: StatusCode, b: B)
@ -45,7 +56,7 @@ fn status_reply <B: Into <Body>> (status: StatusCode, b: B)
} }
async fn handle_all (req: Request <Body>, state: Arc <ServerState <'static>>) async fn handle_all (req: Request <Body>, state: Arc <ServerState <'static>>)
-> Result <Response <Body>, Infallible> -> Result <Response <Body>, String>
{ {
let path = req.uri ().path (); let path = req.uri ().path ();
//println! ("{}", path); //println! ("{}", path);
@ -65,12 +76,19 @@ async fn handle_all (req: Request <Body>, state: Arc <ServerState <'static>>)
.as_ref () .as_ref ()
.unwrap_or (&default_root); .unwrap_or (&default_root);
let mut shutdown_watch_rx = state.shutdown_watch_rx.clone ();
if shutdown_watch_rx.recv ().await != Some (false) {
error! ("Can't serve, I'm shutting down");
panic! ("Can't serve, I'm shutting down");
}
let ptth_resp = file_server::serve_all ( let ptth_resp = file_server::serve_all (
&state.handlebars, &state.handlebars,
file_server_root, file_server_root,
ptth_req.method, ptth_req.method,
&ptth_req.uri, &ptth_req.uri,
&ptth_req.headers &ptth_req.headers,
Some (shutdown_watch_rx)
).await; ).await;
let mut resp = Response::builder () let mut resp = Response::builder ()
@ -111,18 +129,22 @@ async fn main () -> Result <(), Box <dyn Error>> {
let handlebars = file_server::load_templates ()?; let handlebars = file_server::load_templates ()?;
let (shutdown_watch_tx, shutdown_watch_rx) = watch::channel (false);
let state = Arc::new (ServerState { let state = Arc::new (ServerState {
handlebars,
config: Config { config: Config {
file_server_root: config_file.file_server_root, file_server_root: config_file.file_server_root,
}, },
handlebars,
shutdown_watch_rx,
}); });
let make_svc = make_service_fn (|_conn| { let make_svc = make_service_fn (|_conn| {
let state = state.clone (); let state = state.clone ();
async { async {
Ok::<_, Infallible> (service_fn (move |req| { Ok::<_, String> (service_fn (move |req| {
let state = state.clone (); let state = state.clone ();
handle_all (req, state) handle_all (req, state)
@ -130,9 +152,30 @@ async fn main () -> Result <(), Box <dyn Error>> {
} }
}); });
let server = Server::bind (&addr).serve (make_svc); let shutdown_oneshot = ptth::graceful_shutdown::init ();
let (force_shutdown_tx, force_shutdown_rx) = oneshot::channel ();
server.await?; let server = Server::bind (&addr)
.serve (make_svc)
.with_graceful_shutdown (async move {
shutdown_oneshot.await.ok ();
info! ("Received graceful shutdown");
shutdown_watch_tx.broadcast (true).unwrap ();
force_shutdown_tx.send (()).unwrap ();
});
let force_shutdown_fut = async move {
force_shutdown_rx.await.unwrap ();
delay_for (Duration::from_secs (5)).await;
error! ("Forcing shutdown");
};
futures::select! {
x = server.fuse () => x?,
_ = force_shutdown_fut.fuse () => (),
};
Ok (()) Ok (())
} }

View File

@ -9,6 +9,7 @@ use std::{
path::{Path, PathBuf}, path::{Path, PathBuf},
}; };
use futures::FutureExt;
use handlebars::Handlebars; use handlebars::Handlebars;
use tokio::{ use tokio::{
fs::{ fs::{
@ -20,6 +21,7 @@ use tokio::{
sync::mpsc::{ sync::mpsc::{
channel, channel,
}, },
sync::watch,
}; };
use tracing::{ use tracing::{
debug, error, info, trace, warn, debug, error, info, trace, warn,
@ -139,14 +141,15 @@ async fn serve_dir (
resp resp
} }
#[instrument (level = "debug", skip (f))] #[instrument (level = "debug", skip (f, cancel_rx))]
async fn serve_file ( async fn serve_file (
mut f: File, mut f: File,
should_send_body: bool, should_send_body: bool,
range_start: Option <u64>, range_start: Option <u64>,
range_end: Option <u64> range_end: Option <u64>,
mut cancel_rx: Option <watch::Receiver <bool>>
) -> http_serde::Response { ) -> http_serde::Response {
let (tx, rx) = channel (2); let (tx, rx) = channel (1);
let body = if should_send_body { let body = if should_send_body {
Some (rx) Some (rx)
} }
@ -169,6 +172,7 @@ async fn serve_file (
if should_send_body { if should_send_body {
tokio::spawn (async move { tokio::spawn (async move {
{
//println! ("Opening file {:?}", path); //println! ("Opening file {:?}", path);
let mut tx = tx; let mut tx = tx;
@ -187,7 +191,20 @@ async fn serve_file (
break; break;
} }
if tx.send (Ok::<_, Infallible> (buffer)).await.is_err () { let send_fut = tx.send (Ok::<_, Infallible> (buffer));
let send_result = match &mut cancel_rx {
Some (cancel_rx) => futures::select! {
x = send_fut.fuse () => x,
_ = cancel_rx.recv ().fuse () => {
error! ("Cancelled");
break;
},
},
None => send_fut.await,
};
if send_result.is_err () {
warn! ("Cancelling file stream (Sent {} out of {} bytes)", bytes_sent, end - start); warn! ("Cancelling file stream (Sent {} out of {} bytes)", bytes_sent, end - start);
break; break;
} }
@ -203,6 +220,8 @@ async fn serve_file (
//delay_for (Duration::from_millis (50)).await; //delay_for (Duration::from_millis (50)).await;
} }
}
debug! ("Exited stream scope");
}); });
} }
@ -249,6 +268,7 @@ pub async fn serve_all (
method: http_serde::Method, method: http_serde::Method,
uri: &str, uri: &str,
headers: &HashMap <String, Vec <u8>>, headers: &HashMap <String, Vec <u8>>,
cancel_rx: Option <watch::Receiver <bool>>
) )
-> http_serde::Response -> http_serde::Response
{ {
@ -297,7 +317,8 @@ pub async fn serve_all (
file, file,
should_send_body, should_send_body,
range_start, range_start,
range_end range_end,
cancel_rx
).await ).await
} }
else { else {

View File

@ -76,7 +76,8 @@ async fn handle_req_resp <'a> (
file_server_root, file_server_root,
parts.method, parts.method,
uri, uri,
&parts.headers &parts.headers,
None
).await ).await
} }
else { else {