From 1f398462b788af8d0fae011416b9f1d6f735cef0 Mon Sep 17 00:00:00 2001 From: "(on company time)" <_@_> Date: Tue, 2 Aug 2022 10:19:52 -0500 Subject: [PATCH] :bug: bug: sweep request_rendezvous for timed-out requests, too --- crates/ptth_relay/src/lib.rs | 32 +++++++++++++++++++++++++++++--- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/crates/ptth_relay/src/lib.rs b/crates/ptth_relay/src/lib.rs index d351ce7..80512c1 100644 --- a/crates/ptth_relay/src/lib.rs +++ b/crates/ptth_relay/src/lib.rs @@ -796,16 +796,42 @@ pub async fn run_relay ( }); } + // Set a task to periodically sweep and time-out requests where the client + // and server are never going to rendezvous + let state_2 = Arc::clone (&state); tokio::spawn (async move { let mut interval = tokio::time::interval (Duration::from_secs (60)); interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip); loop { + use std::convert::TryFrom; + + use rusty_ulid::Ulid; + interval.tick ().await; - let now = Instant::now (); - let response_rendezvous = state_2.response_rendezvous.read ().await; - response_rendezvous.retain (|_, v| v.timeout >= now); + + { + let timeout_ms = Utc::now ().timestamp () - 120_000; + if let Ok (timeout_ms) = u64::try_from (timeout_ms) { + let timeout_ulid = Ulid::from_timestamp_with_rng (timeout_ms, &mut rand::thread_rng ()).to_string (); + + let mut request_rendezvous = state_2.request_rendezvous.lock ().await; + request_rendezvous.iter_mut () + .for_each (|(k, v)| { + match v { + RequestRendezvous::ParkedServer (_) => (), + RequestRendezvous::ParkedClients (requests) => requests.retain (|req| req.id.as_str () >= timeout_ulid.as_str ()), + } + }); + } + } + + { + let now = Instant::now (); + let response_rendezvous = state_2.response_rendezvous.read ().await; + response_rendezvous.retain (|_, v| v.timeout >= now); + } } });