From 8b3f9520914b93105609c9e629533c7fdccd2163 Mon Sep 17 00:00:00 2001 From: _ <> Date: Tue, 27 Oct 2020 02:20:37 +0000 Subject: [PATCH] Multiple watchers --- src/bin/relay.rs | 43 +++++++++++++++++++++++++++++-------------- src/watcher.rs | 48 ++++++++++++++++++++++++------------------------ 2 files changed, 53 insertions(+), 38 deletions(-) diff --git a/src/bin/relay.rs b/src/bin/relay.rs index 4cdbd06..66d58f9 100644 --- a/src/bin/relay.rs +++ b/src/bin/relay.rs @@ -39,23 +39,26 @@ fn status_reply > (status: StatusCode, b: B) Response::builder ().status (status).body (b.into ()).unwrap () } -async fn handle_watch (state: Arc ) +async fn handle_watch (state: Arc , watcher_code: String) -> Response { - match Watchers::long_poll (state.watchers.clone ()).await { + match Watchers::long_poll (state.watchers.clone (), watcher_code).await { None => status_reply (StatusCode::OK, "no\n"), Some (_) => status_reply (StatusCode::OK, "actually, yes\n"), } } -async fn handle_wake (state: Arc ) +async fn handle_wake (state: Arc , watcher_code: String) -> Response { let mut watchers = state.watchers.lock ().await; - watchers.wake_all (Message::Meow); - - status_reply (StatusCode::OK, "ok\n") + if watchers.wake_one (Message::Meow, &watcher_code) { + status_reply (StatusCode::OK, "ok\n") + } + else { + status_reply (StatusCode::BAD_REQUEST, "no\n") + } } async fn handle_udp_send () -> Response @@ -84,24 +87,36 @@ async fn handle_udp_recv () -> Response status_reply (StatusCode::OK, buffer) } +fn prefix_match <'a> (hay: &'a str, needle: &str) -> Option <&'a str> +{ + if hay.starts_with (needle) { + Some (&hay [needle.len ()..]) + } + else { + None + } +} + async fn handle_all (req: Request , state: Arc ) -> Result , Infallible> { - let uri = req.uri (); - println! ("{}", uri); + let path = req.uri ().path (); + println! ("{}", path); - if uri == "/watch" { - Ok (handle_watch (state).await) + if let Some (watch_code) = prefix_match (path, "/watch/") { + Ok (handle_watch (state, watch_code.into ()).await) } - else if uri == "/wake" { - Ok (handle_wake (state).await) + else if let Some (watch_code) = prefix_match (path, "/wake/") { + Ok (handle_wake (state, watch_code.into ()).await) } - else if uri == "/udp_send" { + /* + else if let Some (name) = prefix_match (path, "/udp_send/") { Ok (handle_udp_send ().await) } - else if uri == "/udp_recv" { + else if let Some (name) = prefix_match (path, "/udp_recv/") { Ok (handle_udp_recv ().await) } + */ else { Ok (status_reply (StatusCode::OK, "Hi\n")) } diff --git a/src/watcher.rs b/src/watcher.rs index f67d063..34e374c 100644 --- a/src/watcher.rs +++ b/src/watcher.rs @@ -13,42 +13,27 @@ use tokio::{ }; pub struct Watchers { - pub next_id: u64, - pub senders: HashMap >, + pub senders: HashMap >, } impl Default for Watchers { fn default () -> Self { Self { - next_id: 0, senders: Default::default (), } } } impl Watchers { - fn take_id (&mut self) -> u64 { - let id = self.next_id; - self.next_id += 1; - id - } - - fn add_watcher_with_id (&mut self, id: u64, s: oneshot::Sender ) { + pub fn add_watcher_with_id (&mut self, s: oneshot::Sender , id: String) { self.senders.insert (id, s); } - pub fn add_watcher (&mut self, s: oneshot::Sender ) -> u64 { - let id = self.take_id (); - self.add_watcher_with_id (id, s); - - id - } - - pub fn remove_watcher (&mut self, id: u64) { + pub fn remove_watcher (&mut self, id: String) { self.senders.remove (&id); } - - pub fn wake_all (&mut self, msg: T) { + /* + fn wake_all (&mut self, msg: T) { let waiters = { std::mem::take (&mut self.senders) }; @@ -57,20 +42,35 @@ impl Watchers { waiter.send (msg.clone ()).ok (); } } + */ + pub fn wake_one (&mut self, msg: T, id: &str) -> bool { + println! ("wake_one {}", id); + + if let Some (waiter) = self.senders.remove (id) { + waiter.send (msg).ok (); + true + } + else { + false + } + } pub fn num_watchers (&self) -> usize { self.senders.len () } - pub async fn long_poll (that: Arc >) -> Option { + pub async fn long_poll (that: Arc >, id: String) -> Option { + println! ("long_poll {}", id); + let (s, r) = oneshot::channel (); let timeout = Duration::from_secs (10); - let id = { + let id_2 = id.clone (); + { let mut that = that.lock ().await; - that.add_watcher (s) - }; + that.add_watcher_with_id (s, id_2) + } tokio::spawn (async move { delay_for (timeout).await;