diff --git a/src/bin/server.rs b/src/bin/server.rs index bb45c50..4694070 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -1,9 +1,5 @@ use std::{ - cmp::{min, max}, - convert::{Infallible, TryInto}, error::Error, - io::SeekFrom, - path::PathBuf, sync::Arc, time::Duration, }; @@ -11,253 +7,15 @@ use std::{ use hyper::{ StatusCode, }; -use lazy_static::*; -use regex::Regex; -use reqwest::{ - Body, - Client, -}; +use reqwest::Client; use tokio::{ - fs::{ - File, - read_dir, - ReadDir, - }, - io::AsyncReadExt, - sync::mpsc::{ - channel, - }, time::delay_for, }; use ptth::http_serde; -fn parse_range_header (range_str: &str) -> (Option , Option ) { - lazy_static! { - static ref RE: Regex = Regex::new (r"^bytes=(\d*)-(\d*)$").expect ("Couldn't compile regex for Range header"); - } - - println! ("{}", range_str); - - let caps = match RE.captures (range_str) { - Some (x) => x, - _ => return (None, None), - }; - let start = caps.get (1).map (|x| x.as_str ()); - let end = caps.get (2).map (|x| x.as_str ()); - - let start = start.map (|x| u64::from_str_radix (x, 10).ok ()).flatten (); - let end = end.map (|x| u64::from_str_radix (x, 10).ok ()).flatten (); - - (start, end) -} - const SERVER_URL: &str = "http://127.0.0.1:4000"; -async fn serve_dir (mut dir: ReadDir) -> http_serde::Response { - let (tx, rx) = channel (2); - - tokio::spawn (async move { - let mut tx = tx; - - tx.send (Ok::<_, Infallible> (String::from ("").into_bytes ())).await.unwrap (); - }); - - let mut response = http_serde::Response::default (); - - response.header ("content-type".into (), String::from ("text/html").into_bytes ()); - response.body (Body::wrap_stream (rx)); - - response -} - -async fn serve_file ( - mut f: File, - should_send_body: bool, - range_start: Option , - range_end: Option -) -> http_serde::Response { - let (tx, rx) = channel (2); - let body = if should_send_body { - Some (Body::wrap_stream (rx)) - } - else { - None - }; - - let file_md = f.metadata ().await.unwrap (); - let file_len = file_md.len (); - - let start = range_start.unwrap_or (0); - let end = range_end.unwrap_or (file_len); - - let start = max (0, min (start, file_len)); - let end = max (0, min (end, file_len)); - - f.seek (SeekFrom::Start (start)).await.unwrap (); - - println! ("Serving range {}-{}", start, end); - - if should_send_body { - tokio::spawn (async move { - //println! ("Opening file {:?}", path); - - let mut tx = tx; - //let mut bytes_sent = 0; - let mut bytes_left = end - start; - - loop { - let mut buffer = vec! [0u8; 65_536]; - let bytes_read: u64 = f.read (&mut buffer).await.unwrap ().try_into ().unwrap (); - - let bytes_read = min (bytes_left, bytes_read); - - buffer.truncate (bytes_read.try_into ().unwrap ()); - - if bytes_read == 0 { - break; - } - - if tx.send (Ok::<_, Infallible> (buffer)).await.is_err () { - break; - } - - bytes_left -= bytes_read; - if bytes_left == 0 { - break; - } - - //bytes_sent += bytes_read; - //println! ("Sent {} bytes", bytes_sent); - - //delay_for (Duration::from_millis (50)).await; - } - }); - } - - let mut response = http_serde::Response::default (); - - response.header (String::from ("accept-ranges"), b"bytes".to_vec ()); - - if range_start.is_none () && range_end.is_none () { - response.status_code (http_serde::StatusCode::Ok); - response.header (String::from ("content-length"), end.to_string ().into_bytes ()); - } - else { - response.status_code (http_serde::StatusCode::PartialContent); - response.header (String::from ("content-range"), format! ("bytes {}-{}/{}", start, end - 1, end).into_bytes ()); - } - - if let Some (body) = body { - response.body (body); - } - - response -} - -async fn serve_error ( - status_code: http_serde::StatusCode, - msg: String -) --> http_serde::Response -{ - let mut resp = http_serde::Response::default (); - resp.status_code (status_code) - .body (msg.into ()); - resp -} - -async fn serve_all ( - parts: http_serde::RequestParts -) --> http_serde::Response -{ - println! ("Client requested {}", parts.uri); - let mut range_start = None; - let mut range_end = None; - - for (k, v) in parts.headers.iter () { - let v = std::str::from_utf8 (v).unwrap (); - //println! ("{}: {}", k, v); - - if k == "range" { - let (start, end) = parse_range_header (v); - range_start = start; - range_end = end; - } - } - - let should_send_body = match &parts.method { - http_serde::Method::Get => true, - _ => false, - }; - - use percent_encoding::*; - - let encoded_path = &parts.uri [1..]; - let path = percent_decode (encoded_path.as_bytes ()).decode_utf8 ().unwrap (); - - let mut full_path = PathBuf::from ("/home/user"); - full_path.push (&*path); - - if let Ok (dir) = read_dir (&full_path).await { - serve_dir (dir).await - } - else if let Ok (file) = File::open (&full_path).await { - serve_file ( - file, - should_send_body, - range_start, - range_end - ).await - } - else { - serve_error (http_serde::StatusCode::NotFound, "404 Not Found".into ()).await - } -} - async fn handle_req_resp ( client: &Client, req_resp: reqwest::Response @@ -269,8 +27,6 @@ async fn handle_req_resp ( return Ok (()); } - println! ("Step 3"); - let body = req_resp.bytes ().await?; let wrapped_req: http_serde::WrappedRequest = match rmp_serde::from_read_ref (&body) { @@ -280,7 +36,7 @@ async fn handle_req_resp ( let (req_id, parts) = (wrapped_req.id, wrapped_req.req); - let response = serve_all (parts).await; + let response = ptth::file_server::serve_all (parts).await; let mut resp_req = client .post (&format! ("{}/http_response/{}", SERVER_URL, req_id)) @@ -323,8 +79,8 @@ async fn main () -> Result <(), Box > { }, }; - // Spawn another task for each request so we can immediately listen - // for the next connection + // Spawn another task for each request so we can + // immediately listen for the next connection let client = client.clone (); tokio::spawn (async move { diff --git a/src/file_server.rs b/src/file_server.rs new file mode 100644 index 0000000..b76e439 --- /dev/null +++ b/src/file_server.rs @@ -0,0 +1,254 @@ +// Static file server that can plug into the PTTH reverse server + +use std::{ + cmp::{min, max}, + convert::{Infallible, TryInto}, + io::SeekFrom, + path::PathBuf, +}; + +// file_server shouldn't depend on reqwest, but for now it +// does depend on their Body struct +use reqwest::Body; + +use tokio::{ + fs::{ + File, + read_dir, + ReadDir, + }, + io::AsyncReadExt, + sync::mpsc::{ + channel, + }, +}; + +use regex::Regex; + +use crate::http_serde; + +fn parse_range_header (range_str: &str) -> (Option , Option ) { + use lazy_static::*; + + lazy_static! { + static ref RE: Regex = Regex::new (r"^bytes=(\d*)-(\d*)$").expect ("Couldn't compile regex for Range header"); + } + + println! ("{}", range_str); + + let caps = match RE.captures (range_str) { + Some (x) => x, + _ => return (None, None), + }; + let start = caps.get (1).map (|x| x.as_str ()); + let end = caps.get (2).map (|x| x.as_str ()); + + let start = start.map (|x| u64::from_str_radix (x, 10).ok ()).flatten (); + let end = end.map (|x| u64::from_str_radix (x, 10).ok ()).flatten (); + + (start, end) +} + +async fn serve_dir (mut dir: ReadDir) -> http_serde::Response { + let (tx, rx) = channel (2); + + tokio::spawn (async move { + let mut tx = tx; + + tx.send (Ok::<_, Infallible> (String::from ("
    ").into_bytes ())).await.unwrap (); + + while let Ok (entry) = dir.next_entry ().await { + let entry: tokio::fs::DirEntry = match entry { + Some (x) => x, + None => break, + }; + + let trailing_slash = match entry.file_type ().await { + Ok (t) => if t.is_dir () { + "/" + } + else { + "" + }, + Err (_) => "", + }; + + let file_name = match entry.file_name ().into_string () { + Ok (x) => x, + Err (_) => { + let s = format! ("
  • (file_name error)
  • \n"); + if tx.send (Ok::<_, Infallible> (s.into_bytes ())).await.is_err () + { + break; + } + continue; + } + }; + + use std::borrow::Cow; + use percent_encoding::*; + + let percent_file_name: Cow = utf8_percent_encode (&file_name, CONTROLS).into (); + + let s = format! ("
  • {}{}
  • \n", percent_file_name, trailing_slash, file_name, trailing_slash); + if tx.send (Ok::<_, Infallible> (s.into_bytes ())).await.is_err () + { + break; + } + } + + tx.send (Ok::<_, Infallible> (String::from ("
").into_bytes ())).await.unwrap (); + }); + + let mut response = http_serde::Response::default (); + + response.header ("content-type".into (), String::from ("text/html").into_bytes ()); + response.body (Body::wrap_stream (rx)); + + response +} + +async fn serve_file ( + mut f: File, + should_send_body: bool, + range_start: Option , + range_end: Option +) -> http_serde::Response { + let (tx, rx) = channel (2); + let body = if should_send_body { + Some (Body::wrap_stream (rx)) + } + else { + None + }; + + let file_md = f.metadata ().await.unwrap (); + let file_len = file_md.len (); + + let start = range_start.unwrap_or (0); + let end = range_end.unwrap_or (file_len); + + let start = max (0, min (start, file_len)); + let end = max (0, min (end, file_len)); + + f.seek (SeekFrom::Start (start)).await.unwrap (); + + println! ("Serving range {}-{}", start, end); + + if should_send_body { + tokio::spawn (async move { + //println! ("Opening file {:?}", path); + + let mut tx = tx; + //let mut bytes_sent = 0; + let mut bytes_left = end - start; + + loop { + let mut buffer = vec! [0u8; 65_536]; + let bytes_read: u64 = f.read (&mut buffer).await.unwrap ().try_into ().unwrap (); + + let bytes_read = min (bytes_left, bytes_read); + + buffer.truncate (bytes_read.try_into ().unwrap ()); + + if bytes_read == 0 { + break; + } + + if tx.send (Ok::<_, Infallible> (buffer)).await.is_err () { + break; + } + + bytes_left -= bytes_read; + if bytes_left == 0 { + break; + } + + //bytes_sent += bytes_read; + //println! ("Sent {} bytes", bytes_sent); + + //delay_for (Duration::from_millis (50)).await; + } + }); + } + + let mut response = http_serde::Response::default (); + + response.header (String::from ("accept-ranges"), b"bytes".to_vec ()); + + if range_start.is_none () && range_end.is_none () { + response.status_code (http_serde::StatusCode::Ok); + response.header (String::from ("content-length"), end.to_string ().into_bytes ()); + } + else { + response.status_code (http_serde::StatusCode::PartialContent); + response.header (String::from ("content-range"), format! ("bytes {}-{}/{}", start, end - 1, end).into_bytes ()); + } + + if let Some (body) = body { + response.body (body); + } + + response +} + +async fn serve_error ( + status_code: http_serde::StatusCode, + msg: String +) +-> http_serde::Response +{ + let mut resp = http_serde::Response::default (); + resp.status_code (status_code) + .body (msg.into ()); + resp +} + +pub async fn serve_all ( + parts: http_serde::RequestParts +) +-> http_serde::Response +{ + println! ("Client requested {}", parts.uri); + let mut range_start = None; + let mut range_end = None; + + for (k, v) in parts.headers.iter () { + let v = std::str::from_utf8 (v).unwrap (); + //println! ("{}: {}", k, v); + + if k == "range" { + let (start, end) = parse_range_header (v); + range_start = start; + range_end = end; + } + } + + let should_send_body = match &parts.method { + http_serde::Method::Get => true, + _ => false, + }; + + use percent_encoding::*; + + let encoded_path = &parts.uri [1..]; + let path = percent_decode (encoded_path.as_bytes ()).decode_utf8 ().unwrap (); + + let mut full_path = PathBuf::from ("/home/user"); + full_path.push (&*path); + + if let Ok (dir) = read_dir (&full_path).await { + serve_dir (dir).await + } + else if let Ok (file) = File::open (&full_path).await { + serve_file ( + file, + should_send_body, + range_start, + range_end + ).await + } + else { + serve_error (http_serde::StatusCode::NotFound, "404 Not Found".into ()).await + } +} diff --git a/src/lib.rs b/src/lib.rs index 2f2290f..8205e3e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,2 +1,3 @@ +pub mod file_server; pub mod http_serde; pub mod watcher;