use std::{ convert::Infallible, error::Error, sync::Arc, time::Duration, }; use hyper::{ StatusCode, Uri, }; use reqwest::{ Body, Client, }; use tokio::{ fs::File, io::AsyncReadExt, sync::mpsc::{ channel, Receiver, }, time::delay_for, }; use ptth::http_serde::*; #[tokio::main] async fn main () -> Result <(), Box > { let client = Arc::new (Client::new ()); let mut backoff_delay = 0; loop { if backoff_delay > 0 { delay_for (Duration::from_millis (backoff_delay)).await; } let _uri = Uri::builder () .scheme ("http") .authority ("127.0.0.1:4000") .path_and_query ("/http_listen/alien_wildlands") .build ().unwrap (); let req_req = client.get ("http://127.0.0.1:4000/http_listen/alien_wildlands"); //println! ("Step 1"); let req_resp = match req_req.send ().await { Err (e) => { println! ("Err: {:?}", e); backoff_delay = backoff_delay * 2 + 500; continue; }, Ok (r) => { backoff_delay = 0; r }, }; if req_resp.status () != StatusCode::OK { continue; } println! ("Step 3"); let body = req_resp.bytes ().await?; let parts: RequestParts = match rmp_serde::from_read_ref (&body) { Ok (x) => x, _ => continue, }; println! ("Client requested {}", parts.uri); println! ("Step 4/5"); println! ("Step 6"); let client = client.clone (); tokio::spawn (async move { let resp_req = Uri::builder () .scheme ("http") .authority ("127.0.0.1:4000") .path_and_query ("/listen/alien_wildlands") .build ().unwrap (); let (tx, rx) = channel (2); //let rx: Receiver > = rx; tokio::spawn (async move { let path = "/home/user/pictures/bzqcChY.jpg"; let path = "/home/user/videos/Decearing Egg.webm"; let path = "/home/user/projects/2020/ptth/README.md"; let mut f = File::open (path).await.unwrap (); let mut tx = tx; let mut bytes_sent = 0; loop { let mut buffer = vec! [0u8; 4096]; let bytes_read = f.read (&mut buffer).await.unwrap (); buffer.truncate (bytes_read); if bytes_read == 0 { break; } tx.send (Ok::<_, Infallible> (buffer)).await.unwrap (); bytes_sent += bytes_read; println! ("Sent {} bytes", bytes_sent); delay_for (Duration::from_millis (50)).await; } }); let resp_req = client.post (&format! ("http://127.0.0.1:4000/http_response/{}", parts.id)).body (Body::wrap_stream (rx)); println! ("Step 6"); match resp_req.send ().await { Err (e) => { println! ("Err: {:?}", e); }, Ok (_) => (), } }); } }