From 6f31c93c1f25e045033754bc0a2c2604e9f683ea Mon Sep 17 00:00:00 2001 From: _ <> Date: Thu, 29 Oct 2020 12:45:35 +0000 Subject: [PATCH] :recycle: --- src/bin/server.rs | 235 ++++++++++++++++++++++++---------------------- todo.md | 3 + 2 files changed, 128 insertions(+), 110 deletions(-) diff --git a/src/bin/server.rs b/src/bin/server.rs index ab2662c..7232851 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -50,8 +50,30 @@ fn parse_range_header (range_str: &str) -> (Option , Option ) { (start, end) } +const SERVER_URL: &str = "http://127.0.0.1:4000"; + +async fn serve_response ( + client: &Client, + req_id: String, + resp_parts: http_serde::ResponseParts, + body: Option +) { + let mut resp_req = client + .post (&format! ("{}/http_response/{}", SERVER_URL, req_id)) + .header ("X-PTTH-2LJYXWC4", base64::encode (rmp_serde::to_vec (&resp_parts).unwrap ())); + + if let Some (body) = body { + resp_req = resp_req.body (body); + } + + //println! ("Step 6"); + if let Err (e) = resp_req.send ().await { + println! ("Err: {:?}", e); + } +} + async fn serve_file ( - client: Arc , + client: &Client, mut f: File, req_id: String, should_send_body: bool, @@ -80,40 +102,40 @@ async fn serve_file ( println! ("Serving range {}-{}", start, end); if should_send_body { - tokio::spawn (async move { - //println! ("Opening file {:?}", path); - - let mut tx = tx; - //let mut bytes_sent = 0; - let mut bytes_left = end - start; - - loop { - let mut buffer = vec! [0u8; 65_536]; - let bytes_read: u64 = f.read (&mut buffer).await.unwrap ().try_into ().unwrap (); + tokio::spawn (async move { + //println! ("Opening file {:?}", path); - let bytes_read = min (bytes_left, bytes_read); + let mut tx = tx; + //let mut bytes_sent = 0; + let mut bytes_left = end - start; - buffer.truncate (bytes_read.try_into ().unwrap ()); - - if bytes_read == 0 { - break; + loop { + let mut buffer = vec! [0u8; 65_536]; + let bytes_read: u64 = f.read (&mut buffer).await.unwrap ().try_into ().unwrap (); + + let bytes_read = min (bytes_left, bytes_read); + + buffer.truncate (bytes_read.try_into ().unwrap ()); + + if bytes_read == 0 { + break; + } + + if tx.send (Ok::<_, Infallible> (buffer)).await.is_err () { + break; + } + + bytes_left -= bytes_read; + if bytes_left == 0 { + break; + } + + //bytes_sent += bytes_read; + //println! ("Sent {} bytes", bytes_sent); + + delay_for (Duration::from_millis (50)).await; } - - if tx.send (Ok::<_, Infallible> (buffer)).await.is_err () { - break; - } - - bytes_left -= bytes_read; - if bytes_left == 0 { - break; - } - - //bytes_sent += bytes_read; - //println! ("Sent {} bytes", bytes_sent); - - delay_for (Duration::from_millis (50)).await; - } - }); + }); } let mut headers: HashMap > = Default::default (); @@ -135,22 +157,11 @@ async fn serve_file ( headers, }; - let mut resp_req = client - .post (&format! ("http://127.0.0.1:4000/http_response/{}", req_id)) - .header ("X-PTTH-2LJYXWC4", base64::encode (rmp_serde::to_vec (&resp_parts).unwrap ())); - - if let Some (body) = body { - resp_req = resp_req.body (body); - } - - //println! ("Step 6"); - if let Err (e) = resp_req.send ().await { - println! ("Err: {:?}", e); - } + serve_response (client, req_id, resp_parts, body).await; } async fn serve_error ( - client: Arc , + client: &Client, req_id: String, status_code: ptth::http_serde::StatusCode ) { @@ -161,16 +172,75 @@ async fn serve_error ( headers, }; - let resp_req = client - .post (&format! ("http://127.0.0.1:4000/http_response/{}", req_id)) - .header ("X-PTTH-2LJYXWC4", base64::encode (rmp_serde::to_vec (&resp_parts).unwrap ())) - .body ("404 Not Found") - ; + serve_response (client, req_id, resp_parts, Some ("404 Not Found".into ())).await; +} + +async fn handle_all ( + client: Arc , + req_resp: reqwest::Response +) -> Result <(), Box > { + //println! ("Step 1"); + + if req_resp.status () != StatusCode::OK { + // TODO: Error handling + return Ok (()); + } + + println! ("Step 3"); + + let body = req_resp.bytes ().await?; + let parts: http_serde::RequestParts = match rmp_serde::from_read_ref (&body) + { + Ok (x) => x, + _ => return Ok (()), + }; + + let (req_id, uri) = (parts.id, parts.uri); + + println! ("Client requested {}", uri); + let mut range_start = None; + let mut range_end = None; + + for (k, v) in parts.headers.iter () { + let v = std::str::from_utf8 (v).unwrap (); + //println! ("{}: {}", k, v); + + if k == "range" { + let (start, end) = parse_range_header (v); + range_start = start; + range_end = end; + } + } + + let should_send_body = match &parts.method { + http_serde::Method::Get => true, + _ => false, + }; //println! ("Step 6"); - if let Err (e) = resp_req.send ().await { - println! ("Err: {:?}", e); + + let mut path = PathBuf::from ("/home/user"); + path.push (&uri [1..]); + + match File::open (path).await { + Ok (f) => serve_file ( + &client, + f, + req_id, + should_send_body, + range_start, + range_end + ).await, + Err (_) => { + serve_error ( + &client, + req_id, + ptth::http_serde::StatusCode::NotFound + ).await; + }, } + + Ok (()) } #[tokio::main] @@ -184,15 +254,8 @@ async fn main () -> Result <(), Box > { delay_for (Duration::from_millis (backoff_delay)).await; } - let _uri = Uri::builder () - .scheme ("http") - .authority ("127.0.0.1:4000") - .path_and_query ("/http_listen/alien_wildlands") - .build ().unwrap (); - let req_req = client.get ("http://127.0.0.1:4000/http_listen/alien_wildlands"); - //println! ("Step 1"); let req_resp = match req_req.send ().await { Err (e) => { println! ("Err: {:?}", e); @@ -205,60 +268,12 @@ async fn main () -> Result <(), Box > { }, }; - if req_resp.status () != StatusCode::OK { - continue; - } + // Spawn another task for each request so we can immediately listen + // for the next connection - println! ("Step 3"); - - let body = req_resp.bytes ().await?; - let parts: http_serde::RequestParts = match rmp_serde::from_read_ref (&body) - { - Ok (x) => x, - _ => continue, - }; - - let (req_id, uri) = (parts.id, parts.uri); - - println! ("Client requested {}", uri); - let mut range_start = None; - let mut range_end = None; - - for (k, v) in parts.headers.iter () { - let v = std::str::from_utf8 (v).unwrap (); - //println! ("{}: {}", k, v); - - if k == "range" { - let (start, end) = parse_range_header (v); - range_start = start; - range_end = end; - } - } - - let should_send_body = match &parts.method { - http_serde::Method::Get => true, - _ => false, - }; - - //println! ("Step 6"); let client = client.clone (); tokio::spawn (async move { - let mut path = PathBuf::from ("/home/user"); - path.push (&uri [1..]); - - match File::open (path).await { - Ok (f) => serve_file ( - client, - f, - req_id, - should_send_body, - range_start, - range_end - ).await, - Err (_) => { - serve_error (client, req_id, ptth::http_serde::StatusCode::NotFound).await; - }, - } + handle_all (client, req_resp).await; }); } } diff --git a/todo.md b/todo.md index a35e4b9..09d38d2 100644 --- a/todo.md +++ b/todo.md @@ -2,3 +2,6 @@ - Set up tokens or something so clients can't trivially impersonate servers - Fix possible timing gap when refreshing http_listen (Just have client wait a few seconds?) +- Parameter for server URL +- Parameter for static file serve path +- Error handling