diff --git a/Cargo.lock b/Cargo.lock index 5bdf8b2..4e83088 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1660,7 +1660,9 @@ dependencies = [ name = "ptth_kv" version = "0.1.0" dependencies = [ + "anyhow", "base64 0.13.0", + "hyper", "thiserror", "tokio", ] diff --git a/crates/ptth_kv/Cargo.toml b/crates/ptth_kv/Cargo.toml index e3b8b2b..7935b66 100644 --- a/crates/ptth_kv/Cargo.toml +++ b/crates/ptth_kv/Cargo.toml @@ -5,6 +5,8 @@ authors = ["Trish"] edition = "2018" [dependencies] +anyhow = "1.0.34" base64 = "0.13.0" +hyper = "0.13.8" thiserror = "1.0.22" tokio = { version = "0.2.22", features = ["full"] } diff --git a/crates/ptth_kv/src/main.rs b/crates/ptth_kv/src/main.rs index 49709f3..9c75e94 100644 --- a/crates/ptth_kv/src/main.rs +++ b/crates/ptth_kv/src/main.rs @@ -4,16 +4,24 @@ use std::{ }, iter::FromIterator, sync::{ - MutexGuard, - PoisonError, + Arc, }, }; +use hyper::{ + Body, + Request, + Response, + StatusCode, +}; use tokio::{ - runtime::Runtime, sync::Mutex, }; +pub struct HttpService { + store: Arc , +} + pub struct Store { status_dirs: HashMap , StatusKeyDirectory>, } @@ -22,8 +30,6 @@ pub struct Store { pub enum Error { #[error ("key too long")] KeyTooLong, - #[error ("mutex poisoned")] - MutexPoisoned, #[error ("no such key dir")] NoSuchKeyDir, #[error ("value too long")] @@ -42,6 +48,50 @@ pub struct GetAfter { pub sequence: u64, } +impl HttpService { + pub fn new (s: Store) -> Self { + Self { + store: Arc::new (s), + } + } + + pub fn inner (&self) -> &Store { + &*self.store + } + + pub async fn serve (&self, port: u16) -> Result <(), hyper::Error> { + use std::net::SocketAddr; + + use hyper::{ + server::Server, + service::{ + make_service_fn, + service_fn, + }, + }; + + let make_svc = make_service_fn (|_conn| { + let state = self.store.clone (); + + async { + Ok::<_, String> (service_fn (move |req| { + let state = state.clone (); + + Self::handle_all (req, state) + })) + } + }); + + let addr = SocketAddr::from(([127, 0, 0, 1], port)); + + let server = Server::bind (&addr) + .serve (make_svc) + ; + + server.await + } +} + impl Store { pub fn new (status_dirs: I) -> Self @@ -63,7 +113,7 @@ impl Store { .collect () } - pub async fn set (&self, name: &[u8], key: Vec , value: Vec ) + pub async fn set (&self, name: &[u8], key: &[u8], value: Vec ) -> Result <(), Error> { let dir = self.status_dirs.get (name) @@ -72,6 +122,15 @@ impl Store { dir.set (key, value).await } + async fn set_multi (&self, name: &[u8], tuples: Vec <(&[u8], Vec )>) + -> Result <(), Error> + { + let dir = self.status_dirs.get (name) + .ok_or (Error::NoSuchKeyDir)?; + + dir.set_multi (tuples).await + } + pub async fn get_after (&self, name: &[u8], thresh: Option ) -> Result { @@ -84,11 +143,7 @@ impl Store { // End of public interface -impl From >> for Error { - fn from (_: PoisonError >) -> Self { - Self::MutexPoisoned - } -} +const SET_BATCH_SIZE: usize = 32; enum StoreCommand { SetStatus (SetStatusCommand), @@ -119,6 +174,15 @@ struct SetStatusCommand { value: Vec , } +impl HttpService { + async fn handle_all (req: Request , store: Arc ) + -> Result , anyhow::Error> + { + Ok (Response::builder () + .body (Body::from ("hello\n"))?) + } +} + impl StatusKeyDirectory { fn new (quotas: StatusQuotas) -> Self { Self { @@ -127,7 +191,7 @@ impl StatusKeyDirectory { } } - async fn set (&self, key: Vec , value: Vec ) -> Result <(), Error> + async fn set (&self, key: &[u8], value: Vec ) -> Result <(), Error> { if key.len () > self.quotas.max_key_bytes { return Err (Error::KeyTooLong); @@ -144,6 +208,17 @@ impl StatusKeyDirectory { Ok (()) } + async fn set_multi (&self, tuples: Vec <(&[u8], Vec )>) -> Result <(), Error> + { + { + let mut guard = self.table.lock ().await; + for (key, value) in tuples { + guard.set (&self.quotas, key, value); + } + } + Ok (()) + } + async fn get_after (&self, thresh: Option ) -> Result { let guard = self.table.lock ().await; Ok (guard.get_after (thresh)) @@ -157,7 +232,7 @@ impl StatusTable { .sum () } - fn set (&mut self, quotas: &StatusQuotas, key: Vec , value: Vec ) { + fn set (&mut self, quotas: &StatusQuotas, key: &[u8], value: Vec ) { self.sequence += 1; if self.map.len () > quotas.max_keys { @@ -175,7 +250,13 @@ impl StatusTable { sequence: self.sequence, }; - self.map.insert (key, value); + // self.map.insert (key, value); + match self.map.get_mut (key) { + None => { + self.map.insert (key.to_vec (), value); + }, + Some (v) => *v = value, + } } fn get_after (&self, thresh: Option ) -> GetAfter { @@ -204,12 +285,24 @@ impl StatusValue { } } -fn main () { - println! ("Hello, world!"); +#[tokio::main] +async fn main () -> Result <(), hyper::Error> { + let service = HttpService::new (Store::new (vec! [ + (b"key_dir".to_vec (), StatusQuotas { + max_keys: 4, + max_key_bytes: 16, + max_value_bytes: 16, + max_payload_bytes: 128, + }), + ].into_iter ())); + + service.serve (4003).await } #[cfg (test)] mod tests { + use tokio::runtime::Runtime; + use super::*; fn get_after_eq (a: &GetAfter, b: &GetAfter) { @@ -244,15 +337,15 @@ mod tests { ]); assert_eq! ( - s.set (b"key_dir", b"this key is too long and will cause an error".to_vec (), b"bar".to_vec ()).await, + s.set (b"key_dir", b"this key is too long and will cause an error", b"bar".to_vec ()).await, Err (Error::KeyTooLong) ); assert_eq! ( - s.set (b"key_dir", b"foo".to_vec (), b"this value is too long and will cause an error".to_vec ()).await, + s.set (b"key_dir", b"foo", b"this value is too long and will cause an error".to_vec ()).await, Err (Error::ValueTooLong) ); assert_eq! ( - s.set (b"invalid_key_dir", b"foo".to_vec (), b"bar".to_vec ()).await, + s.set (b"invalid_key_dir", b"foo", b"bar".to_vec ()).await, Err (Error::NoSuchKeyDir) ); @@ -260,7 +353,7 @@ mod tests { assert_eq! (ga.sequence, expected_sequence); assert_eq! (ga.tuples, vec! []); - s.set (b"key_dir", b"foo_1".to_vec (), b"bar_1".to_vec ()).await.unwrap (); + s.set (b"key_dir", b"foo_1", b"bar_1".to_vec ()).await.unwrap (); expected_sequence += 1; let ga = s.get_after (b"key_dir", None).await.unwrap (); @@ -276,7 +369,7 @@ mod tests { ] }); - s.set (b"key_dir", b"foo_2".to_vec (), b"bar_2".to_vec ()).await.unwrap (); + s.set (b"key_dir", b"foo_2", b"bar_2".to_vec ()).await.unwrap (); expected_sequence += 1; let ga = s.get_after (b"key_dir", None).await.unwrap (); @@ -288,7 +381,7 @@ mod tests { ] }); - s.set (b"key_dir", b"foo_1".to_vec (), b"bar_3".to_vec ()).await.unwrap (); + s.set (b"key_dir", b"foo_1", b"bar_3".to_vec ()).await.unwrap (); expected_sequence += 1; let ga = s.get_after (b"key_dir", None).await.unwrap (); @@ -334,14 +427,14 @@ mod tests { let num_iters = 1_000_000; - let key = b"foo".to_vec (); + let key = b"foo"; let start_time = Instant::now (); for i in 0..num_iters { let value = format! ("{}", i); - s.set (b"key_dir", key.clone (), value.into ()).await.unwrap (); + s.set (b"key_dir", key, value.into ()).await.unwrap (); } let end_time = Instant::now (); @@ -349,7 +442,52 @@ mod tests { let avg_nanos = total_dur.as_nanos () / num_iters; - assert! (avg_nanos < 300, dbg! (avg_nanos)); + assert! (avg_nanos < 250, dbg! (avg_nanos)); + }); + } + + #[test] + #[cfg (not (debug_assertions))] + fn perf_multi () { + use std::time::Instant; + + let mut rt = Runtime::new ().unwrap (); + rt.block_on (async { + let s = Store::new (vec! [ + (b"key_dir".to_vec (), StatusQuotas { + max_keys: 8, + max_key_bytes: 16, + max_value_bytes: 16, + max_payload_bytes: 128, + }), + ].into_iter ()); + + let num_iters = 1_000_000; + + let start_time = Instant::now (); + + for i in 0..num_iters { + let value = Vec::::from (format! ("{}", i)); + let tuples = vec! [ + (&b"foo_0"[..], value.clone ()), + (b"foo_1", value.clone ()), + (b"foo_2", value.clone ()), + (b"foo_3", value.clone ()), + (b"foo_4", value.clone ()), + (b"foo_5", value.clone ()), + (b"foo_6", value.clone ()), + (b"foo_7", value.clone ()), + ]; + + s.set_multi (b"key_dir", tuples).await.unwrap (); + } + + let end_time = Instant::now (); + let total_dur = end_time - start_time; + + let avg_nanos = total_dur.as_nanos () / (num_iters * 8); + + assert! (avg_nanos < 150, dbg! (avg_nanos)); }); } } diff --git a/docs/build-artifacts.md b/docs/build-artifacts.md index fc48ef9..6cc5298 100644 --- a/docs/build-artifacts.md +++ b/docs/build-artifacts.md @@ -10,3 +10,17 @@ Sizes: - ptth_server: 14 MB - ptth_relay gzipped: 3 MB - ptth_server gzipped: 5 MB + +Git commit ??? + +Sizes: + +- ptth_kv: 5.5M +- ptth_relay: 11M +- ptth_server: 14M + +Gzipped sizes: + +- ptth_kv: 1.6 MB +- ptth_relay: 3.3 MB +- ptth_server: 4.9 MB