use std::{ collections::*, error::Error, }; use std::convert::{Infallible}; use std::io::Write; use std::net::SocketAddr; use std::path::Path; use std::str::FromStr; use std::sync::{ Arc }; use std::time::{Duration, Instant}; use futures::channel::oneshot; use hyper::{ Body, Method, Request, Response, Server, StatusCode, }; use hyper::service::{make_service_fn, service_fn}; use tokio::{ sync::Mutex, time::delay_for, }; use ptth::watcher::Watchers; #[derive (Clone)] enum Message { Meow, //HttpRequestRequest (String), HttpRequestResponse (String), // HttpResponseRequest (String), HttpResponseResponse (Vec ), } #[derive (Default)] struct ServerState { watchers: Arc >>, } fn status_reply > (status: StatusCode, b: B) -> Response { Response::builder ().status (status).body (b.into ()).unwrap () } async fn handle_watch (state: Arc , watcher_code: String) -> Response { match Watchers::long_poll (state.watchers.clone (), watcher_code).await { None => status_reply (StatusCode::OK, "no\n"), Some (_) => status_reply (StatusCode::OK, "actually, yes\n"), } } async fn handle_wake (state: Arc , watcher_code: String) -> Response { let mut watchers = state.watchers.lock ().await; if watchers.wake_one (Message::Meow, &watcher_code) { status_reply (StatusCode::OK, "ok\n") } else { status_reply (StatusCode::BAD_REQUEST, "no\n") } } async fn handle_http_listen (state: Arc , watcher_code: String) -> Response { println! ("Step 1"); match Watchers::long_poll (state.watchers.clone (), watcher_code).await { Some (Message::HttpRequestResponse (uri)) => { println! ("Step 3"); status_reply (StatusCode::OK, uri) }, _ => status_reply (StatusCode::GATEWAY_TIMEOUT, "no\n"), } } async fn handle_http_response ( req: Request , state: Arc , req_id: String, ) -> Response { println! ("Step 6"); let payload = hyper::body::to_bytes (req.into_body ()).await.unwrap ().to_vec (); { let mut watchers = state.watchers.lock ().await; println! ("Step 7"); if ! watchers.wake_one (Message::HttpResponseResponse (payload), &req_id) { println! ("Step 8"); return status_reply (StatusCode::BAD_REQUEST, "A bad thing happened.\n"); } else { println! ("Step 8"); status_reply (StatusCode::OK, "ok\n") } } } async fn handle_http_request ( state: Arc , watcher_code: String, uri: String ) -> Response { let req_id = format! ("client_{}", ulid::Ulid::new ().to_string ()); println! ("Step 2 {}", req_id); let (s, r) = oneshot::channel (); let timeout = Duration::from_secs (5); let id_2 = req_id.clone (); { let mut that = state.watchers.lock ().await; that.add_watcher_with_id (s, id_2) } tokio::spawn (async move { { let mut watchers = state.watchers.lock ().await; println! ("Step 3"); if ! watchers.wake_one (Message::HttpRequestResponse (req_id.clone ()), &watcher_code) { watchers.remove_watcher (&req_id); } } delay_for (timeout).await; { let mut that = state.watchers.lock ().await; that.remove_watcher (&req_id); } }); match r.await { Ok (Message::HttpResponseResponse (s)) => { println! ("Step 7"); status_reply (StatusCode::OK, s) }, _ => status_reply (StatusCode::GATEWAY_TIMEOUT, "server didn't reply in time or somethin'"), } } async fn handle_udp_send () -> Response { use tokio::net::UdpSocket; let mut sock = UdpSocket::bind (SocketAddr::from (([0,0,0,0], 0))).await.unwrap (); sock.send_to (b"handle_udp_send\n", SocketAddr::from (([127,0,0,1], 9000u16))).await.unwrap (); status_reply (StatusCode::OK, "ok\n") } async fn handle_udp_recv () -> Response { use tokio::net::UdpSocket; let mut sock = UdpSocket::bind (SocketAddr::from (([0,0,0,0], 4001))).await.unwrap (); let mut buffer = vec! [0u8; 4096]; let (bytes_received, _addr) = sock.recv_from (&mut buffer [..]).await.unwrap (); buffer.truncate (bytes_received); status_reply (StatusCode::OK, buffer) } fn prefix_match <'a> (hay: &'a str, needle: &str) -> Option <&'a str> { if hay.starts_with (needle) { Some (&hay [needle.len ()..]) } else { None } } async fn handle_all (req: Request , state: Arc ) -> Result , Infallible> { let path = req.uri ().path (); println! ("{}", path); if req.method () == Method::POST { return Ok (if let Some (request_code) = prefix_match (path, "/http_response/") { let request_code = request_code.into (); handle_http_response (req, state, request_code).await } else { status_reply (StatusCode::BAD_REQUEST, "Can't POST this\n") }); } if let Some (watch_code) = prefix_match (path, "/watch/") { Ok (handle_watch (state, watch_code.into ()).await) } else if let Some (watch_code) = prefix_match (path, "/wake/") { Ok (handle_wake (state, watch_code.into ()).await) } else if let Some (listen_code) = prefix_match (path, "/http_listen/") { Ok (handle_http_listen (state, listen_code.into ()).await) } else if let Some (rest) = prefix_match (path, "/http_request/") { if let Some (idx) = rest.find ('/') { let listen_code = &rest [0..idx]; let path = &rest [idx + 1..]; Ok (handle_http_request (state, listen_code.into (), path.into ()).await) } else { Ok (status_reply (StatusCode::BAD_REQUEST, "Bad URI format")) } } /* else if let Some (name) = prefix_match (path, "/udp_send/") { Ok (handle_udp_send ().await) } else if let Some (name) = prefix_match (path, "/udp_recv/") { Ok (handle_udp_recv ().await) } */ else { Ok (status_reply (StatusCode::OK, "Hi\n")) } } #[tokio::main] async fn main () -> Result <(), Box > { let addr = SocketAddr::from(([0, 0, 0, 0], 4000)); let state = Arc::new (ServerState::default ()); let make_svc = make_service_fn (|_conn| { let state = state.clone (); async { Ok::<_, Infallible> (service_fn (move |req| { let state = state.clone (); handle_all (req, state) })) } }); let server = Server::bind (&addr).serve (make_svc); server.await?; Ok (()) }