use std::{ cmp::{min, max}, convert::{Infallible, TryInto}, error::Error, io::SeekFrom, path::PathBuf, sync::Arc, time::Duration, }; use hyper::{ StatusCode, }; use lazy_static::*; use regex::Regex; use reqwest::{ Body, Client, }; use tokio::{ fs::{ File, read_dir, ReadDir, }, io::AsyncReadExt, sync::mpsc::{ channel, }, time::delay_for, }; use ptth::http_serde; fn parse_range_header (range_str: &str) -> (Option , Option ) { lazy_static! { static ref RE: Regex = Regex::new (r"^bytes=(\d*)-(\d*)$").expect ("Couldn't compile regex for Range header"); } println! ("{}", range_str); let caps = match RE.captures (range_str) { Some (x) => x, _ => return (None, None), }; let start = caps.get (1).map (|x| x.as_str ()); let end = caps.get (2).map (|x| x.as_str ()); let start = start.map (|x| u64::from_str_radix (x, 10).ok ()).flatten (); let end = end.map (|x| u64::from_str_radix (x, 10).ok ()).flatten (); (start, end) } const SERVER_URL: &str = "http://127.0.0.1:4000"; struct ResponseHandle <'a> { client: &'a Client, req_id: &'a str, } impl <'a> ResponseHandle <'a> { async fn respond ( self, resp_parts: http_serde::ResponseParts, body: Option ) { let mut resp_req = self.client .post (&format! ("{}/http_response/{}", SERVER_URL, self.req_id)) .header ("X-PTTH-2LJYXWC4", base64::encode (rmp_serde::to_vec (&resp_parts).unwrap ())); if let Some (body) = body { resp_req = resp_req.body (body); } //println! ("Step 6"); if let Err (e) = resp_req.send ().await { println! ("Err: {:?}", e); } } async fn respond_2 ( self, r: http_serde::Response ) { self.respond (r.parts, r.body).await } } async fn serve_dir (mut dir: ReadDir) -> http_serde::Response { let (tx, rx) = channel (2); tokio::spawn (async move { let mut tx = tx; tx.send (Ok::<_, Infallible> (String::from ("").into_bytes ())).await.unwrap (); }); let mut response = http_serde::Response::default (); response.header ("content-type".into (), String::from ("text/html").into_bytes ()); response.body (Body::wrap_stream (rx)); response } async fn serve_file ( mut f: File, should_send_body: bool, range_start: Option , range_end: Option ) -> http_serde::Response { let (tx, rx) = channel (2); let body = if should_send_body { Some (Body::wrap_stream (rx)) } else { None }; let file_md = f.metadata ().await.unwrap (); let file_len = file_md.len (); let start = range_start.unwrap_or (0); let end = range_end.unwrap_or (file_len); let start = max (0, min (start, file_len)); let end = max (0, min (end, file_len)); f.seek (SeekFrom::Start (start)).await.unwrap (); println! ("Serving range {}-{}", start, end); if should_send_body { tokio::spawn (async move { //println! ("Opening file {:?}", path); let mut tx = tx; //let mut bytes_sent = 0; let mut bytes_left = end - start; loop { let mut buffer = vec! [0u8; 65_536]; let bytes_read: u64 = f.read (&mut buffer).await.unwrap ().try_into ().unwrap (); let bytes_read = min (bytes_left, bytes_read); buffer.truncate (bytes_read.try_into ().unwrap ()); if bytes_read == 0 { break; } if tx.send (Ok::<_, Infallible> (buffer)).await.is_err () { break; } bytes_left -= bytes_read; if bytes_left == 0 { break; } //bytes_sent += bytes_read; //println! ("Sent {} bytes", bytes_sent); //delay_for (Duration::from_millis (50)).await; } }); } let mut response = http_serde::Response::default (); response.header (String::from ("accept-ranges"), b"bytes".to_vec ()); if range_start.is_none () && range_end.is_none () { response.status_code (http_serde::StatusCode::Ok); response.header (String::from ("content-length"), end.to_string ().into_bytes ()); } else { response.status_code (http_serde::StatusCode::PartialContent); response.header (String::from ("content-range"), format! ("bytes {}-{}/{}", start, end - 1, end).into_bytes ()); } if let Some (body) = body { response.body (body); } response } async fn serve_error ( status_code: http_serde::StatusCode, msg: String ) -> http_serde::Response { let mut response = http_serde::Response::default (); response.status_code (status_code); response.body (msg.into ()); response } async fn handle_all ( client: &Client, req_resp: reqwest::Response ) -> Result <(), Box > { //println! ("Step 1"); if req_resp.status () != StatusCode::OK { // TODO: Error handling return Ok (()); } println! ("Step 3"); let body = req_resp.bytes ().await?; let parts: http_serde::RequestParts = match rmp_serde::from_read_ref (&body) { Ok (x) => x, _ => return Ok (()), }; let (req_id, uri) = (parts.id, parts.uri); println! ("Client requested {}", uri); let mut range_start = None; let mut range_end = None; for (k, v) in parts.headers.iter () { let v = std::str::from_utf8 (v).unwrap (); //println! ("{}: {}", k, v); if k == "range" { let (start, end) = parse_range_header (v); range_start = start; range_end = end; } } let should_send_body = match &parts.method { http_serde::Method::Get => true, _ => false, }; //println! ("Step 6"); use percent_encoding::*; let encoded_path = &uri [1..]; let path = percent_decode (encoded_path.as_bytes ()).decode_utf8 ().unwrap (); let mut full_path = PathBuf::from ("/home/user"); full_path.push (&*path); let response = if let Ok (dir) = read_dir (&full_path).await { serve_dir (dir).await } else if let Ok (file) = File::open (&full_path).await { serve_file ( file, should_send_body, range_start, range_end ).await } else { serve_error (http_serde::StatusCode::NotFound, "404 Not Found".into ()).await }; let response_handle = ResponseHandle { client, req_id: &req_id, }; response_handle.respond_2 (response).await; Ok (()) } #[tokio::main] async fn main () -> Result <(), Box > { let client = Arc::new (Client::new ()); let mut backoff_delay = 0; loop { if backoff_delay > 0 { delay_for (Duration::from_millis (backoff_delay)).await; } let req_req = client.get ("http://127.0.0.1:4000/http_listen/alien_wildlands"); let req_resp = match req_req.send ().await { Err (e) => { println! ("Err: {:?}", e); backoff_delay = backoff_delay * 2 + 500; continue; }, Ok (r) => { backoff_delay = 0; r }, }; // Spawn another task for each request so we can immediately listen // for the next connection let client = client.clone (); tokio::spawn (async move { match handle_all (&client, req_resp).await { _ => (), } }); } }