use std::{ collections::*, error::Error, }; use std::convert::{Infallible}; use std::io::Write; use std::net::SocketAddr; use std::path::Path; use std::str::FromStr; use std::sync::{ Arc }; use std::time::{Duration, Instant}; use futures::channel::oneshot; use hyper::{Body, Request, Response, Server, StatusCode}; use hyper::service::{make_service_fn, service_fn}; use tokio::{ sync::Mutex, time::delay_for, }; use ptth::watcher::Watchers; #[derive (Clone)] enum Message { Meow, HttpRequest (String), } #[derive (Default)] struct ServerState { watchers: Arc >>, } fn status_reply > (status: StatusCode, b: B) -> Response { Response::builder ().status (status).body (b.into ()).unwrap () } async fn handle_watch (state: Arc , watcher_code: String) -> Response { match Watchers::long_poll (state.watchers.clone (), watcher_code).await { None => status_reply (StatusCode::OK, "no\n"), Some (_) => status_reply (StatusCode::OK, "actually, yes\n"), } } async fn handle_wake (state: Arc , watcher_code: String) -> Response { let mut watchers = state.watchers.lock ().await; if watchers.wake_one (Message::Meow, &watcher_code) { status_reply (StatusCode::OK, "ok\n") } else { status_reply (StatusCode::BAD_REQUEST, "no\n") } } async fn handle_http_listen (state: Arc , watcher_code: String) -> Response { match Watchers::long_poll (state.watchers.clone (), watcher_code).await { Some (Message::HttpRequest (uri)) => status_reply (StatusCode::OK, uri), _ => status_reply (StatusCode::GATEWAY_TIMEOUT, "no\n"), } } async fn handle_http_request ( state: Arc , watcher_code: String, uri: String ) -> Response { let mut watchers = state.watchers.lock ().await; if watchers.wake_one (Message::HttpRequest (uri), &watcher_code) { status_reply (StatusCode::OK, "ok\n") } else { status_reply (StatusCode::BAD_REQUEST, "no\n") } } async fn handle_udp_send () -> Response { use tokio::net::UdpSocket; let mut sock = UdpSocket::bind (SocketAddr::from (([0,0,0,0], 0))).await.unwrap (); sock.send_to (b"handle_udp_send\n", SocketAddr::from (([127,0,0,1], 9000u16))).await.unwrap (); status_reply (StatusCode::OK, "ok\n") } async fn handle_udp_recv () -> Response { use tokio::net::UdpSocket; let mut sock = UdpSocket::bind (SocketAddr::from (([0,0,0,0], 4001))).await.unwrap (); let mut buffer = vec! [0u8; 4096]; let (bytes_received, _addr) = sock.recv_from (&mut buffer [..]).await.unwrap (); buffer.truncate (bytes_received); status_reply (StatusCode::OK, buffer) } fn prefix_match <'a> (hay: &'a str, needle: &str) -> Option <&'a str> { if hay.starts_with (needle) { Some (&hay [needle.len ()..]) } else { None } } async fn handle_all (req: Request , state: Arc ) -> Result , Infallible> { let path = req.uri ().path (); println! ("{}", path); if let Some (watch_code) = prefix_match (path, "/watch/") { Ok (handle_watch (state, watch_code.into ()).await) } else if let Some (watch_code) = prefix_match (path, "/wake/") { Ok (handle_wake (state, watch_code.into ()).await) } else if let Some (listen_code) = prefix_match (path, "/http_listen/") { Ok (handle_http_listen (state, listen_code.into ()).await) } else if let Some (rest) = prefix_match (path, "/http_request/") { if let Some (idx) = rest.find ('/') { let listen_code = &rest [0..idx]; let path = &rest [idx + 1..]; Ok (handle_http_request (state, listen_code.into (), path.into ()).await) } else { Ok (status_reply (StatusCode::BAD_REQUEST, "Bad URI format")) } } /* else if let Some (name) = prefix_match (path, "/udp_send/") { Ok (handle_udp_send ().await) } else if let Some (name) = prefix_match (path, "/udp_recv/") { Ok (handle_udp_recv ().await) } */ else { Ok (status_reply (StatusCode::OK, "Hi\n")) } } #[tokio::main] async fn main () -> Result <(), Box > { let addr = SocketAddr::from(([0, 0, 0, 0], 4000)); let state = Arc::new (ServerState::default ()); let make_svc = make_service_fn (|_conn| { let state = state.clone (); async { Ok::<_, Infallible> (service_fn (move |req| { let state = state.clone (); handle_all (req, state) })) } }); let server = Server::bind (&addr).serve (make_svc); server.await?; Ok (()) }