commit 394345cfe271d0648ff2ba2a4a908a089c0f7f0f Author: _ <> Date: Tue Oct 27 01:42:10 2020 +0000 That's long polling diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..d9c404d --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "ptth" +version = "0.1.0" +authors = ["_"] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +futures = "0.3.7" +hyper = "0.13.8" +tokio = { version = "0.2", features = ["full"] } diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..c72d830 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,103 @@ +use std::{ + collections::*, + error::Error, +}; +use std::convert::{Infallible}; +use std::io::Write; +use std::net::SocketAddr; +use std::path::Path; +use std::str::FromStr; +use std::sync::{ + Arc +}; +use std::time::{Duration, Instant}; + +use futures::channel::oneshot; +use hyper::{Body, Request, Response, Server, StatusCode}; +use hyper::service::{make_service_fn, service_fn}; + +use tokio::{ + sync::Mutex, + time::delay_for, +}; + +mod watcher; + +use watcher::Watchers; + +#[derive (Clone)] +enum Message { + Meow, +} + +#[derive (Default)] +struct ServerState { + watchers: Arc >>, +} + +fn status_reply > (status: StatusCode, b: B) +-> Response +{ + Response::builder ().status (status).body (b.into ()).unwrap () +} + +async fn handle_watch (state: Arc ) +-> Response +{ + match Watchers::long_poll (state.watchers.clone ()).await { + None => status_reply (StatusCode::OK, "no\n"), + Some (_) => status_reply (StatusCode::OK, "actually, yes\n"), + } +} + +async fn handle_wake (state: Arc ) +-> Response +{ + let mut watchers = state.watchers.lock ().await; + + watchers.wake_all (Message::Meow); + + status_reply (StatusCode::OK, "ok\n") +} + +async fn handle_all (req: Request , state: Arc ) +-> Result , Infallible> +{ + let uri = req.uri (); + println! ("{}", uri); + + if uri == "/watch" { + Ok (handle_watch (state).await) + } + else if uri == "/wake" { + Ok (handle_wake (state).await) + } + else { + Ok (status_reply (StatusCode::OK, "Hi\n")) + } +} + +#[tokio::main] +async fn main () -> Result <(), Box > { + let addr = SocketAddr::from(([0, 0, 0, 0], 4000)); + + let state = Arc::new (ServerState::default ()); + + let make_svc = make_service_fn (|_conn| { + let state = state.clone (); + + async { + Ok::<_, Infallible> (service_fn (move |req| { + let state = state.clone (); + + handle_all (req, state) + })) + } + }); + + let server = Server::bind (&addr).serve (make_svc); + + server.await?; + + Ok (()) +} diff --git a/src/watcher.rs b/src/watcher.rs new file mode 100644 index 0000000..f67d063 --- /dev/null +++ b/src/watcher.rs @@ -0,0 +1,88 @@ +// Watchers on the wall was the last good episode of Game of Thrones + +use std::{ + collections::*, + sync::Arc, + time::Duration, +}; + +use futures::channel::oneshot; +use tokio::{ + sync::Mutex, + time::delay_for, +}; + +pub struct Watchers { + pub next_id: u64, + pub senders: HashMap >, +} + +impl Default for Watchers { + fn default () -> Self { + Self { + next_id: 0, + senders: Default::default (), + } + } +} + +impl Watchers { + fn take_id (&mut self) -> u64 { + let id = self.next_id; + self.next_id += 1; + id + } + + fn add_watcher_with_id (&mut self, id: u64, s: oneshot::Sender ) { + self.senders.insert (id, s); + } + + pub fn add_watcher (&mut self, s: oneshot::Sender ) -> u64 { + let id = self.take_id (); + self.add_watcher_with_id (id, s); + + id + } + + pub fn remove_watcher (&mut self, id: u64) { + self.senders.remove (&id); + } + + pub fn wake_all (&mut self, msg: T) { + let waiters = { + std::mem::take (&mut self.senders) + }; + + for (_id, waiter) in waiters.into_iter () { + waiter.send (msg.clone ()).ok (); + } + } + + pub fn num_watchers (&self) -> usize { + self.senders.len () + } + + pub async fn long_poll (that: Arc >) -> Option { + let (s, r) = oneshot::channel (); + + let timeout = Duration::from_secs (10); + + let id = { + let mut that = that.lock ().await; + that.add_watcher (s) + }; + + tokio::spawn (async move { + delay_for (timeout).await; + let mut that = that.lock ().await; + that.remove_watcher (id); + }); + + if let Ok (message) = r.await { + Some (message) + } + else { + None + } + } +}