use std::{ collections::{ HashMap, }, iter::FromIterator, sync::{ Arc, }, }; use hyper::{ Body, Request, Response, StatusCode, }; use tokio::{ sync::Mutex, }; pub struct HttpService { store: Arc , } pub struct Store { status_dirs: HashMap , StatusKeyDirectory>, } #[derive (thiserror::Error, Debug, PartialEq)] pub enum Error { #[error ("key too long")] KeyTooLong, #[error ("no such key dir")] NoSuchKeyDir, #[error ("value too long")] ValueTooLong, } pub struct StatusQuotas { pub max_keys: usize, pub max_key_bytes: usize, pub max_value_bytes: usize, pub max_payload_bytes: usize, } pub struct GetAfter { pub tuples: Vec <(Vec , Vec )>, pub sequence: u64, } impl HttpService { pub fn new (s: Store) -> Self { Self { store: Arc::new (s), } } pub fn inner (&self) -> &Store { &*self.store } pub async fn serve (&self, port: u16) -> Result <(), hyper::Error> { use std::net::SocketAddr; use hyper::{ Server, service::{ make_service_fn, service_fn, }, }; let make_svc = make_service_fn (|_conn| { let state = self.store.clone (); async { Ok::<_, String> (service_fn (move |req| { let state = state.clone (); Self::handle_all (req, state) })) } }); let addr = SocketAddr::from(([127, 0, 0, 1], port)); let server = Server::bind (&addr) .serve (make_svc) ; server.await } } impl Store { pub fn new (status_dirs: I) -> Self where I: Iterator , StatusQuotas)> { let status_dirs = status_dirs .map (|(name, quotas)| (name, StatusKeyDirectory::new (quotas))) .collect (); Self { status_dirs, } } pub fn list_key_dirs (&self) -> Vec > { self.status_dirs.iter () .map (|(k, _)| k.clone ()) .collect () } pub async fn set (&self, name: &[u8], key: &[u8], value: Vec ) -> Result <(), Error> { let dir = self.status_dirs.get (name) .ok_or (Error::NoSuchKeyDir)?; dir.set (key, value).await } async fn set_multi (&self, name: &[u8], tuples: Vec <(&[u8], Vec )>) -> Result <(), Error> { let dir = self.status_dirs.get (name) .ok_or (Error::NoSuchKeyDir)?; dir.set_multi (tuples).await } pub async fn get_after (&self, name: &[u8], thresh: Option ) -> Result { let dir = self.status_dirs.get (name) .ok_or (Error::NoSuchKeyDir)?; dir.get_after (thresh).await } } // End of public interface const SET_BATCH_SIZE: usize = 32; enum StoreCommand { SetStatus (SetStatusCommand), Multi (Vec ), } struct StatusKeyDirectory { quotas: StatusQuotas, // TODO: Make this tokio::sync::Mutex. table: Mutex , } #[derive (Default)] struct StatusTable { map: HashMap , StatusValue>, sequence: u64, } struct StatusValue { value: Vec , sequence: u64, } struct SetStatusCommand { key_dir: Vec , key: Vec , value: Vec , } impl HttpService { async fn handle_all (req: Request , store: Arc ) -> Result , anyhow::Error> { Ok (Response::builder () .body (Body::from ("hello\n"))?) } } impl StatusKeyDirectory { fn new (quotas: StatusQuotas) -> Self { Self { quotas, table: Mutex::new (Default::default ()), } } async fn set (&self, key: &[u8], value: Vec ) -> Result <(), Error> { if key.len () > self.quotas.max_key_bytes { return Err (Error::KeyTooLong); } if value.len () > self.quotas.max_value_bytes { return Err (Error::ValueTooLong); } { let mut guard = self.table.lock ().await; guard.set (&self.quotas, key, value); } Ok (()) } async fn set_multi (&self, tuples: Vec <(&[u8], Vec )>) -> Result <(), Error> { { let mut guard = self.table.lock ().await; for (key, value) in tuples { guard.set (&self.quotas, key, value); } } Ok (()) } async fn get_after (&self, thresh: Option ) -> Result { let guard = self.table.lock ().await; Ok (guard.get_after (thresh)) } } impl StatusTable { fn payload_bytes (&self) -> usize { self.map.iter () .map (|(k, v)| k.len () + v.len ()) .sum () } fn set (&mut self, quotas: &StatusQuotas, key: &[u8], value: Vec ) { self.sequence += 1; if self.map.len () > quotas.max_keys { self.map.clear (); } let new_bytes = key.len () + value.len (); if self.payload_bytes () + new_bytes > quotas.max_payload_bytes { self.map.clear (); } let value = StatusValue { value, sequence: self.sequence, }; // self.map.insert (key, value); match self.map.get_mut (key) { None => { self.map.insert (key.to_vec (), value); }, Some (v) => *v = value, } } fn get_after (&self, thresh: Option ) -> GetAfter { let thresh = thresh.unwrap_or (0); let tuples = self.map.iter () .filter_map (|(key, value)| { if value.sequence <= thresh { return None; } Some ((key.clone (), value.value.clone ())) }) .collect (); GetAfter { tuples, sequence: self.sequence, } } } impl StatusValue { fn len (&self) -> usize { self.value.len () } } #[tokio::main] async fn main () -> Result <(), hyper::Error> { use std::time::Duration; use tokio::{ spawn, time::interval, }; let service = HttpService::new (Store::new (vec! [ (b"key_dir".to_vec (), StatusQuotas { max_keys: 4, max_key_bytes: 16, max_value_bytes: 16, max_payload_bytes: 128, }), ].into_iter ())); service.serve (4003).await } #[cfg (test)] mod tests { use tokio::runtime::Runtime; use super::*; fn get_after_eq (a: &GetAfter, b: &GetAfter) { assert_eq! (a.sequence, b.sequence); let a = a.tuples.clone (); let b = b.tuples.clone (); let a = HashMap::, Vec >::from_iter (a.into_iter ()); let b = HashMap::from_iter (b.into_iter ()); assert_eq! (a, b); } #[test] fn store () { let mut rt = Runtime::new ().unwrap (); rt.block_on (async { let s = Store::new (vec! [ (b"key_dir".to_vec (), StatusQuotas { max_keys: 4, max_key_bytes: 16, max_value_bytes: 16, max_payload_bytes: 128, }), ].into_iter ()); let mut expected_sequence = 0; assert_eq! (s.list_key_dirs (), vec! [ b"key_dir".to_vec (), ]); assert_eq! ( s.set (b"key_dir", b"this key is too long and will cause an error", b"bar".to_vec ()).await, Err (Error::KeyTooLong) ); assert_eq! ( s.set (b"key_dir", b"foo", b"this value is too long and will cause an error".to_vec ()).await, Err (Error::ValueTooLong) ); assert_eq! ( s.set (b"invalid_key_dir", b"foo", b"bar".to_vec ()).await, Err (Error::NoSuchKeyDir) ); let ga = s.get_after (b"key_dir", None).await.unwrap (); assert_eq! (ga.sequence, expected_sequence); assert_eq! (ga.tuples, vec! []); s.set (b"key_dir", b"foo_1", b"bar_1".to_vec ()).await.unwrap (); expected_sequence += 1; let ga = s.get_after (b"key_dir", None).await.unwrap (); assert_eq! (ga.sequence, expected_sequence); assert_eq! (ga.tuples, vec! [ (b"foo_1".to_vec (), b"bar_1".to_vec ()), ]); get_after_eq (&ga, &GetAfter { sequence: expected_sequence, tuples: vec! [ (b"foo_1".to_vec (), b"bar_1".to_vec ()), ] }); s.set (b"key_dir", b"foo_2", b"bar_2".to_vec ()).await.unwrap (); expected_sequence += 1; let ga = s.get_after (b"key_dir", None).await.unwrap (); get_after_eq (&ga, &GetAfter { sequence: expected_sequence, tuples: vec! [ (b"foo_1".to_vec (), b"bar_1".to_vec ()), (b"foo_2".to_vec (), b"bar_2".to_vec ()), ] }); s.set (b"key_dir", b"foo_1", b"bar_3".to_vec ()).await.unwrap (); expected_sequence += 1; let ga = s.get_after (b"key_dir", None).await.unwrap (); get_after_eq (&ga, &GetAfter { sequence: expected_sequence, tuples: vec! [ (b"foo_1".to_vec (), b"bar_3".to_vec ()), (b"foo_2".to_vec (), b"bar_2".to_vec ()), ] }); let ga = s.get_after (b"key_dir", Some (2)).await.unwrap (); get_after_eq (&ga, &GetAfter { sequence: expected_sequence, tuples: vec! [ (b"foo_1".to_vec (), b"bar_3".to_vec ()), ] }); let ga = s.get_after (b"key_dir", Some (3)).await.unwrap (); get_after_eq (&ga, &GetAfter { sequence: expected_sequence, tuples: vec! [] }); }); } #[test] #[cfg (not (debug_assertions))] fn perf () { use std::time::Instant; let mut rt = Runtime::new ().unwrap (); rt.block_on (async { let s = Store::new (vec! [ (b"key_dir".to_vec (), StatusQuotas { max_keys: 4, max_key_bytes: 16, max_value_bytes: 16, max_payload_bytes: 128, }), ].into_iter ()); let num_iters = 1_000_000; let key = b"foo"; let start_time = Instant::now (); for i in 0..num_iters { let value = format! ("{}", i); s.set (b"key_dir", key, value.into ()).await.unwrap (); } let end_time = Instant::now (); let total_dur = end_time - start_time; let avg_nanos = total_dur.as_nanos () / num_iters; assert! (avg_nanos < 250, dbg! (avg_nanos)); }); } #[test] #[cfg (not (debug_assertions))] fn perf_multi () { use std::time::Instant; let mut rt = Runtime::new ().unwrap (); rt.block_on (async { let s = Store::new (vec! [ (b"key_dir".to_vec (), StatusQuotas { max_keys: 8, max_key_bytes: 16, max_value_bytes: 16, max_payload_bytes: 128, }), ].into_iter ()); let num_iters = 1_000_000; let start_time = Instant::now (); for i in 0..num_iters { let value = Vec::::from (format! ("{}", i)); let tuples = vec! [ (&b"foo_0"[..], value.clone ()), (b"foo_1", value.clone ()), (b"foo_2", value.clone ()), (b"foo_3", value.clone ()), (b"foo_4", value.clone ()), (b"foo_5", value.clone ()), (b"foo_6", value.clone ()), (b"foo_7", value.clone ()), ]; s.set_multi (b"key_dir", tuples).await.unwrap (); } let end_time = Instant::now (); let total_dur = end_time - start_time; let avg_nanos = total_dur.as_nanos () / (num_iters * 8); assert! (avg_nanos < 150, dbg! (avg_nanos)); }); } }