That's long polling
commit
394345cfe2
|
@ -0,0 +1 @@
|
||||||
|
/target
|
|
@ -0,0 +1,12 @@
|
||||||
|
[package]
|
||||||
|
name = "ptth"
|
||||||
|
version = "0.1.0"
|
||||||
|
authors = ["_"]
|
||||||
|
edition = "2018"
|
||||||
|
|
||||||
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
futures = "0.3.7"
|
||||||
|
hyper = "0.13.8"
|
||||||
|
tokio = { version = "0.2", features = ["full"] }
|
|
@ -0,0 +1,103 @@
|
||||||
|
use std::{
|
||||||
|
collections::*,
|
||||||
|
error::Error,
|
||||||
|
};
|
||||||
|
use std::convert::{Infallible};
|
||||||
|
use std::io::Write;
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use std::path::Path;
|
||||||
|
use std::str::FromStr;
|
||||||
|
use std::sync::{
|
||||||
|
Arc
|
||||||
|
};
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
use futures::channel::oneshot;
|
||||||
|
use hyper::{Body, Request, Response, Server, StatusCode};
|
||||||
|
use hyper::service::{make_service_fn, service_fn};
|
||||||
|
|
||||||
|
use tokio::{
|
||||||
|
sync::Mutex,
|
||||||
|
time::delay_for,
|
||||||
|
};
|
||||||
|
|
||||||
|
mod watcher;
|
||||||
|
|
||||||
|
use watcher::Watchers;
|
||||||
|
|
||||||
|
#[derive (Clone)]
|
||||||
|
enum Message {
|
||||||
|
Meow,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive (Default)]
|
||||||
|
struct ServerState {
|
||||||
|
watchers: Arc <Mutex <Watchers <Message>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn status_reply <B: Into <Body>> (status: StatusCode, b: B)
|
||||||
|
-> Response <Body>
|
||||||
|
{
|
||||||
|
Response::builder ().status (status).body (b.into ()).unwrap ()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_watch (state: Arc <ServerState>)
|
||||||
|
-> Response <Body>
|
||||||
|
{
|
||||||
|
match Watchers::long_poll (state.watchers.clone ()).await {
|
||||||
|
None => status_reply (StatusCode::OK, "no\n"),
|
||||||
|
Some (_) => status_reply (StatusCode::OK, "actually, yes\n"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_wake (state: Arc <ServerState>)
|
||||||
|
-> Response <Body>
|
||||||
|
{
|
||||||
|
let mut watchers = state.watchers.lock ().await;
|
||||||
|
|
||||||
|
watchers.wake_all (Message::Meow);
|
||||||
|
|
||||||
|
status_reply (StatusCode::OK, "ok\n")
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_all (req: Request <Body>, state: Arc <ServerState>)
|
||||||
|
-> Result <Response <Body>, Infallible>
|
||||||
|
{
|
||||||
|
let uri = req.uri ();
|
||||||
|
println! ("{}", uri);
|
||||||
|
|
||||||
|
if uri == "/watch" {
|
||||||
|
Ok (handle_watch (state).await)
|
||||||
|
}
|
||||||
|
else if uri == "/wake" {
|
||||||
|
Ok (handle_wake (state).await)
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
Ok (status_reply (StatusCode::OK, "Hi\n"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main () -> Result <(), Box <dyn Error>> {
|
||||||
|
let addr = SocketAddr::from(([0, 0, 0, 0], 4000));
|
||||||
|
|
||||||
|
let state = Arc::new (ServerState::default ());
|
||||||
|
|
||||||
|
let make_svc = make_service_fn (|_conn| {
|
||||||
|
let state = state.clone ();
|
||||||
|
|
||||||
|
async {
|
||||||
|
Ok::<_, Infallible> (service_fn (move |req| {
|
||||||
|
let state = state.clone ();
|
||||||
|
|
||||||
|
handle_all (req, state)
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let server = Server::bind (&addr).serve (make_svc);
|
||||||
|
|
||||||
|
server.await?;
|
||||||
|
|
||||||
|
Ok (())
|
||||||
|
}
|
|
@ -0,0 +1,88 @@
|
||||||
|
// Watchers on the wall was the last good episode of Game of Thrones
|
||||||
|
|
||||||
|
use std::{
|
||||||
|
collections::*,
|
||||||
|
sync::Arc,
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
|
|
||||||
|
use futures::channel::oneshot;
|
||||||
|
use tokio::{
|
||||||
|
sync::Mutex,
|
||||||
|
time::delay_for,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub struct Watchers <T> {
|
||||||
|
pub next_id: u64,
|
||||||
|
pub senders: HashMap <u64, oneshot::Sender <T>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl <T> Default for Watchers <T> {
|
||||||
|
fn default () -> Self {
|
||||||
|
Self {
|
||||||
|
next_id: 0,
|
||||||
|
senders: Default::default (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl <T: 'static + Clone + Send + Sync> Watchers <T> {
|
||||||
|
fn take_id (&mut self) -> u64 {
|
||||||
|
let id = self.next_id;
|
||||||
|
self.next_id += 1;
|
||||||
|
id
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_watcher_with_id (&mut self, id: u64, s: oneshot::Sender <T>) {
|
||||||
|
self.senders.insert (id, s);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn add_watcher (&mut self, s: oneshot::Sender <T>) -> u64 {
|
||||||
|
let id = self.take_id ();
|
||||||
|
self.add_watcher_with_id (id, s);
|
||||||
|
|
||||||
|
id
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn remove_watcher (&mut self, id: u64) {
|
||||||
|
self.senders.remove (&id);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn wake_all (&mut self, msg: T) {
|
||||||
|
let waiters = {
|
||||||
|
std::mem::take (&mut self.senders)
|
||||||
|
};
|
||||||
|
|
||||||
|
for (_id, waiter) in waiters.into_iter () {
|
||||||
|
waiter.send (msg.clone ()).ok ();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn num_watchers (&self) -> usize {
|
||||||
|
self.senders.len ()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn long_poll (that: Arc <Mutex <Self>>) -> Option <T> {
|
||||||
|
let (s, r) = oneshot::channel ();
|
||||||
|
|
||||||
|
let timeout = Duration::from_secs (10);
|
||||||
|
|
||||||
|
let id = {
|
||||||
|
let mut that = that.lock ().await;
|
||||||
|
that.add_watcher (s)
|
||||||
|
};
|
||||||
|
|
||||||
|
tokio::spawn (async move {
|
||||||
|
delay_for (timeout).await;
|
||||||
|
let mut that = that.lock ().await;
|
||||||
|
that.remove_watcher (id);
|
||||||
|
});
|
||||||
|
|
||||||
|
if let Ok (message) = r.await {
|
||||||
|
Some (message)
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue