diff --git a/crates/ptth_server/src/bin/ptth_server.rs b/crates/ptth_server/src/bin/ptth_server.rs index af946b0..fa61f20 100644 --- a/crates/ptth_server/src/bin/ptth_server.rs +++ b/crates/ptth_server/src/bin/ptth_server.rs @@ -20,7 +20,7 @@ struct Opt { auto_gen_key: bool, #[structopt (long)] - simulate_slow_upload: bool, + throttle_upload: bool, #[structopt (long)] file_server_root: Option , @@ -90,7 +90,7 @@ async fn main () -> Result <(), anyhow::Error> { api_key: config_file.api_key, relay_url: opt.relay_url.or (config_file.relay_url).expect ("`relay_url` must be provided in command line or config file"), file_server_root: opt.file_server_root.or (config_file.file_server_root), - simulate_slow_upload: opt.simulate_slow_upload, + throttle_upload: opt.throttle_upload, }; if opt.print_tripcode { diff --git a/crates/ptth_server/src/lib.rs b/crates/ptth_server/src/lib.rs index e3982c6..ae1da06 100644 --- a/crates/ptth_server/src/lib.rs +++ b/crates/ptth_server/src/lib.rs @@ -17,6 +17,7 @@ use futures::FutureExt; use reqwest::Client; use tokio::{ sync::{ + mpsc, oneshot, }, }; @@ -86,8 +87,32 @@ async fn handle_one_req ( if let Some (length) = response.content_length { resp_req = resp_req.header ("Content-Length", length.to_string ()); } - if let Some (body) = response.body { - resp_req = resp_req.body (reqwest::Body::wrap_stream (ReceiverStream::new (body))); + if let Some (mut body) = response.body { + if state.config.throttle_upload { + // Spawn another task to throttle the chunks + + let (tx, rx) = mpsc::channel (1); + + tokio::spawn (async move { + while let Some (chunk) = body.recv ().await { + let len = chunk.as_ref ().map (|x| x.len ()).ok (); + tx.send (chunk).await?; + + if let Some (len) = len { + // debug! ("Throttling {} byte chunk", len); + } + + tokio::time::sleep (Duration::from_millis (1000)).await; + } + + Ok::<_, anyhow::Error> (()) + }); + + resp_req = resp_req.body (reqwest::Body::wrap_stream (ReceiverStream::new (rx))); + } + else { + resp_req = resp_req.body (reqwest::Body::wrap_stream (ReceiverStream::new (body))); + } } let req = resp_req.build ().map_err (ServerError::Step5Responding)?; @@ -155,7 +180,7 @@ pub struct ConfigFile { pub api_key: String, pub relay_url: String, pub file_server_root: Option , - pub simulate_slow_upload: bool, + pub throttle_upload: bool, } impl ConfigFile { @@ -166,7 +191,7 @@ impl ConfigFile { api_key, relay_url, file_server_root: None, - simulate_slow_upload: false, + throttle_upload: false, } } @@ -179,6 +204,7 @@ impl ConfigFile { #[derive (Default)] pub struct Config { pub relay_url: String, + pub throttle_upload: bool, } pub async fn run_server ( @@ -233,6 +259,7 @@ pub async fn run_server ( }, config: Config { relay_url: config_file.relay_url, + throttle_upload: config_file.throttle_upload, }, client, });