diff --git a/Cargo.lock b/Cargo.lock index 6042424..5bdf8b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1662,6 +1662,7 @@ version = "0.1.0" dependencies = [ "base64 0.13.0", "thiserror", + "tokio", ] [[package]] diff --git a/crates/ptth_kv/Cargo.toml b/crates/ptth_kv/Cargo.toml index 9bceeaf..e3b8b2b 100644 --- a/crates/ptth_kv/Cargo.toml +++ b/crates/ptth_kv/Cargo.toml @@ -7,3 +7,4 @@ edition = "2018" [dependencies] base64 = "0.13.0" thiserror = "1.0.22" +tokio = { version = "0.2.22", features = ["full"] } diff --git a/crates/ptth_kv/src/main.rs b/crates/ptth_kv/src/main.rs index 95b6476..49709f3 100644 --- a/crates/ptth_kv/src/main.rs +++ b/crates/ptth_kv/src/main.rs @@ -4,12 +4,16 @@ use std::{ }, iter::FromIterator, sync::{ - Mutex, MutexGuard, PoisonError, }, }; +use tokio::{ + runtime::Runtime, + sync::Mutex, +}; + pub struct Store { status_dirs: HashMap , StatusKeyDirectory>, } @@ -59,22 +63,22 @@ impl Store { .collect () } - pub fn set (&self, name: &[u8], key: Vec , value: Vec ) + pub async fn set (&self, name: &[u8], key: Vec , value: Vec ) -> Result <(), Error> { let dir = self.status_dirs.get (name) .ok_or (Error::NoSuchKeyDir)?; - dir.set (key, value) + dir.set (key, value).await } - pub fn get_after (&self, name: &[u8], thresh: Option ) + pub async fn get_after (&self, name: &[u8], thresh: Option ) -> Result { let dir = self.status_dirs.get (name) .ok_or (Error::NoSuchKeyDir)?; - dir.get_after (thresh) + dir.get_after (thresh).await } } @@ -123,7 +127,7 @@ impl StatusKeyDirectory { } } - fn set (&self, key: Vec , value: Vec ) -> Result <(), Error> + async fn set (&self, key: Vec , value: Vec ) -> Result <(), Error> { if key.len () > self.quotas.max_key_bytes { return Err (Error::KeyTooLong); @@ -134,14 +138,14 @@ impl StatusKeyDirectory { } { - let mut guard = self.table.lock ()?; + let mut guard = self.table.lock ().await; guard.set (&self.quotas, key, value); } Ok (()) } - fn get_after (&self, thresh: Option ) -> Result { - let guard = self.table.lock ()?; + async fn get_after (&self, thresh: Option ) -> Result { + let guard = self.table.lock ().await; Ok (guard.get_after (thresh)) } } @@ -222,6 +226,8 @@ mod tests { #[test] fn store () { + let mut rt = Runtime::new ().unwrap (); + rt.block_on (async { let s = Store::new (vec! [ (b"key_dir".to_vec (), StatusQuotas { max_keys: 4, @@ -238,25 +244,25 @@ mod tests { ]); assert_eq! ( - s.set (b"key_dir", b"this key is too long and will cause an error".to_vec (), b"bar".to_vec ()), + s.set (b"key_dir", b"this key is too long and will cause an error".to_vec (), b"bar".to_vec ()).await, Err (Error::KeyTooLong) ); assert_eq! ( - s.set (b"key_dir", b"foo".to_vec (), b"this value is too long and will cause an error".to_vec ()), + s.set (b"key_dir", b"foo".to_vec (), b"this value is too long and will cause an error".to_vec ()).await, Err (Error::ValueTooLong) ); assert_eq! ( - s.set (b"invalid_key_dir", b"foo".to_vec (), b"bar".to_vec ()), + s.set (b"invalid_key_dir", b"foo".to_vec (), b"bar".to_vec ()).await, Err (Error::NoSuchKeyDir) ); - let ga = s.get_after (b"key_dir", None).unwrap (); + let ga = s.get_after (b"key_dir", None).await.unwrap (); assert_eq! (ga.sequence, expected_sequence); assert_eq! (ga.tuples, vec! []); - s.set (b"key_dir", b"foo_1".to_vec (), b"bar_1".to_vec ()).unwrap (); + s.set (b"key_dir", b"foo_1".to_vec (), b"bar_1".to_vec ()).await.unwrap (); expected_sequence += 1; - let ga = s.get_after (b"key_dir", None).unwrap (); + let ga = s.get_after (b"key_dir", None).await.unwrap (); assert_eq! (ga.sequence, expected_sequence); assert_eq! (ga.tuples, vec! [ @@ -270,9 +276,9 @@ mod tests { ] }); - s.set (b"key_dir", b"foo_2".to_vec (), b"bar_2".to_vec ()).unwrap (); + s.set (b"key_dir", b"foo_2".to_vec (), b"bar_2".to_vec ()).await.unwrap (); expected_sequence += 1; - let ga = s.get_after (b"key_dir", None).unwrap (); + let ga = s.get_after (b"key_dir", None).await.unwrap (); get_after_eq (&ga, &GetAfter { sequence: expected_sequence, @@ -282,9 +288,9 @@ mod tests { ] }); - s.set (b"key_dir", b"foo_1".to_vec (), b"bar_3".to_vec ()).unwrap (); + s.set (b"key_dir", b"foo_1".to_vec (), b"bar_3".to_vec ()).await.unwrap (); expected_sequence += 1; - let ga = s.get_after (b"key_dir", None).unwrap (); + let ga = s.get_after (b"key_dir", None).await.unwrap (); get_after_eq (&ga, &GetAfter { sequence: expected_sequence, @@ -294,7 +300,7 @@ mod tests { ] }); - let ga = s.get_after (b"key_dir", Some (2)).unwrap (); + let ga = s.get_after (b"key_dir", Some (2)).await.unwrap (); get_after_eq (&ga, &GetAfter { sequence: expected_sequence, tuples: vec! [ @@ -302,11 +308,12 @@ mod tests { ] }); - let ga = s.get_after (b"key_dir", Some (3)).unwrap (); + let ga = s.get_after (b"key_dir", Some (3)).await.unwrap (); get_after_eq (&ga, &GetAfter { sequence: expected_sequence, tuples: vec! [] }); + }); } #[test] @@ -314,6 +321,8 @@ mod tests { fn perf () { use std::time::Instant; + let mut rt = Runtime::new ().unwrap (); + rt.block_on (async { let s = Store::new (vec! [ (b"key_dir".to_vec (), StatusQuotas { max_keys: 4, @@ -332,7 +341,7 @@ mod tests { for i in 0..num_iters { let value = format! ("{}", i); - s.set (b"key_dir", key.clone (), value.into ()); + s.set (b"key_dir", key.clone (), value.into ()).await.unwrap (); } let end_time = Instant::now (); @@ -340,6 +349,7 @@ mod tests { let avg_nanos = total_dur.as_nanos () / num_iters; - assert! (avg_nanos < 200); + assert! (avg_nanos < 300, dbg! (avg_nanos)); + }); } }