From e0298a5289f83f4b0ce34247db6a679e80383b15 Mon Sep 17 00:00:00 2001 From: _ <> Date: Fri, 6 Nov 2020 23:43:52 +0000 Subject: [PATCH] :bug: Working on a bunch of bugs and error handling --- src/bin/ptth_relay.rs | 11 +- src/http_serde.rs | 4 +- src/relay/mod.rs | 236 +++++++++++++++++++++++++++----------- src/server/file_server.rs | 8 +- src/server/mod.rs | 53 ++++++--- todo.md | 1 + 6 files changed, 226 insertions(+), 87 deletions(-) diff --git a/src/bin/ptth_relay.rs b/src/bin/ptth_relay.rs index 8641805..9c6912b 100644 --- a/src/bin/ptth_relay.rs +++ b/src/bin/ptth_relay.rs @@ -4,13 +4,22 @@ use std::{ }; use tracing::{info}; +use tracing_subscriber::{ + fmt, + fmt::format::FmtSpan, + EnvFilter, +}; use ptth::relay; use ptth::relay::RelayState; #[tokio::main] async fn main () -> Result <(), Box > { - tracing_subscriber::fmt::init (); + fmt () + .with_env_filter (EnvFilter::from_default_env ()) + .with_span_events (FmtSpan::FULL) + .init () + ; let config_file = ptth::load_toml::load ("config/ptth_relay.toml"); diff --git a/src/http_serde.rs b/src/http_serde.rs index bc52547..2ef2fc5 100644 --- a/src/http_serde.rs +++ b/src/http_serde.rs @@ -24,6 +24,8 @@ impl From for Error { pub enum Method { Get, Head, + Post, + Put, } impl TryFrom for Method { @@ -42,7 +44,7 @@ impl TryFrom for Method { pub struct RequestParts { pub method: Method, - // Technically URIs are subtle and complex but I don't care + // Technically URIs are subtle and complex, but I don't care pub uri: String, // Technically Hyper has headers in a multi-map diff --git a/src/relay/mod.rs b/src/relay/mod.rs index 803e30f..6b2c1cd 100644 --- a/src/relay/mod.rs +++ b/src/relay/mod.rs @@ -5,12 +5,16 @@ use std::{ iter::FromIterator, net::SocketAddr, sync::{ - Arc + Arc, }, + time::Duration, }; use dashmap::DashMap; -use futures::stream::StreamExt; +use futures::{ + FutureExt, + stream::StreamExt, +}; use handlebars::Handlebars; use hyper::{ Body, @@ -31,9 +35,14 @@ use tokio::{ Mutex, mpsc, oneshot, + RwLock, }, + time::delay_for, +}; +use tracing::{ + debug, error, info, trace, + instrument, }; -use tracing::{debug, error, info, trace, warn}; use crate::{ http_serde, @@ -69,12 +78,17 @@ can be parked */ -enum RequestRendezvous { - ParkedClients (Vec ), - ParkedServer (oneshot::Sender ), +#[derive (Debug)] +enum RelayError { + RelayShuttingDown, } -type ResponseRendezvous = oneshot::Sender <(http_serde::ResponseParts, Body)>; +enum RequestRendezvous { + ParkedClients (Vec ), + ParkedServer (oneshot::Sender >), +} + +type ResponseRendezvous = oneshot::Sender >; // Stuff we need to load from the config file and use to // set up the HTTP server @@ -118,7 +132,7 @@ pub struct RelayState { request_rendezvous: Mutex >, // Key: Request ID - response_rendezvous: DashMap , + response_rendezvous: RwLock >, } impl Default for RelayState { @@ -151,10 +165,19 @@ impl RelayState { } } -fn status_reply > (status: StatusCode, b: B) +fn ok_reply > (b: B) -> Response { - Response::builder ().status (status).body (b.into ()).unwrap () + Response::builder ().status (StatusCode::OK).body (b.into ()).unwrap () +} + +fn error_reply (status: StatusCode, b: &str) +-> Response +{ + Response::builder () + .status (status) + .header ("content-type", "text/plain") + .body (format! ("{}\n", b).into ()).unwrap () } // Servers will come here and either handle queued requests from parked clients, @@ -167,7 +190,7 @@ async fn handle_http_listen ( ) -> Response { - let trip_error = status_reply (StatusCode::UNAUTHORIZED, "Bad X-ApiKey"); + let trip_error = error_reply (StatusCode::UNAUTHORIZED, "Bad X-ApiKey"); let expected_tripcode = match state.config.server_tripcodes.get (&watcher_code) { None => { @@ -192,23 +215,35 @@ async fn handle_http_listen ( if let Some (ParkedClients (v)) = request_rendezvous.remove (&watcher_code) { - // 1 or more clients were parked - Make the server - // handle them immediately - - return status_reply (StatusCode::OK, rmp_serde::to_vec (&v).unwrap ()); + if ! v.is_empty () { + // 1 or more clients were parked - Make the server + // handle them immediately + + debug! ("Sending {} parked requests to server {}", v.len (), watcher_code); + return ok_reply (rmp_serde::to_vec (&v).unwrap ()); + } } - request_rendezvous.insert (watcher_code, ParkedServer (tx)); + debug! ("Parking server {}", watcher_code); + request_rendezvous.insert (watcher_code.clone (), ParkedServer (tx)); } // No clients were parked - make the server long-poll - let one_req = match rx.await { - Ok (r) => r, - Err (_) => return status_reply (StatusCode::SERVICE_UNAVAILABLE, "Server is shutting down, try again soon"), - }; - - status_reply (StatusCode::OK, rmp_serde::to_vec (&vec! [one_req]).unwrap ()) + futures::select! { + x = rx.fuse () => match x { + Ok (Ok (one_req)) => { + debug! ("Unparking server {}", watcher_code); + ok_reply (rmp_serde::to_vec (&vec! [one_req]).unwrap ()) + }, + Ok (Err (RelayError::RelayShuttingDown)) => error_reply (StatusCode::SERVICE_UNAVAILABLE, "Server is shutting down, try again soon"), + Err (_) => error_reply (StatusCode::INTERNAL_SERVER_ERROR, "Server error"), + }, + _ = delay_for (Duration::from_secs (30)).fuse () => { + debug! ("Timed out http_listen for server {}", watcher_code); + return error_reply (StatusCode::NO_CONTENT, "No requests now, long-poll again") + } + } } // Servers will come here to stream responses to clients @@ -226,7 +261,15 @@ async fn handle_http_response ( // Intercept the body packets here so we can check when the stream // ends or errors out + #[derive (Debug)] + enum BodyFinishedReason { + StreamFinished, + ClientDisconnected, + } + use BodyFinishedReason::*; + let (mut body_tx, body_rx) = mpsc::channel (2); + let (body_finished_tx, body_finished_rx) = oneshot::channel (); spawn (async move { loop { @@ -237,13 +280,15 @@ async fn handle_http_response ( trace! ("Relaying {} bytes", bytes.len ()); } - if body_tx.send (item).await.is_err () { - error! ("Error relaying bytes"); + if let Err (_e) = body_tx.send (item).await { + info! ("Body closed while relaying. (Client hung up?)"); + body_finished_tx.send (ClientDisconnected).unwrap (); break; } } else { debug! ("Finished relaying bytes"); + body_finished_tx.send (StreamFinished).unwrap (); break; } } @@ -251,26 +296,32 @@ async fn handle_http_response ( let body = Body::wrap_stream (body_rx); - match state.response_rendezvous.remove (&req_id) { - Some ((_, tx)) => { - // UKAUFFY4 (Send half) - match tx.send ((resp_parts, body)) { - Ok (()) => { - debug! ("Responding to server"); - status_reply (StatusCode::OK, "http_response completed.") - }, - _ => { - let msg = "Failed to connect to client"; - error! (msg); - status_reply (StatusCode::BAD_GATEWAY, msg) - }, - } - - }, - None => { - error! ("Server tried to respond to non-existent request"); - status_reply (StatusCode::BAD_REQUEST, "Request ID not found in response_rendezvous") + let tx = { + let response_rendezvous = state.response_rendezvous.read ().await; + match response_rendezvous.remove (&req_id) { + None => { + error! ("Server tried to respond to non-existent request"); + return error_reply (StatusCode::BAD_REQUEST, "Request ID not found in response_rendezvous"); + }, + Some ((_, x)) => x, + } + }; + + // UKAUFFY4 (Send half) + if tx.send (Ok ((resp_parts, body))).is_err () { + let msg = "Failed to connect to client"; + error! (msg); + return error_reply (StatusCode::BAD_GATEWAY, msg); + } + + debug! ("Connected server to client for streaming."); + match body_finished_rx.await.unwrap () { + StreamFinished => { + error_reply (StatusCode::OK, "StreamFinished") }, + ClientDisconnected => { + error_reply (StatusCode::OK, "ClientDisconnected") + } } } @@ -286,24 +337,29 @@ async fn handle_http_request ( -> Response { if ! state.config.server_tripcodes.contains_key (&watcher_code) { - return status_reply (StatusCode::NOT_FOUND, "Unknown server"); + return error_reply (StatusCode::NOT_FOUND, "Unknown server"); } let req = match http_serde::RequestParts::from_hyper (req.method, uri, req.headers) { Ok (x) => x, - _ => return status_reply (StatusCode::BAD_REQUEST, "Bad request"), + _ => return error_reply (StatusCode::BAD_REQUEST, "Bad request"), }; let (tx, rx) = oneshot::channel (); - let id = ulid::Ulid::new ().to_string (); - state.response_rendezvous.insert (id.clone (), tx); + let req_id = ulid::Ulid::new ().to_string (); + { + let response_rendezvous = state.response_rendezvous.read ().await; + response_rendezvous.insert (req_id.clone (), tx); + } + + trace! ("Created request {}", req_id); { let mut request_rendezvous = state.request_rendezvous.lock ().await; let wrapped = http_serde::WrappedRequest { - id, + id: req_id.clone (), req, }; @@ -311,18 +367,38 @@ async fn handle_http_request ( let new_rendezvous = match request_rendezvous.remove (&watcher_code) { Some (ParkedClients (mut v)) => { + debug! ("Parking request {} ({} already queued)", req_id, v.len ()); v.push (wrapped); ParkedClients (v) }, Some (ParkedServer (s)) => { // If sending to the server fails, queue it - match s.send (wrapped) { - Ok (()) => ParkedClients (vec! []), - Err (wrapped) => ParkedClients (vec! [wrapped]), + match s.send (Ok (wrapped)) { + Ok (()) => { + // TODO: This can actually still fail, if the server + // disconnects right as we're sending this. + // Then what? + + debug! ( + "Sending request {} directly to server {}", + req_id, + watcher_code, + ); + + ParkedClients (vec! []) + }, + Err (Ok (wrapped)) => { + debug! ("Parking request {}", req_id); + ParkedClients (vec! [wrapped]) + }, + Err (_) => unreachable! (), } }, - None => ParkedClients (vec! [wrapped]), + None => { + debug! ("Parking request {}", req_id); + ParkedClients (vec! [wrapped]) + }, }; request_rendezvous.insert (watcher_code, new_rendezvous); @@ -333,13 +409,14 @@ async fn handle_http_request ( let received = tokio::select! { val = rx => val, () = timeout => { - return status_reply (StatusCode::GATEWAY_TIMEOUT, "Remote server never responded") + debug! ("Timed out request {}", req_id); + return error_reply (StatusCode::GATEWAY_TIMEOUT, "Remote server never responded") }, }; // UKAUFFY4 (Receive half) match received { - Ok ((parts, body)) => { + Ok (Ok ((parts, body))) => { let mut resp = Response::builder () .status (hyper::StatusCode::from (parts.status_code)); @@ -347,13 +424,22 @@ async fn handle_http_request ( resp = resp.header (&k, v); } + debug! ("Unparked request {}", req_id); + resp.body (body) .unwrap () }, - _ => status_reply (StatusCode::GATEWAY_TIMEOUT, "Remote server timed out"), + Ok (Err (RelayError::RelayShuttingDown)) => { + error_reply (StatusCode::GATEWAY_TIMEOUT, "Relay shutting down") + }, + Err (_) => { + debug! ("Responder sender dropped for request {}", req_id); + error_reply (StatusCode::GATEWAY_TIMEOUT, "Remote server timed out") + }, } } +#[instrument (level = "trace", skip (req, state))] async fn handle_all (req: Request , state: Arc ) -> Result , Infallible> { @@ -371,13 +457,13 @@ async fn handle_all (req: Request , state: Arc ) handle_http_response (req, state, request_code).await } else { - status_reply (StatusCode::BAD_REQUEST, "Can't POST this\n") + error_reply (StatusCode::BAD_REQUEST, "Can't POST this") }); } Ok (if let Some (listen_code) = prefix_match (path, "/7ZSFUKGV/http_listen/") { let api_key = match api_key { - None => return Ok (status_reply (StatusCode::UNAUTHORIZED, "Can't register as server without an API key")), + None => return Ok (error_reply (StatusCode::UNAUTHORIZED, "Can't register as server without an API key")), Some (x) => x, }; handle_http_listen (state, listen_code.into (), api_key.as_bytes ()).await @@ -409,7 +495,7 @@ async fn handle_all (req: Request , state: Arc ) }; let s = state.handlebars.render ("relay_server_list", &page).unwrap (); - status_reply (StatusCode::OK, s) + ok_reply (s) } else if let Some (idx) = rest.find ('/') { let listen_code = String::from (&rest [0..idx]); @@ -419,14 +505,14 @@ async fn handle_all (req: Request , state: Arc ) handle_http_request (parts, path, state, listen_code).await } else { - status_reply (StatusCode::BAD_REQUEST, "Bad URI format") + error_reply (StatusCode::BAD_REQUEST, "Bad URI format") } } else if path == "/frontend/relay_up_check" { - status_reply (StatusCode::OK, "Relay is up\n") + error_reply (StatusCode::OK, "Relay is up") } else { - status_reply (StatusCode::OK, "Hi\n") + error_reply (StatusCode::OK, "Hi") }) } @@ -485,13 +571,29 @@ pub async fn run_relay ( server.with_graceful_shutdown (async { shutdown_oneshot.await.ok (); - - state.response_rendezvous.clear (); - - let mut request_rendezvoux = state.request_rendezvous.lock ().await; - request_rendezvoux.clear (); - info! ("Received graceful shutdown"); + + use RelayError::*; + + let mut response_rendezvous = state.response_rendezvous.write ().await; + let mut swapped = DashMap::default (); + + std::mem::swap (&mut swapped, &mut response_rendezvous); + + for (_, sender) in swapped.into_iter () { + sender.send (Err (RelayShuttingDown)).ok (); + } + + let mut request_rendezvous = state.request_rendezvous.lock ().await; + + for (_, x) in request_rendezvous.drain () { + use RequestRendezvous::*; + + match x { + ParkedClients (_) => (), + ParkedServer (sender) => drop (sender.send (Err (RelayShuttingDown))), + } + } }).await?; info! ("Exiting"); diff --git a/src/server/file_server.rs b/src/server/file_server.rs index 1105993..11e4cdd 100644 --- a/src/server/file_server.rs +++ b/src/server/file_server.rs @@ -22,7 +22,7 @@ use tokio::{ }, }; use tracing::{ - debug, error, info, + debug, error, info, trace, warn, instrument, }; @@ -188,7 +188,7 @@ async fn serve_file ( } if tx.send (Ok::<_, Infallible> (buffer)).await.is_err () { - error! ("Send failed while streaming file ({} bytes sent)", bytes_sent); + warn! ("Cancelling file stream (Sent {} out of {} bytes)", bytes_sent, end - start); break; } @@ -199,7 +199,7 @@ async fn serve_file ( } bytes_sent += bytes_read; - debug! ("Sent {} bytes", bytes_sent); + trace! ("Sent {} bytes", bytes_sent); //delay_for (Duration::from_millis (50)).await; } @@ -242,7 +242,7 @@ async fn serve_error ( resp } -#[instrument (level = "debug", skip (handlebars))] +#[instrument (level = "debug", skip (handlebars, headers))] pub async fn serve_all ( handlebars: &Handlebars <'static>, root: &Path, diff --git a/src/server/mod.rs b/src/server/mod.rs index 59050ca..4c74381 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -16,7 +16,7 @@ use tokio::{ sync::oneshot, time::delay_for, }; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; use crate::{ http_serde, @@ -45,12 +45,6 @@ async fn handle_req_resp <'a> ( ) { //println! ("Step 1"); - if req_resp.status () != StatusCode::OK { - // TODO: Error handling - error! ("http_listen didn't respond with 200 OK"); - return; - } - let body = req_resp.bytes ().await.unwrap (); let wrapped_reqs: Vec = match rmp_serde::from_read_ref (&body) { @@ -112,7 +106,14 @@ async fn handle_req_resp <'a> ( let text = r.text ().await.unwrap (); debug! ("{:?} {:?}", status, text); }, - Err (e) => error! ("Err: {:?}", e), + Err (e) => { + if e.is_request () { + warn! ("Error while POSTing response. Client probably hung up."); + } + else { + error! ("Err: {:?}", e); + } + }, } }); @@ -154,7 +155,7 @@ pub async fn run_server ( let client = Client::builder () .default_headers (headers) - .timeout (Duration::from_secs (30)) + .timeout (Duration::from_secs (40)) .build ().unwrap (); let handlebars = file_server::load_templates ()?; @@ -171,6 +172,8 @@ pub async fn run_server ( let mut shutdown_oneshot = shutdown_oneshot.fuse (); loop { + // TODO: Extract loop body to function? + if backoff_delay > 0 { let mut delay = delay_for (Duration::from_millis (backoff_delay)).fuse (); @@ -199,22 +202,44 @@ pub async fn run_server ( let req_resp = match req_req { Err (e) => { - error! ("Err: {:?}", e); - backoff_delay = err_backoff_delay; + if e.is_timeout () { + error! ("Client-side timeout. Is an overly-aggressive firewall closing long-lived connections? Is the network flakey?"); + if backoff_delay != 0 { + debug! ("backoff_delay = 0"); + backoff_delay = 0; + } + } + else { + error! ("Err: {:?}", e); + if backoff_delay != err_backoff_delay { + error! ("Non-timeout issue, increasing backoff_delay"); + backoff_delay = err_backoff_delay; + } + } continue; }, Ok (r) => { - backoff_delay = 0; + if backoff_delay != 0 { + debug! ("backoff_delay = 0"); + backoff_delay = 0; + } r }, }; - if req_resp.status () != StatusCode::OK { + if req_resp.status () == StatusCode::NO_CONTENT { + debug! ("http_listen long poll timed out on the server, good."); + continue; + } + else if req_resp.status () != StatusCode::OK { error! ("{}", req_resp.status ()); let body = req_resp.bytes ().await.unwrap (); let body = String::from_utf8 (body.to_vec ()).unwrap (); error! ("{}", body); - backoff_delay = err_backoff_delay; + if backoff_delay != err_backoff_delay { + error! ("Non-timeout issue, increasing backoff_delay"); + backoff_delay = err_backoff_delay; + } continue; } diff --git a/todo.md b/todo.md index 259d4f9..37fed23 100644 --- a/todo.md +++ b/todo.md @@ -1,3 +1,4 @@ +- Relay doesn't always shut down _if_ accessed by Firefox? - Not working behind Nginx - Try sending the http_response "OK" _after_ the request body is received