🐛 Add forced shutdown to ptth_file_server.
parent
75177cec80
commit
f02e12aecc
|
@ -45,8 +45,6 @@ pub struct Config {
|
|||
struct ServerState <'a> {
|
||||
config: Config,
|
||||
handlebars: handlebars::Handlebars <'a>,
|
||||
|
||||
shutdown_watch_rx: watch::Receiver <bool>,
|
||||
}
|
||||
|
||||
fn status_reply <B: Into <Body>> (status: StatusCode, b: B)
|
||||
|
@ -76,19 +74,12 @@ async fn handle_all (req: Request <Body>, state: Arc <ServerState <'static>>)
|
|||
.as_ref ()
|
||||
.unwrap_or (&default_root);
|
||||
|
||||
let mut shutdown_watch_rx = state.shutdown_watch_rx.clone ();
|
||||
if shutdown_watch_rx.recv ().await != Some (false) {
|
||||
error! ("Can't serve, I'm shutting down");
|
||||
panic! ("Can't serve, I'm shutting down");
|
||||
}
|
||||
|
||||
let ptth_resp = file_server::serve_all (
|
||||
&state.handlebars,
|
||||
file_server_root,
|
||||
ptth_req.method,
|
||||
&ptth_req.uri,
|
||||
&ptth_req.headers,
|
||||
Some (shutdown_watch_rx)
|
||||
&ptth_req.headers
|
||||
).await;
|
||||
|
||||
let mut resp = Response::builder ()
|
||||
|
@ -129,15 +120,11 @@ async fn main () -> Result <(), Box <dyn Error>> {
|
|||
|
||||
let handlebars = file_server::load_templates ()?;
|
||||
|
||||
let (shutdown_watch_tx, shutdown_watch_rx) = watch::channel (false);
|
||||
|
||||
let state = Arc::new (ServerState {
|
||||
config: Config {
|
||||
file_server_root: config_file.file_server_root,
|
||||
},
|
||||
handlebars,
|
||||
|
||||
shutdown_watch_rx,
|
||||
});
|
||||
|
||||
let make_svc = make_service_fn (|_conn| {
|
||||
|
@ -161,13 +148,15 @@ async fn main () -> Result <(), Box <dyn Error>> {
|
|||
shutdown_oneshot.await.ok ();
|
||||
info! ("Received graceful shutdown");
|
||||
|
||||
shutdown_watch_tx.broadcast (true).unwrap ();
|
||||
// Kick off a timer for a forced shutdown
|
||||
force_shutdown_tx.send (()).unwrap ();
|
||||
});
|
||||
|
||||
let force_shutdown_fut = async move {
|
||||
force_shutdown_rx.await.unwrap ();
|
||||
delay_for (Duration::from_secs (5)).await;
|
||||
let timeout = 10;
|
||||
info! ("Forced shutdown in {} seconds", timeout);
|
||||
delay_for (Duration::from_secs (timeout)).await;
|
||||
|
||||
error! ("Forcing shutdown");
|
||||
};
|
||||
|
|
|
@ -141,13 +141,12 @@ async fn serve_dir (
|
|||
resp
|
||||
}
|
||||
|
||||
#[instrument (level = "debug", skip (f, cancel_rx))]
|
||||
#[instrument (level = "debug", skip (f))]
|
||||
async fn serve_file (
|
||||
mut f: File,
|
||||
should_send_body: bool,
|
||||
range_start: Option <u64>,
|
||||
range_end: Option <u64>,
|
||||
mut cancel_rx: Option <watch::Receiver <bool>>
|
||||
range_end: Option <u64>
|
||||
) -> http_serde::Response {
|
||||
let (tx, rx) = channel (1);
|
||||
let body = if should_send_body {
|
||||
|
@ -172,7 +171,6 @@ async fn serve_file (
|
|||
|
||||
if should_send_body {
|
||||
tokio::spawn (async move {
|
||||
{
|
||||
//println! ("Opening file {:?}", path);
|
||||
|
||||
let mut tx = tx;
|
||||
|
@ -191,20 +189,7 @@ async fn serve_file (
|
|||
break;
|
||||
}
|
||||
|
||||
let send_fut = tx.send (Ok::<_, Infallible> (buffer));
|
||||
|
||||
let send_result = match &mut cancel_rx {
|
||||
Some (cancel_rx) => futures::select! {
|
||||
x = send_fut.fuse () => x,
|
||||
_ = cancel_rx.recv ().fuse () => {
|
||||
error! ("Cancelled");
|
||||
break;
|
||||
},
|
||||
},
|
||||
None => send_fut.await,
|
||||
};
|
||||
|
||||
if send_result.is_err () {
|
||||
if tx.send (Ok::<_, Infallible> (buffer)).await.is_err () {
|
||||
warn! ("Cancelling file stream (Sent {} out of {} bytes)", bytes_sent, end - start);
|
||||
break;
|
||||
}
|
||||
|
@ -220,8 +205,6 @@ async fn serve_file (
|
|||
|
||||
//delay_for (Duration::from_millis (50)).await;
|
||||
}
|
||||
}
|
||||
debug! ("Exited stream scope");
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -267,8 +250,7 @@ pub async fn serve_all (
|
|||
root: &Path,
|
||||
method: http_serde::Method,
|
||||
uri: &str,
|
||||
headers: &HashMap <String, Vec <u8>>,
|
||||
cancel_rx: Option <watch::Receiver <bool>>
|
||||
headers: &HashMap <String, Vec <u8>>
|
||||
)
|
||||
-> http_serde::Response
|
||||
{
|
||||
|
@ -317,8 +299,7 @@ pub async fn serve_all (
|
|||
file,
|
||||
should_send_body,
|
||||
range_start,
|
||||
range_end,
|
||||
cancel_rx
|
||||
range_end
|
||||
).await
|
||||
}
|
||||
else {
|
||||
|
|
|
@ -76,8 +76,7 @@ async fn handle_req_resp <'a> (
|
|||
file_server_root,
|
||||
parts.method,
|
||||
uri,
|
||||
&parts.headers,
|
||||
None
|
||||
&parts.headers
|
||||
).await
|
||||
}
|
||||
else {
|
||||
|
|
Loading…
Reference in New Issue