🐛 Working on a bunch of bugs and error handling

main
_ 2020-11-06 23:43:52 +00:00
parent 3786cec8ab
commit e0298a5289
6 changed files with 226 additions and 87 deletions

View File

@ -4,13 +4,22 @@ use std::{
}; };
use tracing::{info}; use tracing::{info};
use tracing_subscriber::{
fmt,
fmt::format::FmtSpan,
EnvFilter,
};
use ptth::relay; use ptth::relay;
use ptth::relay::RelayState; use ptth::relay::RelayState;
#[tokio::main] #[tokio::main]
async fn main () -> Result <(), Box <dyn Error>> { async fn main () -> Result <(), Box <dyn Error>> {
tracing_subscriber::fmt::init (); fmt ()
.with_env_filter (EnvFilter::from_default_env ())
.with_span_events (FmtSpan::FULL)
.init ()
;
let config_file = ptth::load_toml::load ("config/ptth_relay.toml"); let config_file = ptth::load_toml::load ("config/ptth_relay.toml");

View File

@ -24,6 +24,8 @@ impl From <hyper::header::InvalidHeaderName> for Error {
pub enum Method { pub enum Method {
Get, Get,
Head, Head,
Post,
Put,
} }
impl TryFrom <hyper::Method> for Method { impl TryFrom <hyper::Method> for Method {
@ -42,7 +44,7 @@ impl TryFrom <hyper::Method> for Method {
pub struct RequestParts { pub struct RequestParts {
pub method: Method, pub method: Method,
// Technically URIs are subtle and complex but I don't care // Technically URIs are subtle and complex, but I don't care
pub uri: String, pub uri: String,
// Technically Hyper has headers in a multi-map // Technically Hyper has headers in a multi-map

View File

@ -5,12 +5,16 @@ use std::{
iter::FromIterator, iter::FromIterator,
net::SocketAddr, net::SocketAddr,
sync::{ sync::{
Arc Arc,
}, },
time::Duration,
}; };
use dashmap::DashMap; use dashmap::DashMap;
use futures::stream::StreamExt; use futures::{
FutureExt,
stream::StreamExt,
};
use handlebars::Handlebars; use handlebars::Handlebars;
use hyper::{ use hyper::{
Body, Body,
@ -31,9 +35,14 @@ use tokio::{
Mutex, Mutex,
mpsc, mpsc,
oneshot, oneshot,
RwLock,
}, },
time::delay_for,
};
use tracing::{
debug, error, info, trace,
instrument,
}; };
use tracing::{debug, error, info, trace, warn};
use crate::{ use crate::{
http_serde, http_serde,
@ -69,12 +78,17 @@ can be parked
*/ */
enum RequestRendezvous { #[derive (Debug)]
ParkedClients (Vec <http_serde::WrappedRequest>), enum RelayError {
ParkedServer (oneshot::Sender <http_serde::WrappedRequest>), RelayShuttingDown,
} }
type ResponseRendezvous = oneshot::Sender <(http_serde::ResponseParts, Body)>; enum RequestRendezvous {
ParkedClients (Vec <http_serde::WrappedRequest>),
ParkedServer (oneshot::Sender <Result <http_serde::WrappedRequest, RelayError>>),
}
type ResponseRendezvous = oneshot::Sender <Result <(http_serde::ResponseParts, Body), RelayError>>;
// Stuff we need to load from the config file and use to // Stuff we need to load from the config file and use to
// set up the HTTP server // set up the HTTP server
@ -118,7 +132,7 @@ pub struct RelayState {
request_rendezvous: Mutex <HashMap <String, RequestRendezvous>>, request_rendezvous: Mutex <HashMap <String, RequestRendezvous>>,
// Key: Request ID // Key: Request ID
response_rendezvous: DashMap <String, ResponseRendezvous>, response_rendezvous: RwLock <DashMap <String, ResponseRendezvous>>,
} }
impl Default for RelayState { impl Default for RelayState {
@ -151,10 +165,19 @@ impl RelayState {
} }
} }
fn status_reply <B: Into <Body>> (status: StatusCode, b: B) fn ok_reply <B: Into <Body>> (b: B)
-> Response <Body> -> Response <Body>
{ {
Response::builder ().status (status).body (b.into ()).unwrap () Response::builder ().status (StatusCode::OK).body (b.into ()).unwrap ()
}
fn error_reply (status: StatusCode, b: &str)
-> Response <Body>
{
Response::builder ()
.status (status)
.header ("content-type", "text/plain")
.body (format! ("{}\n", b).into ()).unwrap ()
} }
// Servers will come here and either handle queued requests from parked clients, // Servers will come here and either handle queued requests from parked clients,
@ -167,7 +190,7 @@ async fn handle_http_listen (
) )
-> Response <Body> -> Response <Body>
{ {
let trip_error = status_reply (StatusCode::UNAUTHORIZED, "Bad X-ApiKey"); let trip_error = error_reply (StatusCode::UNAUTHORIZED, "Bad X-ApiKey");
let expected_tripcode = match state.config.server_tripcodes.get (&watcher_code) { let expected_tripcode = match state.config.server_tripcodes.get (&watcher_code) {
None => { None => {
@ -192,23 +215,35 @@ async fn handle_http_listen (
if let Some (ParkedClients (v)) = request_rendezvous.remove (&watcher_code) if let Some (ParkedClients (v)) = request_rendezvous.remove (&watcher_code)
{ {
// 1 or more clients were parked - Make the server if ! v.is_empty () {
// handle them immediately // 1 or more clients were parked - Make the server
// handle them immediately
return status_reply (StatusCode::OK, rmp_serde::to_vec (&v).unwrap ());
debug! ("Sending {} parked requests to server {}", v.len (), watcher_code);
return ok_reply (rmp_serde::to_vec (&v).unwrap ());
}
} }
request_rendezvous.insert (watcher_code, ParkedServer (tx)); debug! ("Parking server {}", watcher_code);
request_rendezvous.insert (watcher_code.clone (), ParkedServer (tx));
} }
// No clients were parked - make the server long-poll // No clients were parked - make the server long-poll
let one_req = match rx.await { futures::select! {
Ok (r) => r, x = rx.fuse () => match x {
Err (_) => return status_reply (StatusCode::SERVICE_UNAVAILABLE, "Server is shutting down, try again soon"), Ok (Ok (one_req)) => {
}; debug! ("Unparking server {}", watcher_code);
ok_reply (rmp_serde::to_vec (&vec! [one_req]).unwrap ())
status_reply (StatusCode::OK, rmp_serde::to_vec (&vec! [one_req]).unwrap ()) },
Ok (Err (RelayError::RelayShuttingDown)) => error_reply (StatusCode::SERVICE_UNAVAILABLE, "Server is shutting down, try again soon"),
Err (_) => error_reply (StatusCode::INTERNAL_SERVER_ERROR, "Server error"),
},
_ = delay_for (Duration::from_secs (30)).fuse () => {
debug! ("Timed out http_listen for server {}", watcher_code);
return error_reply (StatusCode::NO_CONTENT, "No requests now, long-poll again")
}
}
} }
// Servers will come here to stream responses to clients // Servers will come here to stream responses to clients
@ -226,7 +261,15 @@ async fn handle_http_response (
// Intercept the body packets here so we can check when the stream // Intercept the body packets here so we can check when the stream
// ends or errors out // ends or errors out
#[derive (Debug)]
enum BodyFinishedReason {
StreamFinished,
ClientDisconnected,
}
use BodyFinishedReason::*;
let (mut body_tx, body_rx) = mpsc::channel (2); let (mut body_tx, body_rx) = mpsc::channel (2);
let (body_finished_tx, body_finished_rx) = oneshot::channel ();
spawn (async move { spawn (async move {
loop { loop {
@ -237,13 +280,15 @@ async fn handle_http_response (
trace! ("Relaying {} bytes", bytes.len ()); trace! ("Relaying {} bytes", bytes.len ());
} }
if body_tx.send (item).await.is_err () { if let Err (_e) = body_tx.send (item).await {
error! ("Error relaying bytes"); info! ("Body closed while relaying. (Client hung up?)");
body_finished_tx.send (ClientDisconnected).unwrap ();
break; break;
} }
} }
else { else {
debug! ("Finished relaying bytes"); debug! ("Finished relaying bytes");
body_finished_tx.send (StreamFinished).unwrap ();
break; break;
} }
} }
@ -251,26 +296,32 @@ async fn handle_http_response (
let body = Body::wrap_stream (body_rx); let body = Body::wrap_stream (body_rx);
match state.response_rendezvous.remove (&req_id) { let tx = {
Some ((_, tx)) => { let response_rendezvous = state.response_rendezvous.read ().await;
// UKAUFFY4 (Send half) match response_rendezvous.remove (&req_id) {
match tx.send ((resp_parts, body)) { None => {
Ok (()) => { error! ("Server tried to respond to non-existent request");
debug! ("Responding to server"); return error_reply (StatusCode::BAD_REQUEST, "Request ID not found in response_rendezvous");
status_reply (StatusCode::OK, "http_response completed.") },
}, Some ((_, x)) => x,
_ => { }
let msg = "Failed to connect to client"; };
error! (msg);
status_reply (StatusCode::BAD_GATEWAY, msg) // UKAUFFY4 (Send half)
}, if tx.send (Ok ((resp_parts, body))).is_err () {
} let msg = "Failed to connect to client";
error! (msg);
}, return error_reply (StatusCode::BAD_GATEWAY, msg);
None => { }
error! ("Server tried to respond to non-existent request");
status_reply (StatusCode::BAD_REQUEST, "Request ID not found in response_rendezvous") debug! ("Connected server to client for streaming.");
match body_finished_rx.await.unwrap () {
StreamFinished => {
error_reply (StatusCode::OK, "StreamFinished")
}, },
ClientDisconnected => {
error_reply (StatusCode::OK, "ClientDisconnected")
}
} }
} }
@ -286,24 +337,29 @@ async fn handle_http_request (
-> Response <Body> -> Response <Body>
{ {
if ! state.config.server_tripcodes.contains_key (&watcher_code) { if ! state.config.server_tripcodes.contains_key (&watcher_code) {
return status_reply (StatusCode::NOT_FOUND, "Unknown server"); return error_reply (StatusCode::NOT_FOUND, "Unknown server");
} }
let req = match http_serde::RequestParts::from_hyper (req.method, uri, req.headers) { let req = match http_serde::RequestParts::from_hyper (req.method, uri, req.headers) {
Ok (x) => x, Ok (x) => x,
_ => return status_reply (StatusCode::BAD_REQUEST, "Bad request"), _ => return error_reply (StatusCode::BAD_REQUEST, "Bad request"),
}; };
let (tx, rx) = oneshot::channel (); let (tx, rx) = oneshot::channel ();
let id = ulid::Ulid::new ().to_string (); let req_id = ulid::Ulid::new ().to_string ();
state.response_rendezvous.insert (id.clone (), tx); {
let response_rendezvous = state.response_rendezvous.read ().await;
response_rendezvous.insert (req_id.clone (), tx);
}
trace! ("Created request {}", req_id);
{ {
let mut request_rendezvous = state.request_rendezvous.lock ().await; let mut request_rendezvous = state.request_rendezvous.lock ().await;
let wrapped = http_serde::WrappedRequest { let wrapped = http_serde::WrappedRequest {
id, id: req_id.clone (),
req, req,
}; };
@ -311,18 +367,38 @@ async fn handle_http_request (
let new_rendezvous = match request_rendezvous.remove (&watcher_code) { let new_rendezvous = match request_rendezvous.remove (&watcher_code) {
Some (ParkedClients (mut v)) => { Some (ParkedClients (mut v)) => {
debug! ("Parking request {} ({} already queued)", req_id, v.len ());
v.push (wrapped); v.push (wrapped);
ParkedClients (v) ParkedClients (v)
}, },
Some (ParkedServer (s)) => { Some (ParkedServer (s)) => {
// If sending to the server fails, queue it // If sending to the server fails, queue it
match s.send (wrapped) { match s.send (Ok (wrapped)) {
Ok (()) => ParkedClients (vec! []), Ok (()) => {
Err (wrapped) => ParkedClients (vec! [wrapped]), // TODO: This can actually still fail, if the server
// disconnects right as we're sending this.
// Then what?
debug! (
"Sending request {} directly to server {}",
req_id,
watcher_code,
);
ParkedClients (vec! [])
},
Err (Ok (wrapped)) => {
debug! ("Parking request {}", req_id);
ParkedClients (vec! [wrapped])
},
Err (_) => unreachable! (),
} }
}, },
None => ParkedClients (vec! [wrapped]), None => {
debug! ("Parking request {}", req_id);
ParkedClients (vec! [wrapped])
},
}; };
request_rendezvous.insert (watcher_code, new_rendezvous); request_rendezvous.insert (watcher_code, new_rendezvous);
@ -333,13 +409,14 @@ async fn handle_http_request (
let received = tokio::select! { let received = tokio::select! {
val = rx => val, val = rx => val,
() = timeout => { () = timeout => {
return status_reply (StatusCode::GATEWAY_TIMEOUT, "Remote server never responded") debug! ("Timed out request {}", req_id);
return error_reply (StatusCode::GATEWAY_TIMEOUT, "Remote server never responded")
}, },
}; };
// UKAUFFY4 (Receive half) // UKAUFFY4 (Receive half)
match received { match received {
Ok ((parts, body)) => { Ok (Ok ((parts, body))) => {
let mut resp = Response::builder () let mut resp = Response::builder ()
.status (hyper::StatusCode::from (parts.status_code)); .status (hyper::StatusCode::from (parts.status_code));
@ -347,13 +424,22 @@ async fn handle_http_request (
resp = resp.header (&k, v); resp = resp.header (&k, v);
} }
debug! ("Unparked request {}", req_id);
resp.body (body) resp.body (body)
.unwrap () .unwrap ()
}, },
_ => status_reply (StatusCode::GATEWAY_TIMEOUT, "Remote server timed out"), Ok (Err (RelayError::RelayShuttingDown)) => {
error_reply (StatusCode::GATEWAY_TIMEOUT, "Relay shutting down")
},
Err (_) => {
debug! ("Responder sender dropped for request {}", req_id);
error_reply (StatusCode::GATEWAY_TIMEOUT, "Remote server timed out")
},
} }
} }
#[instrument (level = "trace", skip (req, state))]
async fn handle_all (req: Request <Body>, state: Arc <RelayState>) async fn handle_all (req: Request <Body>, state: Arc <RelayState>)
-> Result <Response <Body>, Infallible> -> Result <Response <Body>, Infallible>
{ {
@ -371,13 +457,13 @@ async fn handle_all (req: Request <Body>, state: Arc <RelayState>)
handle_http_response (req, state, request_code).await handle_http_response (req, state, request_code).await
} }
else { else {
status_reply (StatusCode::BAD_REQUEST, "Can't POST this\n") error_reply (StatusCode::BAD_REQUEST, "Can't POST this")
}); });
} }
Ok (if let Some (listen_code) = prefix_match (path, "/7ZSFUKGV/http_listen/") { Ok (if let Some (listen_code) = prefix_match (path, "/7ZSFUKGV/http_listen/") {
let api_key = match api_key { let api_key = match api_key {
None => return Ok (status_reply (StatusCode::UNAUTHORIZED, "Can't register as server without an API key")), None => return Ok (error_reply (StatusCode::UNAUTHORIZED, "Can't register as server without an API key")),
Some (x) => x, Some (x) => x,
}; };
handle_http_listen (state, listen_code.into (), api_key.as_bytes ()).await handle_http_listen (state, listen_code.into (), api_key.as_bytes ()).await
@ -409,7 +495,7 @@ async fn handle_all (req: Request <Body>, state: Arc <RelayState>)
}; };
let s = state.handlebars.render ("relay_server_list", &page).unwrap (); let s = state.handlebars.render ("relay_server_list", &page).unwrap ();
status_reply (StatusCode::OK, s) ok_reply (s)
} }
else if let Some (idx) = rest.find ('/') { else if let Some (idx) = rest.find ('/') {
let listen_code = String::from (&rest [0..idx]); let listen_code = String::from (&rest [0..idx]);
@ -419,14 +505,14 @@ async fn handle_all (req: Request <Body>, state: Arc <RelayState>)
handle_http_request (parts, path, state, listen_code).await handle_http_request (parts, path, state, listen_code).await
} }
else { else {
status_reply (StatusCode::BAD_REQUEST, "Bad URI format") error_reply (StatusCode::BAD_REQUEST, "Bad URI format")
} }
} }
else if path == "/frontend/relay_up_check" { else if path == "/frontend/relay_up_check" {
status_reply (StatusCode::OK, "Relay is up\n") error_reply (StatusCode::OK, "Relay is up")
} }
else { else {
status_reply (StatusCode::OK, "Hi\n") error_reply (StatusCode::OK, "Hi")
}) })
} }
@ -485,13 +571,29 @@ pub async fn run_relay (
server.with_graceful_shutdown (async { server.with_graceful_shutdown (async {
shutdown_oneshot.await.ok (); shutdown_oneshot.await.ok ();
state.response_rendezvous.clear ();
let mut request_rendezvoux = state.request_rendezvous.lock ().await;
request_rendezvoux.clear ();
info! ("Received graceful shutdown"); info! ("Received graceful shutdown");
use RelayError::*;
let mut response_rendezvous = state.response_rendezvous.write ().await;
let mut swapped = DashMap::default ();
std::mem::swap (&mut swapped, &mut response_rendezvous);
for (_, sender) in swapped.into_iter () {
sender.send (Err (RelayShuttingDown)).ok ();
}
let mut request_rendezvous = state.request_rendezvous.lock ().await;
for (_, x) in request_rendezvous.drain () {
use RequestRendezvous::*;
match x {
ParkedClients (_) => (),
ParkedServer (sender) => drop (sender.send (Err (RelayShuttingDown))),
}
}
}).await?; }).await?;
info! ("Exiting"); info! ("Exiting");

View File

@ -22,7 +22,7 @@ use tokio::{
}, },
}; };
use tracing::{ use tracing::{
debug, error, info, debug, error, info, trace, warn,
instrument, instrument,
}; };
@ -188,7 +188,7 @@ async fn serve_file (
} }
if tx.send (Ok::<_, Infallible> (buffer)).await.is_err () { if tx.send (Ok::<_, Infallible> (buffer)).await.is_err () {
error! ("Send failed while streaming file ({} bytes sent)", bytes_sent); warn! ("Cancelling file stream (Sent {} out of {} bytes)", bytes_sent, end - start);
break; break;
} }
@ -199,7 +199,7 @@ async fn serve_file (
} }
bytes_sent += bytes_read; bytes_sent += bytes_read;
debug! ("Sent {} bytes", bytes_sent); trace! ("Sent {} bytes", bytes_sent);
//delay_for (Duration::from_millis (50)).await; //delay_for (Duration::from_millis (50)).await;
} }
@ -242,7 +242,7 @@ async fn serve_error (
resp resp
} }
#[instrument (level = "debug", skip (handlebars))] #[instrument (level = "debug", skip (handlebars, headers))]
pub async fn serve_all ( pub async fn serve_all (
handlebars: &Handlebars <'static>, handlebars: &Handlebars <'static>,
root: &Path, root: &Path,

View File

@ -16,7 +16,7 @@ use tokio::{
sync::oneshot, sync::oneshot,
time::delay_for, time::delay_for,
}; };
use tracing::{debug, error, info}; use tracing::{debug, error, info, warn};
use crate::{ use crate::{
http_serde, http_serde,
@ -45,12 +45,6 @@ async fn handle_req_resp <'a> (
) { ) {
//println! ("Step 1"); //println! ("Step 1");
if req_resp.status () != StatusCode::OK {
// TODO: Error handling
error! ("http_listen didn't respond with 200 OK");
return;
}
let body = req_resp.bytes ().await.unwrap (); let body = req_resp.bytes ().await.unwrap ();
let wrapped_reqs: Vec <http_serde::WrappedRequest> = match rmp_serde::from_read_ref (&body) let wrapped_reqs: Vec <http_serde::WrappedRequest> = match rmp_serde::from_read_ref (&body)
{ {
@ -112,7 +106,14 @@ async fn handle_req_resp <'a> (
let text = r.text ().await.unwrap (); let text = r.text ().await.unwrap ();
debug! ("{:?} {:?}", status, text); debug! ("{:?} {:?}", status, text);
}, },
Err (e) => error! ("Err: {:?}", e), Err (e) => {
if e.is_request () {
warn! ("Error while POSTing response. Client probably hung up.");
}
else {
error! ("Err: {:?}", e);
}
},
} }
}); });
@ -154,7 +155,7 @@ pub async fn run_server (
let client = Client::builder () let client = Client::builder ()
.default_headers (headers) .default_headers (headers)
.timeout (Duration::from_secs (30)) .timeout (Duration::from_secs (40))
.build ().unwrap (); .build ().unwrap ();
let handlebars = file_server::load_templates ()?; let handlebars = file_server::load_templates ()?;
@ -171,6 +172,8 @@ pub async fn run_server (
let mut shutdown_oneshot = shutdown_oneshot.fuse (); let mut shutdown_oneshot = shutdown_oneshot.fuse ();
loop { loop {
// TODO: Extract loop body to function?
if backoff_delay > 0 { if backoff_delay > 0 {
let mut delay = delay_for (Duration::from_millis (backoff_delay)).fuse (); let mut delay = delay_for (Duration::from_millis (backoff_delay)).fuse ();
@ -199,22 +202,44 @@ pub async fn run_server (
let req_resp = match req_req { let req_resp = match req_req {
Err (e) => { Err (e) => {
error! ("Err: {:?}", e); if e.is_timeout () {
backoff_delay = err_backoff_delay; error! ("Client-side timeout. Is an overly-aggressive firewall closing long-lived connections? Is the network flakey?");
if backoff_delay != 0 {
debug! ("backoff_delay = 0");
backoff_delay = 0;
}
}
else {
error! ("Err: {:?}", e);
if backoff_delay != err_backoff_delay {
error! ("Non-timeout issue, increasing backoff_delay");
backoff_delay = err_backoff_delay;
}
}
continue; continue;
}, },
Ok (r) => { Ok (r) => {
backoff_delay = 0; if backoff_delay != 0 {
debug! ("backoff_delay = 0");
backoff_delay = 0;
}
r r
}, },
}; };
if req_resp.status () != StatusCode::OK { if req_resp.status () == StatusCode::NO_CONTENT {
debug! ("http_listen long poll timed out on the server, good.");
continue;
}
else if req_resp.status () != StatusCode::OK {
error! ("{}", req_resp.status ()); error! ("{}", req_resp.status ());
let body = req_resp.bytes ().await.unwrap (); let body = req_resp.bytes ().await.unwrap ();
let body = String::from_utf8 (body.to_vec ()).unwrap (); let body = String::from_utf8 (body.to_vec ()).unwrap ();
error! ("{}", body); error! ("{}", body);
backoff_delay = err_backoff_delay; if backoff_delay != err_backoff_delay {
error! ("Non-timeout issue, increasing backoff_delay");
backoff_delay = err_backoff_delay;
}
continue; continue;
} }

View File

@ -1,3 +1,4 @@
- Relay doesn't always shut down _if_ accessed by Firefox?
- Not working behind Nginx - Not working behind Nginx
- Try sending the http_response "OK" _after_ the request body is received - Try sending the http_response "OK" _after_ the request body is received