diff --git a/crates/ptth_relay/src/lib.rs b/crates/ptth_relay/src/lib.rs index 2f29ca8..d351ce7 100644 --- a/crates/ptth_relay/src/lib.rs +++ b/crates/ptth_relay/src/lib.rs @@ -135,6 +135,11 @@ async fn handle_http_request ( let (tx, rx) = oneshot::channel (); + let tx = relay_state::ResponseRendezvous { + timeout: Instant::now () + Duration::from_secs (120), + tx, + }; + let req_id = rusty_ulid::generate_ulid_string (); debug! ("Forwarding {}", req_id); @@ -791,6 +796,19 @@ pub async fn run_relay ( }); } + let state_2 = Arc::clone (&state); + tokio::spawn (async move { + let mut interval = tokio::time::interval (Duration::from_secs (60)); + interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip); + + loop { + interval.tick ().await; + let now = Instant::now (); + let response_rendezvous = state_2.response_rendezvous.read ().await; + response_rendezvous.retain (|_, v| v.timeout >= now); + } + }); + let make_svc = make_service_fn (|_conn| { let state = state.clone (); let handlebars = handlebars.clone (); @@ -849,7 +867,7 @@ pub async fn run_relay ( std::mem::swap (&mut swapped, &mut response_rendezvous); for (_, sender) in swapped { - sender.send (Err (ShuttingDown)).ok (); + sender.tx.send (Err (ShuttingDown)).ok (); } let mut request_rendezvous = state.request_rendezvous.lock ().await; diff --git a/crates/ptth_relay/src/relay_state.rs b/crates/ptth_relay/src/relay_state.rs index 9f66ffa..4ad3bcb 100644 --- a/crates/ptth_relay/src/relay_state.rs +++ b/crates/ptth_relay/src/relay_state.rs @@ -61,7 +61,10 @@ pub enum RequestRendezvous { ParkedServer (oneshot::Sender >), } -type ResponseRendezvous = oneshot::Sender >; +pub (crate) struct ResponseRendezvous { + pub timeout: Instant, + pub tx: oneshot::Sender >, +} #[derive (Clone)] pub struct ServerStatus { diff --git a/crates/ptth_relay/src/server_endpoint.rs b/crates/ptth_relay/src/server_endpoint.rs index 84333dd..71e8b6a 100644 --- a/crates/ptth_relay/src/server_endpoint.rs +++ b/crates/ptth_relay/src/server_endpoint.rs @@ -188,7 +188,7 @@ pub async fn handle_response ( }; // UKAUFFY4 (Send half) - if tx.send (Ok ((resp_parts, body))).is_err () { + if tx.tx.send (Ok ((resp_parts, body))).is_err () { let msg = "Failed to connect to client"; error! (msg); return Ok (error_reply (StatusCode::BAD_GATEWAY, msg)?);