♻️ refactor: extract park_client function

main
_ 2021-04-27 20:10:32 -05:00
parent 7bae7f45ff
commit a661ce02ea
3 changed files with 69 additions and 50 deletions

View File

@ -16,7 +16,7 @@ pub enum Error {
UnsupportedMethod,
}
#[derive (Debug, Deserialize, Serialize)]
#[derive (Debug, Deserialize, PartialEq, Serialize)]
pub enum Method {
Get,
Head,

View File

@ -113,6 +113,8 @@ async fn handle_http_request (
AuditEvent,
};
let req_method = req.method.clone ();
{
let config = state.config.read ().await;
if ! config.servers.contains_key (server_name) {
@ -140,54 +142,7 @@ async fn handle_http_request (
response_rendezvous.insert (req_id.clone (), tx);
}
{
use RequestRendezvous::*;
let mut request_rendezvous = state.request_rendezvous.lock ().await;
let wrapped = http_serde::WrappedRequest {
id: req_id.clone (),
req,
};
let new_rendezvous = match request_rendezvous.remove (server_name) {
Some (ParkedClients (mut v)) => {
debug! ("Parking request {} ({} already queued)", req_id, v.len ());
v.push (wrapped);
ParkedClients (v)
},
Some (ParkedServer (s)) => {
// If sending to the server fails, queue it
match s.send (Ok (wrapped)) {
Ok (()) => {
// TODO: This can actually still fail, if the server
// disconnects right as we're sending this.
// Then what?
trace! (
"Sending request {} directly to server {}",
req_id,
server_name,
);
ParkedClients (vec! [])
},
Err (Ok (wrapped)) => {
debug! ("Parking request {}", req_id);
ParkedClients (vec! [wrapped])
},
Err (_) => unreachable! (),
}
},
None => {
debug! ("Parking request {}", req_id);
ParkedClients (vec! [wrapped])
},
};
request_rendezvous.insert (server_name.to_string (), new_rendezvous);
}
state.park_client (server_name, req, &req_id).await;
let timeout = tokio::time::sleep (std::time::Duration::from_secs (30));
@ -205,6 +160,13 @@ async fn handle_http_request (
let mut resp = Response::builder ()
.status (hyper::StatusCode::from (parts.status_code));
if
req_method == hyper::Method::GET &&
parts.headers.get ("accept-ranges").is_some ()
{
trace! ("Stream restart code could go here");
}
for (k, v) in parts.headers {
resp = resp.header (&k, v);
}

View File

@ -20,7 +20,10 @@ use crate::{
ShuttingDownError,
};
use ptth_core::http_serde;
use ptth_core::{
http_serde,
prelude::*,
};
/*
@ -188,6 +191,60 @@ impl Relay {
pub fn build () -> Builder {
Builder::default ()
}
pub async fn park_client (
&self,
server_name: &str,
req: http_serde::RequestParts,
req_id: &str,
) {
use RequestRendezvous::*;
let mut request_rendezvous = self.request_rendezvous.lock ().await;
let wrapped = http_serde::WrappedRequest {
id: req_id.to_string (),
req,
};
let new_rendezvous = match request_rendezvous.remove (server_name) {
Some (ParkedClients (mut v)) => {
debug! ("Parking request {} ({} already queued)", req_id, v.len ());
v.push (wrapped);
ParkedClients (v)
},
Some (ParkedServer (s)) => {
// If sending to the server fails, queue it
match s.send (Ok (wrapped)) {
Ok (()) => {
// TODO: This can actually still fail, if the server
// disconnects right as we're sending this.
// Then what?
trace! (
"Sending request {} directly to server {}",
req_id,
server_name,
);
ParkedClients (vec! [])
},
Err (Ok (wrapped)) => {
debug! ("Parking request {}", req_id);
ParkedClients (vec! [wrapped])
},
Err (_) => unreachable! (),
}
},
None => {
debug! ("Parking request {}", req_id);
ParkedClients (vec! [wrapped])
},
};
request_rendezvous.insert (server_name.to_string (), new_rendezvous);
}
}
#[derive (Default)]