time out requests if the server doesn't rendezvous in 2 minutes

main
(on company time) 2022-08-01 13:15:53 -05:00
parent c30747d954
commit ce7ce42168
3 changed files with 24 additions and 3 deletions

View File

@ -135,6 +135,11 @@ async fn handle_http_request (
let (tx, rx) = oneshot::channel (); let (tx, rx) = oneshot::channel ();
let tx = relay_state::ResponseRendezvous {
timeout: Instant::now () + Duration::from_secs (120),
tx,
};
let req_id = rusty_ulid::generate_ulid_string (); let req_id = rusty_ulid::generate_ulid_string ();
debug! ("Forwarding {}", req_id); debug! ("Forwarding {}", req_id);
@ -791,6 +796,19 @@ pub async fn run_relay (
}); });
} }
let state_2 = Arc::clone (&state);
tokio::spawn (async move {
let mut interval = tokio::time::interval (Duration::from_secs (60));
interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick ().await;
let now = Instant::now ();
let response_rendezvous = state_2.response_rendezvous.read ().await;
response_rendezvous.retain (|_, v| v.timeout >= now);
}
});
let make_svc = make_service_fn (|_conn| { let make_svc = make_service_fn (|_conn| {
let state = state.clone (); let state = state.clone ();
let handlebars = handlebars.clone (); let handlebars = handlebars.clone ();
@ -849,7 +867,7 @@ pub async fn run_relay (
std::mem::swap (&mut swapped, &mut response_rendezvous); std::mem::swap (&mut swapped, &mut response_rendezvous);
for (_, sender) in swapped { for (_, sender) in swapped {
sender.send (Err (ShuttingDown)).ok (); sender.tx.send (Err (ShuttingDown)).ok ();
} }
let mut request_rendezvous = state.request_rendezvous.lock ().await; let mut request_rendezvous = state.request_rendezvous.lock ().await;

View File

@ -61,7 +61,10 @@ pub enum RequestRendezvous {
ParkedServer (oneshot::Sender <Result <http_serde::WrappedRequest, ShuttingDownError>>), ParkedServer (oneshot::Sender <Result <http_serde::WrappedRequest, ShuttingDownError>>),
} }
type ResponseRendezvous = oneshot::Sender <Result <(http_serde::ResponseParts, Body), ShuttingDownError>>; pub (crate) struct ResponseRendezvous {
pub timeout: Instant,
pub tx: oneshot::Sender <Result <(http_serde::ResponseParts, Body), ShuttingDownError>>,
}
#[derive (Clone)] #[derive (Clone)]
pub struct ServerStatus { pub struct ServerStatus {

View File

@ -188,7 +188,7 @@ pub async fn handle_response (
}; };
// UKAUFFY4 (Send half) // UKAUFFY4 (Send half)
if tx.send (Ok ((resp_parts, body))).is_err () { if tx.tx.send (Ok ((resp_parts, body))).is_err () {
let msg = "Failed to connect to client"; let msg = "Failed to connect to client";
error! (msg); error! (msg);
return Ok (error_reply (StatusCode::BAD_GATEWAY, msg)?); return Ok (error_reply (StatusCode::BAD_GATEWAY, msg)?);