From a661ce02eadd130495edda00e105b59e02e980c9 Mon Sep 17 00:00:00 2001 From: _ <_@_> Date: Tue, 27 Apr 2021 20:10:32 -0500 Subject: [PATCH] :recycle: refactor: extract park_client function --- crates/ptth_core/src/http_serde.rs | 2 +- crates/ptth_relay/src/lib.rs | 58 +++++---------------------- crates/ptth_relay/src/relay_state.rs | 59 +++++++++++++++++++++++++++- 3 files changed, 69 insertions(+), 50 deletions(-) diff --git a/crates/ptth_core/src/http_serde.rs b/crates/ptth_core/src/http_serde.rs index f533925..f6afb0e 100644 --- a/crates/ptth_core/src/http_serde.rs +++ b/crates/ptth_core/src/http_serde.rs @@ -16,7 +16,7 @@ pub enum Error { UnsupportedMethod, } -#[derive (Debug, Deserialize, Serialize)] +#[derive (Debug, Deserialize, PartialEq, Serialize)] pub enum Method { Get, Head, diff --git a/crates/ptth_relay/src/lib.rs b/crates/ptth_relay/src/lib.rs index 8a88715..8f49341 100644 --- a/crates/ptth_relay/src/lib.rs +++ b/crates/ptth_relay/src/lib.rs @@ -113,6 +113,8 @@ async fn handle_http_request ( AuditEvent, }; + let req_method = req.method.clone (); + { let config = state.config.read ().await; if ! config.servers.contains_key (server_name) { @@ -140,54 +142,7 @@ async fn handle_http_request ( response_rendezvous.insert (req_id.clone (), tx); } - { - use RequestRendezvous::*; - - let mut request_rendezvous = state.request_rendezvous.lock ().await; - - let wrapped = http_serde::WrappedRequest { - id: req_id.clone (), - req, - }; - - let new_rendezvous = match request_rendezvous.remove (server_name) { - Some (ParkedClients (mut v)) => { - debug! ("Parking request {} ({} already queued)", req_id, v.len ()); - v.push (wrapped); - ParkedClients (v) - }, - Some (ParkedServer (s)) => { - // If sending to the server fails, queue it - - match s.send (Ok (wrapped)) { - Ok (()) => { - // TODO: This can actually still fail, if the server - // disconnects right as we're sending this. - // Then what? - - trace! ( - "Sending request {} directly to server {}", - req_id, - server_name, - ); - - ParkedClients (vec! []) - }, - Err (Ok (wrapped)) => { - debug! ("Parking request {}", req_id); - ParkedClients (vec! [wrapped]) - }, - Err (_) => unreachable! (), - } - }, - None => { - debug! ("Parking request {}", req_id); - ParkedClients (vec! [wrapped]) - }, - }; - - request_rendezvous.insert (server_name.to_string (), new_rendezvous); - } + state.park_client (server_name, req, &req_id).await; let timeout = tokio::time::sleep (std::time::Duration::from_secs (30)); @@ -205,6 +160,13 @@ async fn handle_http_request ( let mut resp = Response::builder () .status (hyper::StatusCode::from (parts.status_code)); + if + req_method == hyper::Method::GET && + parts.headers.get ("accept-ranges").is_some () + { + trace! ("Stream restart code could go here"); + } + for (k, v) in parts.headers { resp = resp.header (&k, v); } diff --git a/crates/ptth_relay/src/relay_state.rs b/crates/ptth_relay/src/relay_state.rs index cae2174..8dd85f0 100644 --- a/crates/ptth_relay/src/relay_state.rs +++ b/crates/ptth_relay/src/relay_state.rs @@ -20,7 +20,10 @@ use crate::{ ShuttingDownError, }; -use ptth_core::http_serde; +use ptth_core::{ + http_serde, + prelude::*, +}; /* @@ -188,6 +191,60 @@ impl Relay { pub fn build () -> Builder { Builder::default () } + + pub async fn park_client ( + &self, + server_name: &str, + req: http_serde::RequestParts, + req_id: &str, + ) { + use RequestRendezvous::*; + + let mut request_rendezvous = self.request_rendezvous.lock ().await; + + let wrapped = http_serde::WrappedRequest { + id: req_id.to_string (), + req, + }; + + let new_rendezvous = match request_rendezvous.remove (server_name) { + Some (ParkedClients (mut v)) => { + debug! ("Parking request {} ({} already queued)", req_id, v.len ()); + v.push (wrapped); + ParkedClients (v) + }, + Some (ParkedServer (s)) => { + // If sending to the server fails, queue it + + match s.send (Ok (wrapped)) { + Ok (()) => { + // TODO: This can actually still fail, if the server + // disconnects right as we're sending this. + // Then what? + + trace! ( + "Sending request {} directly to server {}", + req_id, + server_name, + ); + + ParkedClients (vec! []) + }, + Err (Ok (wrapped)) => { + debug! ("Parking request {}", req_id); + ParkedClients (vec! [wrapped]) + }, + Err (_) => unreachable! (), + } + }, + None => { + debug! ("Parking request {}", req_id); + ParkedClients (vec! [wrapped]) + }, + }; + + request_rendezvous.insert (server_name.to_string (), new_rendezvous); + } } #[derive (Default)]