diff --git a/src/bin/server.rs b/src/bin/server.rs index 6ee5ae4..0423bac 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -50,6 +50,105 @@ fn parse_range_header (range_str: &str) -> (Option , Option ) { (start, end) } +async fn serve_file ( + client: Arc , + mut f: File, + req_id: String, + should_send_body: bool, + range_start: Option , + range_end: Option +) { + let (tx, rx) = channel (2); + let body = if should_send_body { + Some (Body::wrap_stream (rx)) + } + else { + None + }; + + let file_md = f.metadata ().await.unwrap (); + let file_len = file_md.len (); + + let start = range_start.unwrap_or (0); + let end = range_end.unwrap_or (file_len); + + let start = max (0, min (start, file_len)); + let end = max (0, min (end, file_len)); + + f.seek (SeekFrom::Start (start)).await.unwrap (); + + println! ("Serving range {}-{}", start, end); + + if should_send_body { + tokio::spawn (async move { + //println! ("Opening file {:?}", path); + + let mut tx = tx; + //let mut bytes_sent = 0; + let mut bytes_left = end - start; + + loop { + let mut buffer = vec! [0u8; 65_536]; + let bytes_read: u64 = f.read (&mut buffer).await.unwrap ().try_into ().unwrap (); + + let bytes_read = min (bytes_left, bytes_read); + + buffer.truncate (bytes_read.try_into ().unwrap ()); + + if bytes_read == 0 { + break; + } + + if tx.send (Ok::<_, Infallible> (buffer)).await.is_err () { + break; + } + + bytes_left -= bytes_read; + if bytes_left == 0 { + break; + } + + //bytes_sent += bytes_read; + //println! ("Sent {} bytes", bytes_sent); + + delay_for (Duration::from_millis (50)).await; + } + }); + } + + let mut headers: HashMap > = Default::default (); + headers.insert (String::from ("accept-ranges"), b"bytes".to_vec ()); + + let status_code; + + if range_start.is_none () && range_end.is_none () { + headers.insert (String::from ("content-length"), end.to_string ().into_bytes ()); + status_code = http_serde::StatusCode::Ok; + } + else { + headers.insert (String::from ("content-range"), format! ("bytes {}-{}/{}", start, end - 1, end).into_bytes ()); + status_code = http_serde::StatusCode::PartialContent; + } + + let resp_parts = http_serde::ResponseParts { + status_code, + headers, + }; + + let mut resp_req = client + .post (&format! ("http://127.0.0.1:4000/http_response/{}", req_id)) + .header ("X-PTTH-2LJYXWC4", base64::encode (rmp_serde::to_vec (&resp_parts).unwrap ())); + + if let Some (body) = body { + resp_req = resp_req.body (body); + } + + //println! ("Step 6"); + if let Err (e) = resp_req.send ().await { + println! ("Err: {:?}", e); + } +} + #[tokio::main] async fn main () -> Result <(), Box > { let client = Arc::new (Client::new ()); @@ -120,99 +219,18 @@ async fn main () -> Result <(), Box > { //println! ("Step 6"); let client = client.clone (); tokio::spawn (async move { - let (tx, rx) = channel (2); - let body = if should_send_body { - Some (Body::wrap_stream (rx)) - } - else { - None - }; - let mut path = PathBuf::from ("/home/user"); - path.push (&uri [1..]); - let mut f = File::open (path).await.unwrap (); + path.push (&uri [1..]); + let f = File::open (path).await.unwrap (); - let file_md = f.metadata ().await.unwrap (); - let file_len = file_md.len (); - - let start = range_start.unwrap_or (0); - let end = range_end.unwrap_or (file_len); - - let start = max (0, min (start, file_len)); - let end = max (0, min (end, file_len)); - - f.seek (SeekFrom::Start (start)).await.unwrap (); - - println! ("Serving range {}-{}", start, end); - - if should_send_body { - tokio::spawn (async move { - //println! ("Opening file {:?}", path); - - let mut tx = tx; - //let mut bytes_sent = 0; - let mut bytes_left = end - start; - - loop { - let mut buffer = vec! [0u8; 65_536]; - let bytes_read: u64 = f.read (&mut buffer).await.unwrap ().try_into ().unwrap (); - - let bytes_read = min (bytes_left, bytes_read); - - buffer.truncate (bytes_read.try_into ().unwrap ()); - - if bytes_read == 0 { - break; - } - - if tx.send (Ok::<_, Infallible> (buffer)).await.is_err () { - break; - } - - bytes_left -= bytes_read; - if bytes_left == 0 { - break; - } - - //bytes_sent += bytes_read; - //println! ("Sent {} bytes", bytes_sent); - - delay_for (Duration::from_millis (50)).await; - } - }); - } - - let mut headers: HashMap > = Default::default (); - headers.insert (String::from ("accept-ranges"), b"bytes".to_vec ()); - - let status_code; - - if range_start.is_none () && range_end.is_none () { - headers.insert (String::from ("content-length"), end.to_string ().into_bytes ()); - status_code = http_serde::StatusCode::Ok; - } - else { - headers.insert (String::from ("content-range"), format! ("bytes {}-{}/{}", start, end - 1, end).into_bytes ()); - status_code = http_serde::StatusCode::PartialContent; - } - - let resp_parts = http_serde::ResponseParts { - status_code, - headers, - }; - - let mut resp_req = client - .post (&format! ("http://127.0.0.1:4000/http_response/{}", req_id)) - .header ("X-PTTH-2LJYXWC4", base64::encode (rmp_serde::to_vec (&resp_parts).unwrap ())); - - if let Some (body) = body { - resp_req = resp_req.body (body); - } - - //println! ("Step 6"); - if let Err (e) = resp_req.send ().await { - println! ("Err: {:?}", e); - } + serve_file ( + client, + f, + req_id, + should_send_body, + range_start, + range_end + ).await; }); } } diff --git a/todo.md b/todo.md index 9385a62..a35e4b9 100644 --- a/todo.md +++ b/todo.md @@ -1,2 +1,4 @@ +- Index directories - Set up tokens or something so clients can't trivially impersonate servers +- Fix possible timing gap when refreshing http_listen (Just have client wait a few seconds?)