diff --git a/src/bin/relay.rs b/src/bin/relay.rs index 33188bd..2918cc1 100644 --- a/src/bin/relay.rs +++ b/src/bin/relay.rs @@ -1,16 +1,17 @@ use std::{ collections::*, error::Error, + convert::{ + Infallible, + TryInto, + }, + net::SocketAddr, + str::FromStr, + sync::{ + Arc + }, + time::{Duration, Instant}, }; -use std::convert::{Infallible}; -use std::io::Write; -use std::net::SocketAddr; -use std::path::Path; -use std::str::FromStr; -use std::sync::{ - Arc -}; -use std::time::{Duration, Instant}; use futures::channel::oneshot; use hyper::{ @@ -24,22 +25,18 @@ use hyper::{ use hyper::service::{make_service_fn, service_fn}; use tokio::{ - sync::mpsc::{ - channel, - Receiver, - }, sync::Mutex, time::delay_for, }; -use ptth::watcher::Watchers; +use ptth::{ + http_serde::*, + watcher::Watchers, +}; enum Message { Meow, - //HttpRequestRequest (String), - HttpRequestResponse (String), - // HttpResponseRequest (String), - HttpResponseResponse (Vec ), + HttpRequestResponse (RequestParts), HttpResponseResponseStream (Body), } @@ -79,11 +76,11 @@ async fn handle_wake (state: Arc , watcher_code: String) async fn handle_http_listen (state: Arc , watcher_code: String) -> Response { - println! ("Step 1"); + //println! ("Step 1"); match Watchers::long_poll (state.watchers.clone (), watcher_code).await { - Some (Message::HttpRequestResponse (uri)) => { + Some (Message::HttpRequestResponse (parts)) => { println! ("Step 3"); - status_reply (StatusCode::OK, uri) + status_reply (StatusCode::OK, rmp_serde::to_vec (&parts).unwrap ()) }, _ => status_reply (StatusCode::GATEWAY_TIMEOUT, "no\n"), } @@ -105,7 +102,7 @@ async fn handle_http_response ( println! ("Step 7"); if ! watchers.wake_one (Message::HttpResponseResponseStream (body), &req_id) { - println! ("Step 8"); + println! ("Step 8 (bad thing)"); return status_reply (StatusCode::BAD_REQUEST, "A bad thing happened.\n"); } else { @@ -116,31 +113,31 @@ async fn handle_http_response ( } async fn handle_http_request ( + parts: RequestParts, state: Arc , - watcher_code: String, - uri: String + watcher_code: String ) -> Response { - let req_id = format! ("client_{}", ulid::Ulid::new ().to_string ()); - - println! ("Step 2 {}", req_id); + println! ("Step 2 {}", parts.id); let (s, r) = oneshot::channel (); let timeout = Duration::from_secs (5); - let id_2 = req_id.clone (); + let id_2 = parts.id.clone (); { let mut that = state.watchers.lock ().await; that.add_watcher_with_id (s, id_2) } + let req_id = parts.id.clone (); + tokio::spawn (async move { { let mut watchers = state.watchers.lock ().await; println! ("Step 3"); - if ! watchers.wake_one (Message::HttpRequestResponse (req_id.clone ()), &watcher_code) { + if ! watchers.wake_one (Message::HttpRequestResponse (parts), &watcher_code) { watchers.remove_watcher (&req_id); } } @@ -201,7 +198,7 @@ async fn handle_all (req: Request , state: Arc ) -> Result , Infallible> { let path = req.uri ().path (); - println! ("{}", path); + //println! ("{}", path); if req.method () == Method::POST { return Ok (if let Some (request_code) = prefix_match (path, "/http_response/") { @@ -224,9 +221,14 @@ async fn handle_all (req: Request , state: Arc ) } else if let Some (rest) = prefix_match (path, "/http_request/") { if let Some (idx) = rest.find ('/') { - let listen_code = &rest [0..idx]; - let path = &rest [idx + 1..]; - Ok (handle_http_request (state, listen_code.into (), path.into ()).await) + let listen_code = String::from (&rest [0..idx]); + let path = String::from (&rest [idx + 1..]); + let (parts, _) = req.into_parts (); + let parts = match parts.try_into () { + Ok (x) => x, + _ => return Ok (status_reply (StatusCode::BAD_REQUEST, "Couldn't convert request")), + }; + Ok (handle_http_request (parts, state, listen_code).await) } else { Ok (status_reply (StatusCode::BAD_REQUEST, "Bad URI format")) diff --git a/src/bin/server.rs b/src/bin/server.rs index 8e2a7e3..b24230c 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -23,6 +23,8 @@ use tokio::{ time::delay_for, }; +use ptth::http_serde::*; + #[tokio::main] async fn main () -> Result <(), Box > { let client = Arc::new (Client::new ()); @@ -42,7 +44,7 @@ async fn main () -> Result <(), Box > { let req_req = client.get ("http://127.0.0.1:4000/http_listen/alien_wildlands"); - println! ("Step 1"); + //println! ("Step 1"); let req_resp = match req_req.send ().await { Err (e) => { println! ("Err: {:?}", e); @@ -62,12 +64,15 @@ async fn main () -> Result <(), Box > { println! ("Step 3"); let body = req_resp.bytes ().await?; - let body = String::from (std::str::from_utf8 (&body)?); + let parts: RequestParts = match rmp_serde::from_read_ref (&body) + { + Ok (x) => x, + _ => continue, + }; - println! ("Client requested {}", body); + println! ("Client requested {}", parts.uri); println! ("Step 4/5"); - let payload = String::from ("Ha ha hue hue it worked.\n"); println! ("Step 6"); let client = client.clone (); @@ -84,6 +89,8 @@ async fn main () -> Result <(), Box > { tokio::spawn (async move { let path = "/home/user/pictures/bzqcChY.jpg"; let path = "/home/user/videos/Decearing Egg.webm"; + let path = "/home/user/projects/2020/ptth/README.md"; + let mut f = File::open (path).await.unwrap (); let mut tx = tx; let mut bytes_sent = 0; @@ -98,7 +105,7 @@ async fn main () -> Result <(), Box > { break; } - tx.send (Ok::<_, Infallible> (buffer)).await; + tx.send (Ok::<_, Infallible> (buffer)).await.unwrap (); bytes_sent += bytes_read; println! ("Sent {} bytes", bytes_sent); @@ -107,7 +114,7 @@ async fn main () -> Result <(), Box > { } }); - let resp_req = client.post (&format! ("http://127.0.0.1:4000/http_response/{}", body)).body (Body::wrap_stream (rx)); + let resp_req = client.post (&format! ("http://127.0.0.1:4000/http_response/{}", parts.id)).body (Body::wrap_stream (rx)); println! ("Step 6"); match resp_req.send ().await { diff --git a/src/http_serde.rs b/src/http_serde.rs index ebdcc72..e52a2fa 100644 --- a/src/http_serde.rs +++ b/src/http_serde.rs @@ -44,6 +44,7 @@ impl From for hyper::Method { #[derive (Deserialize, Serialize)] pub struct RequestParts { + pub id: String, pub method: Method, // Technically URIs are subtle and complex but I don't care @@ -70,6 +71,7 @@ impl TryFrom for RequestParts { ); Ok (Self { + id: ulid::Ulid::new ().to_string (), method, uri, headers, diff --git a/src/watcher.rs b/src/watcher.rs index 40f8b13..f8f636f 100644 --- a/src/watcher.rs +++ b/src/watcher.rs @@ -34,7 +34,7 @@ impl Watchers { } pub fn wake_one (&mut self, msg: T, id: &str) -> bool { - println! ("wake_one {}", id); + //println! ("wake_one {}", id); if let Some (waiter) = self.senders.remove (id) { waiter.send (msg).ok (); @@ -50,7 +50,7 @@ impl Watchers { } pub async fn long_poll (that: Arc >, id: String) -> Option { - println! ("long_poll {}", id); + //println! ("long_poll {}", id); let (s, r) = oneshot::channel (); let timeout = Duration::from_secs (5);