Multiple watchers
parent
6bb2678eb5
commit
8b3f952091
|
@ -39,24 +39,27 @@ fn status_reply <B: Into <Body>> (status: StatusCode, b: B)
|
|||
Response::builder ().status (status).body (b.into ()).unwrap ()
|
||||
}
|
||||
|
||||
async fn handle_watch (state: Arc <ServerState>)
|
||||
async fn handle_watch (state: Arc <ServerState>, watcher_code: String)
|
||||
-> Response <Body>
|
||||
{
|
||||
match Watchers::long_poll (state.watchers.clone ()).await {
|
||||
match Watchers::long_poll (state.watchers.clone (), watcher_code).await {
|
||||
None => status_reply (StatusCode::OK, "no\n"),
|
||||
Some (_) => status_reply (StatusCode::OK, "actually, yes\n"),
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_wake (state: Arc <ServerState>)
|
||||
async fn handle_wake (state: Arc <ServerState>, watcher_code: String)
|
||||
-> Response <Body>
|
||||
{
|
||||
let mut watchers = state.watchers.lock ().await;
|
||||
|
||||
watchers.wake_all (Message::Meow);
|
||||
|
||||
if watchers.wake_one (Message::Meow, &watcher_code) {
|
||||
status_reply (StatusCode::OK, "ok\n")
|
||||
}
|
||||
else {
|
||||
status_reply (StatusCode::BAD_REQUEST, "no\n")
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_udp_send () -> Response <Body>
|
||||
{
|
||||
|
@ -84,24 +87,36 @@ async fn handle_udp_recv () -> Response <Body>
|
|||
status_reply (StatusCode::OK, buffer)
|
||||
}
|
||||
|
||||
fn prefix_match <'a> (hay: &'a str, needle: &str) -> Option <&'a str>
|
||||
{
|
||||
if hay.starts_with (needle) {
|
||||
Some (&hay [needle.len ()..])
|
||||
}
|
||||
else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_all (req: Request <Body>, state: Arc <ServerState>)
|
||||
-> Result <Response <Body>, Infallible>
|
||||
{
|
||||
let uri = req.uri ();
|
||||
println! ("{}", uri);
|
||||
let path = req.uri ().path ();
|
||||
println! ("{}", path);
|
||||
|
||||
if uri == "/watch" {
|
||||
Ok (handle_watch (state).await)
|
||||
if let Some (watch_code) = prefix_match (path, "/watch/") {
|
||||
Ok (handle_watch (state, watch_code.into ()).await)
|
||||
}
|
||||
else if uri == "/wake" {
|
||||
Ok (handle_wake (state).await)
|
||||
else if let Some (watch_code) = prefix_match (path, "/wake/") {
|
||||
Ok (handle_wake (state, watch_code.into ()).await)
|
||||
}
|
||||
else if uri == "/udp_send" {
|
||||
/*
|
||||
else if let Some (name) = prefix_match (path, "/udp_send/") {
|
||||
Ok (handle_udp_send ().await)
|
||||
}
|
||||
else if uri == "/udp_recv" {
|
||||
else if let Some (name) = prefix_match (path, "/udp_recv/") {
|
||||
Ok (handle_udp_recv ().await)
|
||||
}
|
||||
*/
|
||||
else {
|
||||
Ok (status_reply (StatusCode::OK, "Hi\n"))
|
||||
}
|
||||
|
|
|
@ -13,42 +13,27 @@ use tokio::{
|
|||
};
|
||||
|
||||
pub struct Watchers <T> {
|
||||
pub next_id: u64,
|
||||
pub senders: HashMap <u64, oneshot::Sender <T>>,
|
||||
pub senders: HashMap <String, oneshot::Sender <T>>,
|
||||
}
|
||||
|
||||
impl <T> Default for Watchers <T> {
|
||||
fn default () -> Self {
|
||||
Self {
|
||||
next_id: 0,
|
||||
senders: Default::default (),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl <T: 'static + Clone + Send + Sync> Watchers <T> {
|
||||
fn take_id (&mut self) -> u64 {
|
||||
let id = self.next_id;
|
||||
self.next_id += 1;
|
||||
id
|
||||
}
|
||||
|
||||
fn add_watcher_with_id (&mut self, id: u64, s: oneshot::Sender <T>) {
|
||||
pub fn add_watcher_with_id (&mut self, s: oneshot::Sender <T>, id: String) {
|
||||
self.senders.insert (id, s);
|
||||
}
|
||||
|
||||
pub fn add_watcher (&mut self, s: oneshot::Sender <T>) -> u64 {
|
||||
let id = self.take_id ();
|
||||
self.add_watcher_with_id (id, s);
|
||||
|
||||
id
|
||||
}
|
||||
|
||||
pub fn remove_watcher (&mut self, id: u64) {
|
||||
pub fn remove_watcher (&mut self, id: String) {
|
||||
self.senders.remove (&id);
|
||||
}
|
||||
|
||||
pub fn wake_all (&mut self, msg: T) {
|
||||
/*
|
||||
fn wake_all (&mut self, msg: T) {
|
||||
let waiters = {
|
||||
std::mem::take (&mut self.senders)
|
||||
};
|
||||
|
@ -57,20 +42,35 @@ impl <T: 'static + Clone + Send + Sync> Watchers <T> {
|
|||
waiter.send (msg.clone ()).ok ();
|
||||
}
|
||||
}
|
||||
*/
|
||||
pub fn wake_one (&mut self, msg: T, id: &str) -> bool {
|
||||
println! ("wake_one {}", id);
|
||||
|
||||
if let Some (waiter) = self.senders.remove (id) {
|
||||
waiter.send (msg).ok ();
|
||||
true
|
||||
}
|
||||
else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pub fn num_watchers (&self) -> usize {
|
||||
self.senders.len ()
|
||||
}
|
||||
|
||||
pub async fn long_poll (that: Arc <Mutex <Self>>) -> Option <T> {
|
||||
pub async fn long_poll (that: Arc <Mutex <Self>>, id: String) -> Option <T> {
|
||||
println! ("long_poll {}", id);
|
||||
|
||||
let (s, r) = oneshot::channel ();
|
||||
|
||||
let timeout = Duration::from_secs (10);
|
||||
|
||||
let id = {
|
||||
let id_2 = id.clone ();
|
||||
{
|
||||
let mut that = that.lock ().await;
|
||||
that.add_watcher (s)
|
||||
};
|
||||
that.add_watcher_with_id (s, id_2)
|
||||
}
|
||||
|
||||
tokio::spawn (async move {
|
||||
delay_for (timeout).await;
|
||||
|
|
Loading…
Reference in New Issue