add --throttle-upload option

main
_ 2021-03-21 15:43:15 +00:00
parent 235e134cb2
commit c1118971b0
2 changed files with 33 additions and 6 deletions

View File

@ -20,7 +20,7 @@ struct Opt {
auto_gen_key: bool, auto_gen_key: bool,
#[structopt (long)] #[structopt (long)]
simulate_slow_upload: bool, throttle_upload: bool,
#[structopt (long)] #[structopt (long)]
file_server_root: Option <PathBuf>, file_server_root: Option <PathBuf>,
@ -90,7 +90,7 @@ async fn main () -> Result <(), anyhow::Error> {
api_key: config_file.api_key, api_key: config_file.api_key,
relay_url: opt.relay_url.or (config_file.relay_url).expect ("`relay_url` must be provided in command line or config file"), relay_url: opt.relay_url.or (config_file.relay_url).expect ("`relay_url` must be provided in command line or config file"),
file_server_root: opt.file_server_root.or (config_file.file_server_root), file_server_root: opt.file_server_root.or (config_file.file_server_root),
simulate_slow_upload: opt.simulate_slow_upload, throttle_upload: opt.throttle_upload,
}; };
if opt.print_tripcode { if opt.print_tripcode {

View File

@ -17,6 +17,7 @@ use futures::FutureExt;
use reqwest::Client; use reqwest::Client;
use tokio::{ use tokio::{
sync::{ sync::{
mpsc,
oneshot, oneshot,
}, },
}; };
@ -86,9 +87,33 @@ async fn handle_one_req (
if let Some (length) = response.content_length { if let Some (length) = response.content_length {
resp_req = resp_req.header ("Content-Length", length.to_string ()); resp_req = resp_req.header ("Content-Length", length.to_string ());
} }
if let Some (body) = response.body { if let Some (mut body) = response.body {
if state.config.throttle_upload {
// Spawn another task to throttle the chunks
let (tx, rx) = mpsc::channel (1);
tokio::spawn (async move {
while let Some (chunk) = body.recv ().await {
let len = chunk.as_ref ().map (|x| x.len ()).ok ();
tx.send (chunk).await?;
if let Some (len) = len {
// debug! ("Throttling {} byte chunk", len);
}
tokio::time::sleep (Duration::from_millis (1000)).await;
}
Ok::<_, anyhow::Error> (())
});
resp_req = resp_req.body (reqwest::Body::wrap_stream (ReceiverStream::new (rx)));
}
else {
resp_req = resp_req.body (reqwest::Body::wrap_stream (ReceiverStream::new (body))); resp_req = resp_req.body (reqwest::Body::wrap_stream (ReceiverStream::new (body)));
} }
}
let req = resp_req.build ().map_err (ServerError::Step5Responding)?; let req = resp_req.build ().map_err (ServerError::Step5Responding)?;
@ -155,7 +180,7 @@ pub struct ConfigFile {
pub api_key: String, pub api_key: String,
pub relay_url: String, pub relay_url: String,
pub file_server_root: Option <PathBuf>, pub file_server_root: Option <PathBuf>,
pub simulate_slow_upload: bool, pub throttle_upload: bool,
} }
impl ConfigFile { impl ConfigFile {
@ -166,7 +191,7 @@ impl ConfigFile {
api_key, api_key,
relay_url, relay_url,
file_server_root: None, file_server_root: None,
simulate_slow_upload: false, throttle_upload: false,
} }
} }
@ -179,6 +204,7 @@ impl ConfigFile {
#[derive (Default)] #[derive (Default)]
pub struct Config { pub struct Config {
pub relay_url: String, pub relay_url: String,
pub throttle_upload: bool,
} }
pub async fn run_server ( pub async fn run_server (
@ -233,6 +259,7 @@ pub async fn run_server (
}, },
config: Config { config: Config {
relay_url: config_file.relay_url, relay_url: config_file.relay_url,
throttle_upload: config_file.throttle_upload,
}, },
client, client,
}); });