diff --git a/Cargo.toml b/Cargo.toml index baa1dd5..d809441 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,6 @@ edition = "2018" futures = "0.3.7" hyper = "0.13.8" -reqwest = "0.10.8" +reqwest = { version = "0.10.8", features = ["stream"] } tokio = { version = "0.2", features = ["full"] } ulid = "0.4.1" diff --git a/src/bin/relay.rs b/src/bin/relay.rs index b5de82a..3d451f0 100644 --- a/src/bin/relay.rs +++ b/src/bin/relay.rs @@ -36,7 +36,7 @@ enum Message { //HttpRequestRequest (String), HttpRequestResponse (String), // HttpResponseRequest (String), - HttpResponseResponse (String), + HttpResponseResponse (Vec ), } #[derive (Default)] @@ -93,7 +93,7 @@ async fn handle_http_response ( -> Response { println! ("Step 6"); - let payload = String::from (std::str::from_utf8 (&hyper::body::to_bytes (req.into_body ()).await.unwrap ()).unwrap ()); + let payload = hyper::body::to_bytes (req.into_body ()).await.unwrap ().to_vec (); { let mut watchers = state.watchers.lock ().await; diff --git a/src/bin/server.rs b/src/bin/server.rs index 80f8cc5..4430459 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -1,22 +1,39 @@ use std::{ + convert::Infallible, error::Error, sync::Arc, + time::Duration, }; use hyper::{ StatusCode, Uri, }; -use reqwest::Client; -use tokio::fs::File; +use reqwest::{ + Body, + Client, +}; +use tokio::{ + fs::File, + io::AsyncReadExt, + sync::mpsc::{ + channel, + Receiver, + }, + time::delay_for, +}; #[tokio::main] async fn main () -> Result <(), Box > { let client = Arc::new (Client::new ()); - let path = "/home/user/pictures/bzqcChY.jpg"; + let mut backoff_delay = 0; loop { + if backoff_delay > 0 { + delay_for (Duration::from_millis (backoff_delay)).await; + } + let _uri = Uri::builder () .scheme ("http") .authority ("127.0.0.1:4000") @@ -29,9 +46,13 @@ async fn main () -> Result <(), Box > { let req_resp = match req_req.send ().await { Err (e) => { println! ("Err: {:?}", e); + backoff_delay = backoff_delay * 2 + 500; continue; }, - Ok (r) => r, + Ok (r) => { + backoff_delay = 0; + r + }, }; if req_resp.status () != StatusCode::OK { @@ -57,7 +78,29 @@ async fn main () -> Result <(), Box > { .path_and_query ("/listen/alien_wildlands") .build ().unwrap (); - let resp_req = client.post (&format! ("http://127.0.0.1:4000/http_response/{}", body)).body (payload); + let (tx, rx) = channel (2); + //let rx: Receiver > = rx; + + tokio::spawn (async move { + let path = "/home/user/pictures/bzqcChY.jpg"; + let mut f = File::open (path).await.unwrap (); + let mut tx = tx; + + loop { + let mut buffer = vec! [0u8; 256]; + let bytes_read = f.read (&mut buffer).await.unwrap (); + + buffer.truncate (bytes_read); + + if bytes_read == 0 { + break; + } + + tx.send (Ok::<_, Infallible> (buffer)).await; + } + }); + + let resp_req = client.post (&format! ("http://127.0.0.1:4000/http_response/{}", body)).body (Body::wrap_stream (rx)); println! ("Step 6"); match resp_req.send ().await { diff --git a/todo.md b/todo.md new file mode 100644 index 0000000..b3c0cf9 --- /dev/null +++ b/todo.md @@ -0,0 +1 @@ +- Streaming in Step 6/7 on the relay