diff --git a/Cargo.toml b/Cargo.toml index c38bf38..baa1dd5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,4 +1,5 @@ [package] + name = "ptth" version = "0.1.0" authors = ["_"] @@ -7,7 +8,9 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] + futures = "0.3.7" hyper = "0.13.8" reqwest = "0.10.8" tokio = { version = "0.2", features = ["full"] } +ulid = "0.4.1" diff --git a/src/bin/relay.rs b/src/bin/relay.rs index 519a679..7dc37c1 100644 --- a/src/bin/relay.rs +++ b/src/bin/relay.rs @@ -26,7 +26,10 @@ use ptth::watcher::Watchers; #[derive (Clone)] enum Message { Meow, - HttpRequest (String), + //HttpRequestRequest (String), + HttpRequestResponse (String), + // HttpResponseRequest (String), + HttpResponseResponse (String), } #[derive (Default)] @@ -65,12 +68,41 @@ async fn handle_wake (state: Arc , watcher_code: String) async fn handle_http_listen (state: Arc , watcher_code: String) -> Response { + println! ("Step 1"); match Watchers::long_poll (state.watchers.clone (), watcher_code).await { - Some (Message::HttpRequest (uri)) => status_reply (StatusCode::OK, uri), + Some (Message::HttpRequestResponse (uri)) => { + println! ("Step 3"); + status_reply (StatusCode::OK, uri) + }, _ => status_reply (StatusCode::GATEWAY_TIMEOUT, "no\n"), } } +async fn handle_http_response ( + state: Arc , + req_id: String, + response: String +) + -> Response +{ + println! ("Step 6"); + + { + let mut watchers = state.watchers.lock ().await; + + println! ("Step 7"); + if ! watchers.wake_one (Message::HttpResponseResponse (response), &req_id) + { + println! ("Step 8"); + return status_reply (StatusCode::BAD_REQUEST, "A bad thing happened.\n"); + } + else { + println! ("Step 8"); + status_reply (StatusCode::OK, "ok\n") + } + } +} + async fn handle_http_request ( state: Arc , watcher_code: String, @@ -78,13 +110,42 @@ async fn handle_http_request ( ) -> Response { - let mut watchers = state.watchers.lock ().await; + let req_id = format! ("client_{}", ulid::Ulid::new ().to_string ()); - if watchers.wake_one (Message::HttpRequest (uri), &watcher_code) { - status_reply (StatusCode::OK, "ok\n") + println! ("Step 2 {}", req_id); + + let (s, r) = oneshot::channel (); + let timeout = Duration::from_secs (5); + + let id_2 = req_id.clone (); + { + let mut that = state.watchers.lock ().await; + that.add_watcher_with_id (s, id_2) } - else { - status_reply (StatusCode::BAD_REQUEST, "no\n") + + tokio::spawn (async move { + { + let mut watchers = state.watchers.lock ().await; + + println! ("Step 3"); + if ! watchers.wake_one (Message::HttpRequestResponse (req_id.clone ()), &watcher_code) { + watchers.remove_watcher (&req_id); + } + } + + delay_for (timeout).await; + { + let mut that = state.watchers.lock ().await; + that.remove_watcher (&req_id); + } + }); + + match r.await { + Ok (Message::HttpResponseResponse (s)) => { + println! ("Step 7"); + status_reply (StatusCode::OK, s) + }, + _ => status_reply (StatusCode::GATEWAY_TIMEOUT, "server didn't reply in time or somethin'"), } } @@ -149,6 +210,16 @@ async fn handle_all (req: Request , state: Arc ) Ok (status_reply (StatusCode::BAD_REQUEST, "Bad URI format")) } } + else if let Some (rest) = prefix_match (path, "/http_response/") { + if let Some (idx) = rest.find ('/') { + let request_code = &rest [0..idx]; + let response = &rest [idx + 1..]; + Ok (handle_http_response (state, request_code.into (), response.into ()).await) + } + else { + Ok (status_reply (StatusCode::BAD_REQUEST, "Bad URI format")) + } + } /* else if let Some (name) = prefix_match (path, "/udp_send/") { Ok (handle_udp_send ().await) diff --git a/src/bin/server.rs b/src/bin/server.rs index 6308ca0..081e0c0 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -1,4 +1,7 @@ -use std::error::Error; +use std::{ + error::Error, + sync::Arc, +}; use hyper::{ StatusCode, @@ -9,7 +12,7 @@ use tokio::fs::File; #[tokio::main] async fn main () -> Result <(), Box > { - let client = Client::new (); + let client = Arc::new (Client::new ()); let path = "/home/user/pictures/bzqcChY.jpg"; @@ -17,11 +20,13 @@ async fn main () -> Result <(), Box > { let _uri = Uri::builder () .scheme ("http") .authority ("127.0.0.1:4000") - .path_and_query ("/listen/alien_wildlands") + .path_and_query ("/http_listen/alien_wildlands") .build ().unwrap (); - let req = client.get ("http://127.0.0.1:4000/http_listen/alien_wildlands"); - let response = match req.send ().await { + let req_req = client.get ("http://127.0.0.1:4000/http_listen/alien_wildlands"); + + println! ("Step 1"); + let req_resp = match req_req.send ().await { Err (e) => { println! ("Err: {:?}", e); continue; @@ -29,15 +34,38 @@ async fn main () -> Result <(), Box > { Ok (r) => r, }; - if response.status () != StatusCode::OK { + if req_resp.status () != StatusCode::OK { continue; } - let body = response.bytes ().await?; - let body = std::str::from_utf8 (&body)?; + println! ("Step 3"); + + let body = req_resp.bytes ().await?; + let body = String::from (std::str::from_utf8 (&body)?); println! ("Client requested {}", body); + + println! ("Step 4/5"); + let payload = String::from ("Ha ha hue hue it worked."); + + println! ("Step 6"); + let client = client.clone (); + tokio::spawn (async move { + let resp_req = Uri::builder () + .scheme ("http") + .authority ("127.0.0.1:4000") + .path_and_query ("/listen/alien_wildlands") + .build ().unwrap (); + + let resp_req = client.get (&format! ("http://127.0.0.1:4000/http_response/{}/{}", body, payload)); + + println! ("Step 6"); + match resp_req.send ().await { + Err (e) => { + println! ("Err: {:?}", e); + }, + Ok (_) => (), + } + }); } - - Ok (()) } diff --git a/src/watcher.rs b/src/watcher.rs index 95a3957..7d31852 100644 --- a/src/watcher.rs +++ b/src/watcher.rs @@ -29,8 +29,8 @@ impl Watchers { self.senders.insert (id, s); } - pub fn remove_watcher (&mut self, id: String) { - self.senders.remove (&id); + pub fn remove_watcher (&mut self, id: &str) { + self.senders.remove (id); } /* fn wake_all (&mut self, msg: T) { @@ -63,7 +63,6 @@ impl Watchers { println! ("long_poll {}", id); let (s, r) = oneshot::channel (); - let timeout = Duration::from_secs (5); let id_2 = id.clone (); @@ -75,7 +74,7 @@ impl Watchers { tokio::spawn (async move { delay_for (timeout).await; let mut that = that.lock ().await; - that.remove_watcher (id); + that.remove_watcher (&id); }); if let Ok (message) = r.await {