From 88e7839841ad812f5645a30503955511ac107bdd Mon Sep 17 00:00:00 2001 From: _ <> Date: Sun, 3 Jan 2021 18:09:00 +0000 Subject: [PATCH] :construction: wip: outlining idea for Redis-like KV store in ptth_server --- Cargo.lock | 10 +- Cargo.toml | 2 +- crates/ptth_kv/Cargo.toml | 9 + crates/ptth_kv/src/main.rs | 311 +++++++++++++++++++++++++++ issues/2021-01Jan/status-DMX6CO4G.md | 11 +- 5 files changed, 339 insertions(+), 4 deletions(-) create mode 100644 crates/ptth_kv/Cargo.toml create mode 100644 crates/ptth_kv/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 4c0ece6..6042424 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1612,7 +1612,7 @@ dependencies = [ name = "ptth" version = "0.1.0" dependencies = [ - "base64 0.12.3", + "base64 0.13.0", "blake3", "chrono", "ptth_relay", @@ -1656,6 +1656,14 @@ dependencies = [ "uom", ] +[[package]] +name = "ptth_kv" +version = "0.1.0" +dependencies = [ + "base64 0.13.0", + "thiserror", +] + [[package]] name = "ptth_relay" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index d67c348..69bc604 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ license = "AGPL-3.0" [dev-dependencies] -base64 = "0.12.3" +base64 = "0.13.0" blake3 = "0.3.7" chrono = {version = "0.4.19", features = ["serde"]} reqwest = { version = "0.10.8", features = ["stream"] } diff --git a/crates/ptth_kv/Cargo.toml b/crates/ptth_kv/Cargo.toml new file mode 100644 index 0000000..9bceeaf --- /dev/null +++ b/crates/ptth_kv/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "ptth_kv" +version = "0.1.0" +authors = ["Trish"] +edition = "2018" + +[dependencies] +base64 = "0.13.0" +thiserror = "1.0.22" diff --git a/crates/ptth_kv/src/main.rs b/crates/ptth_kv/src/main.rs new file mode 100644 index 0000000..28f088b --- /dev/null +++ b/crates/ptth_kv/src/main.rs @@ -0,0 +1,311 @@ +use std::{ + collections::{ + HashMap, + }, + iter::FromIterator, + sync::{ + Mutex, + MutexGuard, + PoisonError, + }, +}; + +pub struct Store { + status_dirs: HashMap , StatusKeyDirectory>, +} + +#[derive (thiserror::Error, Debug, PartialEq)] +pub enum Error { + #[error ("key too long")] + KeyTooLong, + #[error ("mutex poisoned")] + MutexPoisoned, + #[error ("no such key dir")] + NoSuchKeyDir, + #[error ("value too long")] + ValueTooLong, +} + +pub struct StatusQuotas { + pub max_keys: usize, + pub max_key_bytes: usize, + pub max_value_bytes: usize, + pub max_payload_bytes: usize, +} + +pub struct GetAfter { + pub tuples: Vec <(Vec , Vec )>, + pub sequence: u64, +} + +impl Store { + pub fn new (status_dirs: I) + -> Self + where I: Iterator , StatusQuotas)> + { + let status_dirs = HashMap::from_iter ( + status_dirs + .map (|(name, quotas)| (name, StatusKeyDirectory::new (quotas))) + ); + + Self { + status_dirs, + } + } + + pub fn list_key_dirs (&self) -> Vec > { + self.status_dirs.iter () + .map (|(k, _)| k.clone ()) + .collect () + } + + pub fn set (&self, name: &[u8], key: Vec , value: Vec ) + -> Result <(), Error> + { + let dir = self.status_dirs.get (name) + .ok_or (Error::NoSuchKeyDir)?; + + dir.set (key, value) + } + + pub fn get_after (&self, name: &[u8], thresh: Option ) + -> Result + { + let dir = self.status_dirs.get (name) + .ok_or (Error::NoSuchKeyDir)?; + + dir.get_after (thresh) + } +} + +// End of public interface + +impl From >> for Error { + fn from (_: PoisonError >) -> Self { + Self::MutexPoisoned + } +} + +enum StoreCommand { + SetStatus (SetStatusCommand), + Multi (Vec ), +} + +struct StatusKeyDirectory { + quotas: StatusQuotas, + + // TODO: Make this tokio::sync::Mutex. + table: Mutex , +} + +#[derive (Default)] +struct StatusTable { + map: HashMap , StatusValue>, + sequence: u64, +} + +struct StatusValue { + value: Vec , + sequence: u64, +} + +struct SetStatusCommand { + key_dir: Vec , + key: Vec , + value: Vec , +} + +impl StatusKeyDirectory { + fn new (quotas: StatusQuotas) -> Self { + Self { + quotas, + table: Mutex::new (Default::default ()), + } + } + + fn set (&self, key: Vec , value: Vec ) -> Result <(), Error> + { + if key.len () > self.quotas.max_key_bytes { + return Err (Error::KeyTooLong); + } + + if value.len () > self.quotas.max_value_bytes { + return Err (Error::ValueTooLong); + } + + { + let mut guard = self.table.lock ()?; + guard.set (&self.quotas, key, value); + } + Ok (()) + } + + fn get_after (&self, thresh: Option ) -> Result { + let guard = self.table.lock ()?; + Ok (guard.get_after (thresh)) + } +} + +impl StatusTable { + fn payload_bytes (&self) -> usize { + self.map.iter () + .map (|(k, v)| k.len () + v.len ()) + .sum () + } + + fn set (&mut self, quotas: &StatusQuotas, key: Vec , value: Vec ) { + self.sequence += 1; + + if self.map.len () > quotas.max_keys { + self.map.clear (); + } + + let new_bytes = key.len () + value.len (); + + if self.payload_bytes () + new_bytes > quotas.max_payload_bytes { + self.map.clear (); + } + + let value = StatusValue { + value, + sequence: self.sequence, + }; + + self.map.insert (key, value); + } + + fn get_after (&self, thresh: Option ) -> GetAfter { + let thresh = thresh.unwrap_or (0); + + let tuples = self.map.iter () + .filter_map (|(key, value)| { + if value.sequence <= thresh { + return None; + } + + Some ((key.clone (), value.value.clone ())) + }) + .collect (); + + GetAfter { + tuples, + sequence: self.sequence, + } + } +} + +impl StatusValue { + fn len (&self) -> usize { + self.value.len () + } +} + +fn main () { + println! ("Hello, world!"); +} + +#[cfg (test)] +mod tests { + use super::*; + + fn get_after_eq (a: &GetAfter, b: &GetAfter) { + assert_eq! (a.sequence, b.sequence); + + let a = a.tuples.clone (); + let b = b.tuples.clone (); + + let a = HashMap::, Vec >::from_iter (a.into_iter ()); + let b = HashMap::from_iter (b.into_iter ()); + + assert_eq! (a, b); + } + + #[test] + fn store () { + let s = Store::new (vec! [ + (b"key_dir".to_vec (), StatusQuotas { + max_keys: 4, + max_key_bytes: 16, + max_value_bytes: 16, + max_payload_bytes: 128, + }), + ].into_iter ()); + + let mut expected_sequence = 0; + + assert_eq! (s.list_key_dirs (), vec! [ + b"key_dir".to_vec (), + ]); + + assert_eq! ( + s.set (b"key_dir", b"this key is too long and will cause an error".to_vec (), b"bar".to_vec ()), + Err (Error::KeyTooLong) + ); + assert_eq! ( + s.set (b"key_dir", b"foo".to_vec (), b"this value is too long and will cause an error".to_vec ()), + Err (Error::ValueTooLong) + ); + assert_eq! ( + s.set (b"invalid_key_dir", b"foo".to_vec (), b"bar".to_vec ()), + Err (Error::NoSuchKeyDir) + ); + + let ga = s.get_after (b"key_dir", None).unwrap (); + assert_eq! (ga.sequence, expected_sequence); + assert_eq! (ga.tuples, vec! []); + + s.set (b"key_dir", b"foo_1".to_vec (), b"bar_1".to_vec ()).unwrap (); + expected_sequence += 1; + let ga = s.get_after (b"key_dir", None).unwrap (); + + assert_eq! (ga.sequence, expected_sequence); + assert_eq! (ga.tuples, vec! [ + (b"foo_1".to_vec (), b"bar_1".to_vec ()), + ]); + + get_after_eq (&ga, &GetAfter { + sequence: expected_sequence, + tuples: vec! [ + (b"foo_1".to_vec (), b"bar_1".to_vec ()), + ] + }); + + s.set (b"key_dir", b"foo_2".to_vec (), b"bar_2".to_vec ()).unwrap (); + expected_sequence += 1; + let ga = s.get_after (b"key_dir", None).unwrap (); + + get_after_eq (&ga, &GetAfter { + sequence: expected_sequence, + tuples: vec! [ + (b"foo_1".to_vec (), b"bar_1".to_vec ()), + (b"foo_2".to_vec (), b"bar_2".to_vec ()), + ] + }); + + s.set (b"key_dir", b"foo_1".to_vec (), b"bar_3".to_vec ()).unwrap (); + expected_sequence += 1; + let ga = s.get_after (b"key_dir", None).unwrap (); + + get_after_eq (&ga, &GetAfter { + sequence: expected_sequence, + tuples: vec! [ + (b"foo_1".to_vec (), b"bar_3".to_vec ()), + (b"foo_2".to_vec (), b"bar_2".to_vec ()), + ] + }); + + let ga = s.get_after (b"key_dir", Some (2)).unwrap (); + get_after_eq (&ga, &GetAfter { + sequence: expected_sequence, + tuples: vec! [ + (b"foo_1".to_vec (), b"bar_3".to_vec ()), + ] + }); + + let ga = s.get_after (b"key_dir", Some (3)).unwrap (); + get_after_eq (&ga, &GetAfter { + sequence: expected_sequence, + tuples: vec! [] + }); + } +} diff --git a/issues/2021-01Jan/status-DMX6CO4G.md b/issues/2021-01Jan/status-DMX6CO4G.md index 999a0d8..2da9ae7 100644 --- a/issues/2021-01Jan/status-DMX6CO4G.md +++ b/issues/2021-01Jan/status-DMX6CO4G.md @@ -2,6 +2,12 @@ (DMX6CO4G) +## Bottom Line Up Front + +A memory-only key-value store for ptth_server. Other processes on the same +system can send data to ptth_server, and it will hold the data in memory +and expose it to ptth_relay. + ## Motivation This feature allows ptth_server to act as a @@ -31,7 +37,7 @@ The client app can send data with these semantics: Logs and status values do not share namespaces or quotas. ptth_server MUST be configured with quotas in order to enable this feature. There is no "unlimited" -quota option, but the quotas MAY be set to any 64-bit value. +quota option, but the quotas MAY be set to any usize value. Keys, values, and log strings are binary-safe, but SHOULD be valid UTF-8. If they are not valid UTF-8, they MAY not appear in client apps, or they may @@ -48,7 +54,7 @@ or at all. All data is subject to quotas based on time, byte count, key count, etc. Logs WILL be evicted in oldest-first order. Status values replace older values written to the same key. If a key directory reaches its quota, old values MAY be evicted in any order, including oldest-first, -alphabetical, or random. +alphabetical, random, or all at once. Quotas: ptth_relay is the source of truth for quotas. ptth_server SHOULD NOT store the quotas on disk, it SHOULD request them from ptth_relay when connecting, @@ -62,6 +68,7 @@ The quotas are all maximums. The quotas (with recommended value in parens) are: - Bytes for 1 key (1,024) - Bytes for 1 value (1,024) - Total payload (i.e. keys and values) bytes (262,144) +- Number of log events per key (2,048) It's hard to set a perfect limit on RAM usage, so the quotas have many different dimensions. There is some upper bound on RAM usage, but it will