From 498d69eeb93dcf687bddea049b797546df27f889 Mon Sep 17 00:00:00 2001 From: _ <> Date: Sat, 20 Feb 2021 17:28:39 +0000 Subject: [PATCH] update: add throttled endless random garbage --- crates/ptth_relay/src/lib.rs | 55 +++++++++++++++++++++++++++++------- handlebars/relay/debug.hbs | 5 ++-- 2 files changed, 48 insertions(+), 12 deletions(-) diff --git a/crates/ptth_relay/src/lib.rs b/crates/ptth_relay/src/lib.rs index 3558370..d699512 100644 --- a/crates/ptth_relay/src/lib.rs +++ b/crates/ptth_relay/src/lib.rs @@ -303,22 +303,54 @@ async fn handle_server_list ( Ok (ok_reply (s)?) } -async fn handle_endless_source (gib: usize) -> Result , http::Error> { +async fn handle_endless_source (gib: usize, throttle: Option ) +-> Result , http::Error> +{ use futures::stream::StreamExt; + use tokio::sync::mpsc; - let random_block = { - use rand::RngCore; + let block_bytes = 64 * 1024; + let num_blocks = (1024 * 1024 * 1024 / block_bytes) * gib; + + let (tx, rx) = mpsc::channel (1); + + tokio::spawn (async move { + let mut tx = tx; - let mut rng = rand::thread_rng (); - let mut block = vec! [0u8; 64 * 1024]; - rng.fill_bytes (&mut block); - block - }; + let random_block = { + use rand::RngCore; + + let mut rng = rand::thread_rng (); + let mut block = vec! [0u8; 64 * 1024]; + rng.fill_bytes (&mut block); + block + }; + + let mut interval = tokio::time::interval (Duration::from_millis (1000)); + let mut blocks_sent = 0; + + while blocks_sent < num_blocks { + if throttle.is_some () { + interval.tick ().await; + } + + for _ in 0..throttle.unwrap_or (1) { + let item = Ok::<_, Infallible> (random_block.clone ()); + if let Err (_) = tx.send (item).await { + debug! ("Endless source dropped"); + return; + } + blocks_sent += 1; + } + } + + debug! ("Endless source ended"); + }); Response::builder () .status (StatusCode::OK) .header ("content-type", "application/octet-stream") - .body (Body::wrap_stream (futures::stream::repeat (Ok::<_, Infallible> (random_block)).take (gib * 1024 * 1024 / 64))) + .body (Body::wrap_stream (rx)) } #[instrument (level = "trace", skip (req, state, handlebars))] @@ -379,7 +411,10 @@ async fn handle_all ( Ok (ok_reply (s)?) } else if rest == "endless_source" { - Ok (handle_endless_source (1).await?) + Ok (handle_endless_source (1, None).await?) + } + else if rest == "endless_source_throttled" { + Ok (handle_endless_source (1, Some (1024 / 64)).await?) } else { Ok (error_reply (StatusCode::OK, "Can't route URL")?) diff --git a/handlebars/relay/debug.hbs b/handlebars/relay/debug.hbs index 0b0a8a0..2a79e1e 100644 --- a/handlebars/relay/debug.hbs +++ b/handlebars/relay/debug.hbs @@ -41,12 +41,13 @@ AIABAACAAQAAgAEAAIABAACAAQAAgAEAAIABAACAAQAA" rel="icon" type="image/x-icon" />

Debugging tools (relay)

- Lorem ipsum dolor set amet

- 1 GiB random garbage data source +

+1 GiB random garbage data source (Throttled to 1 MiB/s) +