131 lines
2.4 KiB
Rust
131 lines
2.4 KiB
Rust
|
use std::{
|
||
|
sync::{
|
||
|
Arc,
|
||
|
},
|
||
|
time::Duration,
|
||
|
};
|
||
|
|
||
|
use devtimer::DevTime;
|
||
|
use tokio::{
|
||
|
sync::{
|
||
|
RwLock,
|
||
|
mpsc,
|
||
|
},
|
||
|
};
|
||
|
|
||
|
#[tokio::main]
|
||
|
async fn main () {
|
||
|
let num_iters = 1_000_000;
|
||
|
let mut timer = DevTime::new_simple ();
|
||
|
|
||
|
let metric_lock = MetricLock::default ();
|
||
|
|
||
|
timer.start ();
|
||
|
for _ in 0..num_iters {
|
||
|
metric_lock.add (10).await;
|
||
|
}
|
||
|
timer.stop ();
|
||
|
println!("The operation took: {} milliseconds", timer.time_in_millis().unwrap());
|
||
|
|
||
|
assert_eq! (metric_lock.read ().await, Counter {
|
||
|
count: num_iters,
|
||
|
time: 10 * num_iters,
|
||
|
});
|
||
|
|
||
|
// This is about 4 times slower with a single thread sending
|
||
|
// events. Also they're both faster enough that I don't care.
|
||
|
|
||
|
let metric_channel = MetricChannel::new ();
|
||
|
|
||
|
timer.start ();
|
||
|
for _ in 0..num_iters {
|
||
|
metric_channel.add (10_u64).await;
|
||
|
}
|
||
|
timer.stop ();
|
||
|
println!("The operation took: {} milliseconds", timer.time_in_millis().unwrap());
|
||
|
|
||
|
tokio::time::sleep (Duration::from_millis (100)).await;
|
||
|
|
||
|
assert_eq! (metric_channel.read ().await, Counter {
|
||
|
count: num_iters,
|
||
|
time: 10 * num_iters,
|
||
|
});
|
||
|
}
|
||
|
|
||
|
#[derive (Default)]
|
||
|
struct MetricLock {
|
||
|
counter: RwLock <Counter>,
|
||
|
}
|
||
|
|
||
|
impl MetricLock {
|
||
|
async fn add (&self, time: u64) {
|
||
|
let mut guard = self.counter.write ().await;
|
||
|
guard.add (time);
|
||
|
}
|
||
|
|
||
|
async fn read (&self) -> Counter {
|
||
|
let guard = self.counter.read ().await;
|
||
|
*guard
|
||
|
}
|
||
|
}
|
||
|
|
||
|
struct MetricChannel {
|
||
|
counter: RwLock <Counter>,
|
||
|
sender: mpsc::Sender <u64>,
|
||
|
}
|
||
|
|
||
|
impl MetricChannel {
|
||
|
fn new () -> Arc <Self> {
|
||
|
let counter = Default::default ();
|
||
|
let (sender, mut receiver) = mpsc::channel (64);
|
||
|
|
||
|
let mc = Self {
|
||
|
counter,
|
||
|
sender,
|
||
|
};
|
||
|
let mc = Arc::new (mc);
|
||
|
|
||
|
{
|
||
|
let mc = Arc::clone (&mc);
|
||
|
tokio::spawn (async move {
|
||
|
let mut unflushed = Counter::default ();
|
||
|
|
||
|
while let Some (time) = receiver.recv ().await {
|
||
|
unflushed.add (time);
|
||
|
|
||
|
if unflushed.count >= 10 {
|
||
|
let mut guard = mc.counter.write ().await;
|
||
|
guard.count += unflushed.count;
|
||
|
guard.time += unflushed.time;
|
||
|
unflushed = Counter::default ();
|
||
|
}
|
||
|
}
|
||
|
});
|
||
|
}
|
||
|
|
||
|
mc
|
||
|
}
|
||
|
|
||
|
async fn add (&self, time: u64) {
|
||
|
self.sender.send (time).await.unwrap ()
|
||
|
}
|
||
|
|
||
|
async fn read (&self) -> Counter {
|
||
|
let guard = self.counter.read ().await;
|
||
|
*guard
|
||
|
}
|
||
|
}
|
||
|
|
||
|
#[derive (Copy, Clone, Debug, Default, PartialEq)]
|
||
|
struct Counter {
|
||
|
count: u64,
|
||
|
time: u64,
|
||
|
}
|
||
|
|
||
|
impl Counter {
|
||
|
fn add (&mut self, time: u64) {
|
||
|
self.count += 1;
|
||
|
self.time += time;
|
||
|
}
|
||
|
}
|