🐛 bug: sweep request_rendezvous for timed-out requests, too

main
(on company time) 2022-08-02 10:19:52 -05:00
parent ce7ce42168
commit 1f398462b7
1 changed files with 29 additions and 3 deletions

View File

@ -796,16 +796,42 @@ pub async fn run_relay (
}); });
} }
// Set a task to periodically sweep and time-out requests where the client
// and server are never going to rendezvous
let state_2 = Arc::clone (&state); let state_2 = Arc::clone (&state);
tokio::spawn (async move { tokio::spawn (async move {
let mut interval = tokio::time::interval (Duration::from_secs (60)); let mut interval = tokio::time::interval (Duration::from_secs (60));
interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip); interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip);
loop { loop {
use std::convert::TryFrom;
use rusty_ulid::Ulid;
interval.tick ().await; interval.tick ().await;
let now = Instant::now ();
let response_rendezvous = state_2.response_rendezvous.read ().await; {
response_rendezvous.retain (|_, v| v.timeout >= now); let timeout_ms = Utc::now ().timestamp () - 120_000;
if let Ok (timeout_ms) = u64::try_from (timeout_ms) {
let timeout_ulid = Ulid::from_timestamp_with_rng (timeout_ms, &mut rand::thread_rng ()).to_string ();
let mut request_rendezvous = state_2.request_rendezvous.lock ().await;
request_rendezvous.iter_mut ()
.for_each (|(k, v)| {
match v {
RequestRendezvous::ParkedServer (_) => (),
RequestRendezvous::ParkedClients (requests) => requests.retain (|req| req.id.as_str () >= timeout_ulid.as_str ()),
}
});
}
}
{
let now = Instant::now ();
let response_rendezvous = state_2.response_rendezvous.read ().await;
response_rendezvous.retain (|_, v| v.timeout >= now);
}
} }
}); });