use std::{ cmp::{min, max}, collections::*, convert::{Infallible, TryInto}, error::Error, io::SeekFrom, path::PathBuf, sync::Arc, time::Duration, }; use hyper::{ StatusCode, Uri, }; use lazy_static::*; use regex::Regex; use reqwest::{ Body, Client, }; use tokio::{ fs::File, io::AsyncReadExt, sync::mpsc::{ channel, }, time::delay_for, }; use ptth::http_serde; fn parse_range_header (range_str: &str) -> (Option , Option ) { lazy_static! { static ref RE: Regex = Regex::new (r"^bytes=(\d*)-(\d*)$").expect ("Couldn't compile regex for Range header"); } println! ("{}", range_str); let caps = match RE.captures (range_str) { Some (x) => x, _ => return (None, None), }; let start = caps.get (1).map (|x| x.as_str ()); let end = caps.get (2).map (|x| x.as_str ()); let start = start.map (|x| u64::from_str_radix (x, 10).ok ()).flatten (); let end = end.map (|x| u64::from_str_radix (x, 10).ok ()).flatten (); (start, end) } async fn serve_file ( client: Arc , mut f: File, req_id: String, should_send_body: bool, range_start: Option , range_end: Option ) { let (tx, rx) = channel (2); let body = if should_send_body { Some (Body::wrap_stream (rx)) } else { None }; let file_md = f.metadata ().await.unwrap (); let file_len = file_md.len (); let start = range_start.unwrap_or (0); let end = range_end.unwrap_or (file_len); let start = max (0, min (start, file_len)); let end = max (0, min (end, file_len)); f.seek (SeekFrom::Start (start)).await.unwrap (); println! ("Serving range {}-{}", start, end); if should_send_body { tokio::spawn (async move { //println! ("Opening file {:?}", path); let mut tx = tx; //let mut bytes_sent = 0; let mut bytes_left = end - start; loop { let mut buffer = vec! [0u8; 65_536]; let bytes_read: u64 = f.read (&mut buffer).await.unwrap ().try_into ().unwrap (); let bytes_read = min (bytes_left, bytes_read); buffer.truncate (bytes_read.try_into ().unwrap ()); if bytes_read == 0 { break; } if tx.send (Ok::<_, Infallible> (buffer)).await.is_err () { break; } bytes_left -= bytes_read; if bytes_left == 0 { break; } //bytes_sent += bytes_read; //println! ("Sent {} bytes", bytes_sent); delay_for (Duration::from_millis (50)).await; } }); } let mut headers: HashMap > = Default::default (); headers.insert (String::from ("accept-ranges"), b"bytes".to_vec ()); let status_code; if range_start.is_none () && range_end.is_none () { headers.insert (String::from ("content-length"), end.to_string ().into_bytes ()); status_code = http_serde::StatusCode::Ok; } else { headers.insert (String::from ("content-range"), format! ("bytes {}-{}/{}", start, end - 1, end).into_bytes ()); status_code = http_serde::StatusCode::PartialContent; } let resp_parts = http_serde::ResponseParts { status_code, headers, }; let mut resp_req = client .post (&format! ("http://127.0.0.1:4000/http_response/{}", req_id)) .header ("X-PTTH-2LJYXWC4", base64::encode (rmp_serde::to_vec (&resp_parts).unwrap ())); if let Some (body) = body { resp_req = resp_req.body (body); } //println! ("Step 6"); if let Err (e) = resp_req.send ().await { println! ("Err: {:?}", e); } } #[tokio::main] async fn main () -> Result <(), Box > { let client = Arc::new (Client::new ()); let mut backoff_delay = 0; loop { if backoff_delay > 0 { delay_for (Duration::from_millis (backoff_delay)).await; } let _uri = Uri::builder () .scheme ("http") .authority ("127.0.0.1:4000") .path_and_query ("/http_listen/alien_wildlands") .build ().unwrap (); let req_req = client.get ("http://127.0.0.1:4000/http_listen/alien_wildlands"); //println! ("Step 1"); let req_resp = match req_req.send ().await { Err (e) => { println! ("Err: {:?}", e); backoff_delay = backoff_delay * 2 + 500; continue; }, Ok (r) => { backoff_delay = 0; r }, }; if req_resp.status () != StatusCode::OK { continue; } println! ("Step 3"); let body = req_resp.bytes ().await?; let parts: http_serde::RequestParts = match rmp_serde::from_read_ref (&body) { Ok (x) => x, _ => continue, }; let (req_id, uri) = (parts.id, parts.uri); println! ("Client requested {}", uri); let mut range_start = None; let mut range_end = None; for (k, v) in parts.headers.iter () { let v = std::str::from_utf8 (v).unwrap (); //println! ("{}: {}", k, v); if k == "range" { let (start, end) = parse_range_header (v); range_start = start; range_end = end; } } let should_send_body = match &parts.method { http_serde::Method::Get => true, _ => false, }; //println! ("Step 6"); let client = client.clone (); tokio::spawn (async move { let mut path = PathBuf::from ("/home/user"); path.push (&uri [1..]); let f = File::open (path).await.unwrap (); serve_file ( client, f, req_id, should_send_body, range_start, range_end ).await; }); } }