🚧 Still hunting a bug where the relay can't shut down if Firefox

is connected
main
_ 2020-11-07 00:30:56 +00:00
parent e0298a5289
commit 32798e8250
4 changed files with 86 additions and 38 deletions

View File

@ -159,7 +159,6 @@ For now, either email me (if you know me personally) or make a pull request to a
## License ## License
PTTH is licensed under the PTTH is licensed under the
[GNU AGPLv3](https://www.gnu.org/licenses/agpl-3.0.html), [GNU AGPLv3](https://www.gnu.org/licenses/agpl-3.0.html)
with an exception for my current employer.
Copyright 2020 "Trish" Copyright 2020 "Trish"

View File

@ -17,6 +17,10 @@ use hyper::{
}, },
StatusCode, StatusCode,
}; };
use serde::Deserialize;
use tracing::{
debug, info, trace, warn,
};
use ptth::{ use ptth::{
http_serde::RequestParts, http_serde::RequestParts,
@ -24,8 +28,14 @@ use ptth::{
server::file_server, server::file_server,
}; };
#[derive (Default)]
pub struct Config {
pub file_server_root: Option <PathBuf>,
}
struct ServerState <'a> { struct ServerState <'a> {
handlebars: Arc <handlebars::Handlebars <'a>>, config: Config,
handlebars: handlebars::Handlebars <'a>,
} }
fn status_reply <B: Into <Body>> (status: StatusCode, b: B) fn status_reply <B: Into <Body>> (status: StatusCode, b: B)
@ -41,8 +51,6 @@ async fn handle_all (req: Request <Body>, state: Arc <ServerState <'static>>)
//println! ("{}", path); //println! ("{}", path);
if let Some (path) = prefix_match (path, "/files") { if let Some (path) = prefix_match (path, "/files") {
let root = PathBuf::from ("./");
let path = path.into (); let path = path.into ();
let (parts, _) = req.into_parts (); let (parts, _) = req.into_parts ();
@ -52,7 +60,18 @@ async fn handle_all (req: Request <Body>, state: Arc <ServerState <'static>>)
_ => return Ok (status_reply (StatusCode::BAD_REQUEST, "Bad request")), _ => return Ok (status_reply (StatusCode::BAD_REQUEST, "Bad request")),
}; };
let ptth_resp = file_server::serve_all (&state.handlebars, &root, ptth_req.method, &ptth_req.uri, &ptth_req.headers).await; let default_root = PathBuf::from ("./");
let file_server_root: &std::path::Path = state.config.file_server_root
.as_ref ()
.unwrap_or (&default_root);
let ptth_resp = file_server::serve_all (
&state.handlebars,
file_server_root,
ptth_req.method,
&ptth_req.uri,
&ptth_req.headers
).await;
let mut resp = Response::builder () let mut resp = Response::builder ()
.status (StatusCode::from (ptth_resp.parts.status_code)); .status (StatusCode::from (ptth_resp.parts.status_code));
@ -77,14 +96,26 @@ async fn handle_all (req: Request <Body>, state: Arc <ServerState <'static>>)
} }
} }
#[derive (Deserialize)]
pub struct ConfigFile {
pub file_server_root: Option <PathBuf>,
}
#[tokio::main] #[tokio::main]
async fn main () -> Result <(), Box <dyn Error>> { async fn main () -> Result <(), Box <dyn Error>> {
tracing_subscriber::fmt::init ();
let config_file: ConfigFile = ptth::load_toml::load ("config/ptth_server.toml");
info! ("file_server_root: {:?}", config_file.file_server_root);
let addr = SocketAddr::from(([0, 0, 0, 0], 4000)); let addr = SocketAddr::from(([0, 0, 0, 0], 4000));
let handlebars = Arc::new (file_server::load_templates ()?); let handlebars = file_server::load_templates ()?;
let state = Arc::new (ServerState { let state = Arc::new (ServerState {
handlebars, handlebars,
config: Config {
file_server_root: config_file.file_server_root,
},
}); });
let make_svc = make_service_fn (|_conn| { let make_svc = make_service_fn (|_conn| {

View File

@ -14,7 +14,6 @@ struct Opt {
#[tokio::main] #[tokio::main]
async fn main () -> Result <(), Box <dyn Error>> { async fn main () -> Result <(), Box <dyn Error>> {
tracing_subscriber::fmt::init (); tracing_subscriber::fmt::init ();
let config_file = ptth::load_toml::load ("config/ptth_server.toml"); let config_file = ptth::load_toml::load ("config/ptth_server.toml");
ptth::server::run_server ( ptth::server::run_server (

View File

@ -36,6 +36,7 @@ use tokio::{
mpsc, mpsc,
oneshot, oneshot,
RwLock, RwLock,
watch,
}, },
time::delay_for, time::delay_for,
}; };
@ -133,26 +134,22 @@ pub struct RelayState {
// Key: Request ID // Key: Request ID
response_rendezvous: RwLock <DashMap <String, ResponseRendezvous>>, response_rendezvous: RwLock <DashMap <String, ResponseRendezvous>>,
}
impl Default for RelayState { shutdown_watch_tx: watch::Sender <bool>,
fn default () -> Self { shutdown_watch_rx: watch::Receiver <bool>,
Self {
config: Config::from (&ConfigFile::default ()),
handlebars: Arc::new (load_templates ().unwrap ()),
request_rendezvous: Default::default (),
response_rendezvous: Default::default (),
}
}
} }
impl From <&ConfigFile> for RelayState { impl From <&ConfigFile> for RelayState {
fn from (config_file: &ConfigFile) -> Self { fn from (config_file: &ConfigFile) -> Self {
let (shutdown_watch_tx, shutdown_watch_rx) = watch::channel (false);
Self { Self {
config: Config::from (config_file), config: Config::from (config_file),
handlebars: Arc::new (load_templates ().unwrap ()), handlebars: Arc::new (load_templates ().unwrap ()),
request_rendezvous: Default::default (), request_rendezvous: Default::default (),
response_rendezvous: Default::default (), response_rendezvous: Default::default (),
shutdown_watch_tx,
shutdown_watch_rx,
} }
} }
} }
@ -270,8 +267,10 @@ async fn handle_http_response (
let (mut body_tx, body_rx) = mpsc::channel (2); let (mut body_tx, body_rx) = mpsc::channel (2);
let (body_finished_tx, body_finished_rx) = oneshot::channel (); let (body_finished_tx, body_finished_rx) = oneshot::channel ();
let mut shutdown_watch_rx = state.shutdown_watch_rx.clone ();
spawn (async move { spawn (async move {
if shutdown_watch_rx.recv ().await == Some (false) {
loop { loop {
let item = body.next ().await; let item = body.next ().await;
@ -280,10 +279,16 @@ async fn handle_http_response (
trace! ("Relaying {} bytes", bytes.len ()); trace! ("Relaying {} bytes", bytes.len ());
} }
if let Err (_e) = body_tx.send (item).await { futures::select! {
x = body_tx.send (item).fuse () => if let Err (_) = x {
info! ("Body closed while relaying. (Client hung up?)"); info! ("Body closed while relaying. (Client hung up?)");
body_finished_tx.send (ClientDisconnected).unwrap (); body_finished_tx.send (ClientDisconnected).unwrap ();
break; break;
},
_ = shutdown_watch_rx.recv ().fuse () => {
debug! ("Closing stream: relay is shutting down");
break;
},
} }
} }
else { else {
@ -292,6 +297,10 @@ async fn handle_http_response (
break; break;
} }
} }
}
else {
debug! ("Can't relay bytes, relay is shutting down");
}
}); });
let body = Body::wrap_stream (body_rx); let body = Body::wrap_stream (body_rx);
@ -315,13 +324,17 @@ async fn handle_http_response (
} }
debug! ("Connected server to client for streaming."); debug! ("Connected server to client for streaming.");
match body_finished_rx.await.unwrap () { match body_finished_rx.await {
StreamFinished => { Ok (StreamFinished) => {
error_reply (StatusCode::OK, "StreamFinished") error_reply (StatusCode::OK, "StreamFinished")
}, },
ClientDisconnected => { Ok (ClientDisconnected) => {
error_reply (StatusCode::OK, "ClientDisconnected") error_reply (StatusCode::OK, "ClientDisconnected")
} },
Err (e) => {
debug! ("body_finished_rx {}", e);
error_reply (StatusCode::OK, "body_finished_rx Err")
},
} }
} }
@ -569,10 +582,14 @@ pub async fn run_relay (
let server = Server::bind (&addr) let server = Server::bind (&addr)
.serve (make_svc); .serve (make_svc);
server.with_graceful_shutdown (async { server.with_graceful_shutdown (async {
shutdown_oneshot.await.ok (); shutdown_oneshot.await.ok ();
info! ("Received graceful shutdown"); info! ("Received graceful shutdown");
state.shutdown_watch_tx.broadcast (true).unwrap ();
use RelayError::*; use RelayError::*;
let mut response_rendezvous = state.response_rendezvous.write ().await; let mut response_rendezvous = state.response_rendezvous.write ().await;
@ -594,6 +611,8 @@ pub async fn run_relay (
ParkedServer (sender) => drop (sender.send (Err (RelayShuttingDown))), ParkedServer (sender) => drop (sender.send (Err (RelayShuttingDown))),
} }
} }
info! ("Performed all cleanup");
}).await?; }).await?;
info! ("Exiting"); info! ("Exiting");