From 841987142882df300547d3efb99d2550eb33970f Mon Sep 17 00:00:00 2001 From: _ <_@_> Date: Wed, 28 Apr 2021 19:50:40 -0500 Subject: [PATCH] :construction: wip: testing a hypothesis about collecting metrics --- Cargo.lock | 14 ++++ crates/metrics_test/Cargo.toml | 11 +++ crates/metrics_test/src/main.rs | 130 ++++++++++++++++++++++++++++++++ 3 files changed, 155 insertions(+) create mode 100644 crates/metrics_test/Cargo.toml create mode 100644 crates/metrics_test/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index fe92ba9..1f502c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -425,6 +425,12 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "devtimer" +version = "4.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "907339959a92f6b98846570500c0a567c9aecbb3871cef00561eb5d20d47b7c1" + [[package]] name = "digest" version = "0.8.1" @@ -1053,6 +1059,14 @@ version = "2.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525" +[[package]] +name = "metrics_test" +version = "0.1.0" +dependencies = [ + "devtimer", + "tokio", +] + [[package]] name = "mime" version = "0.3.16" diff --git a/crates/metrics_test/Cargo.toml b/crates/metrics_test/Cargo.toml new file mode 100644 index 0000000..fef35f8 --- /dev/null +++ b/crates/metrics_test/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "metrics_test" +version = "0.1.0" +authors = ["_ <_@_>"] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +devtimer = "4.0.1" +tokio = { version = "1.4.0", features = ["full"] } diff --git a/crates/metrics_test/src/main.rs b/crates/metrics_test/src/main.rs new file mode 100644 index 0000000..abe3bf7 --- /dev/null +++ b/crates/metrics_test/src/main.rs @@ -0,0 +1,130 @@ +use std::{ + sync::{ + Arc, + }, + time::Duration, +}; + +use devtimer::DevTime; +use tokio::{ + sync::{ + RwLock, + mpsc, + }, +}; + +#[tokio::main] +async fn main () { + let num_iters = 1_000_000; + let mut timer = DevTime::new_simple (); + + let metric_lock = MetricLock::default (); + + timer.start (); + for _ in 0..num_iters { + metric_lock.add (10).await; + } + timer.stop (); + println!("The operation took: {} milliseconds", timer.time_in_millis().unwrap()); + + assert_eq! (metric_lock.read ().await, Counter { + count: num_iters, + time: 10 * num_iters, + }); + + // This is about 4 times slower with a single thread sending + // events. Also they're both faster enough that I don't care. + + let metric_channel = MetricChannel::new (); + + timer.start (); + for _ in 0..num_iters { + metric_channel.add (10_u64).await; + } + timer.stop (); + println!("The operation took: {} milliseconds", timer.time_in_millis().unwrap()); + + tokio::time::sleep (Duration::from_millis (100)).await; + + assert_eq! (metric_channel.read ().await, Counter { + count: num_iters, + time: 10 * num_iters, + }); +} + +#[derive (Default)] +struct MetricLock { + counter: RwLock , +} + +impl MetricLock { + async fn add (&self, time: u64) { + let mut guard = self.counter.write ().await; + guard.add (time); + } + + async fn read (&self) -> Counter { + let guard = self.counter.read ().await; + *guard + } +} + +struct MetricChannel { + counter: RwLock , + sender: mpsc::Sender , +} + +impl MetricChannel { + fn new () -> Arc { + let counter = Default::default (); + let (sender, mut receiver) = mpsc::channel (64); + + let mc = Self { + counter, + sender, + }; + let mc = Arc::new (mc); + + { + let mc = Arc::clone (&mc); + tokio::spawn (async move { + let mut unflushed = Counter::default (); + + while let Some (time) = receiver.recv ().await { + unflushed.add (time); + + if unflushed.count >= 10 { + let mut guard = mc.counter.write ().await; + guard.count += unflushed.count; + guard.time += unflushed.time; + unflushed = Counter::default (); + } + } + }); + } + + mc + } + + async fn add (&self, time: u64) { + self.sender.send (time).await.unwrap () + } + + async fn read (&self) -> Counter { + let guard = self.counter.read ().await; + *guard + } +} + +#[derive (Copy, Clone, Debug, Default, PartialEq)] +struct Counter { + count: u64, + time: u64, +} + +impl Counter { + fn add (&mut self, time: u64) { + self.count += 1; + self.time += time; + } +}