use std::{ convert::Infallible, error::Error, io::SeekFrom, path::PathBuf, sync::Arc, time::Duration, }; use hyper::{ StatusCode, Uri, }; use lazy_static::*; use regex::Regex; use reqwest::{ Body, Client, }; use tokio::{ fs::File, io::AsyncReadExt, sync::mpsc::{ channel, }, time::delay_for, }; use ptth::http_serde::*; fn parse_range_header (range_str: &str) -> (Option , Option ) { lazy_static! { static ref RE: Regex = Regex::new (r"^(\d+)-(\d+)$").expect ("Couldn't compile regex for Range header"); } let caps = match RE.captures (range_str) { Some (x) => x, _ => return (None, None), }; let start = caps.get (1).map (|x| x.as_str ()); let end = caps.get (2).map (|x| x.as_str ()); let start = start.map (|x| u64::from_str_radix (x, 10).ok ()).flatten (); let end = end.map (|x| u64::from_str_radix (x, 10).ok ()).flatten (); (start, end) } #[tokio::main] async fn main () -> Result <(), Box > { let client = Arc::new (Client::new ()); let mut backoff_delay = 0; loop { if backoff_delay > 0 { delay_for (Duration::from_millis (backoff_delay)).await; } let _uri = Uri::builder () .scheme ("http") .authority ("127.0.0.1:4000") .path_and_query ("/http_listen/alien_wildlands") .build ().unwrap (); let req_req = client.get ("http://127.0.0.1:4000/http_listen/alien_wildlands"); //println! ("Step 1"); let req_resp = match req_req.send ().await { Err (e) => { println! ("Err: {:?}", e); backoff_delay = backoff_delay * 2 + 500; continue; }, Ok (r) => { backoff_delay = 0; r }, }; if req_resp.status () != StatusCode::OK { continue; } println! ("Step 3"); let body = req_resp.bytes ().await?; let parts: RequestParts = match rmp_serde::from_read_ref (&body) { Ok (x) => x, _ => continue, }; let (req_id, uri) = (parts.id, parts.uri); println! ("Client requested {}", uri); let mut range_start = None; let mut range_end = None; for (k, v) in parts.headers.iter () { let v = std::str::from_utf8 (v).unwrap (); println! ("{}: {}", k, v); if k == "range" { let (start, end) = parse_range_header (v); range_start = start; range_end = end; } } println! ("Step 4/5"); println! ("Step 6"); let client = client.clone (); tokio::spawn (async move { let (tx, rx) = channel (2); let mut path = PathBuf::from ("/home/user"); path.push (&uri [1..]); let mut f = File::open (path).await.unwrap (); if let Some (start) = range_start { f.seek (SeekFrom::Start (start)).await.unwrap (); } tokio::spawn (async move { //println! ("Opening file {:?}", path); let mut tx = tx; //let mut bytes_sent = 0; loop { let mut buffer = vec! [0u8; 65_536]; let bytes_read = f.read (&mut buffer).await.unwrap (); buffer.truncate (bytes_read); if bytes_read == 0 { break; } if tx.send (Ok::<_, Infallible> (buffer)).await.is_err () { break; } //bytes_sent += bytes_read; //println! ("Sent {} bytes", bytes_sent); delay_for (Duration::from_millis (50)).await; } }); let resp_req = client.post (&format! ("http://127.0.0.1:4000/http_response/{}", req_id)).body (Body::wrap_stream (rx)); println! ("Step 6"); if let Err (e) = resp_req.send ().await { println! ("Err: {:?}", e); } }); } }