🚧 wip: outlining idea for Redis-like KV store in ptth_server

_ 2021-01-03 18:09:00 +00:00
parent 075fd604ee
commit 88e7839841
5 changed files with 339 additions and 4 deletions

Cargo.lock generated
View File

@ -1612,7 +1612,7 @@ dependencies = [
name = "ptth"
version = "0.1.0"
dependencies = [
"base64 0.12.3",
"base64 0.13.0",
@ -1656,6 +1656,14 @@ dependencies = [
name = "ptth_kv"
version = "0.1.0"
dependencies = [
"base64 0.13.0",
name = "ptth_relay"
version = "0.1.0"

View File

@ -10,7 +10,7 @@ license = "AGPL-3.0"
base64 = "0.12.3"
base64 = "0.13.0"
blake3 = "0.3.7"
chrono = {version = "0.4.19", features = ["serde"]}
reqwest = { version = "0.10.8", features = ["stream"] }

View File

@ -0,0 +1,9 @@
name = "ptth_kv"
version = "0.1.0"
authors = ["Trish"]
edition = "2018"
base64 = "0.13.0"
thiserror = "1.0.22"

crates/ptth_kv/src/main.rs Normal file
View File

@ -0,0 +1,311 @@
use std::{
pub struct Store {
status_dirs: HashMap <Vec <u8>, StatusKeyDirectory>,
#[derive (thiserror::Error, Debug, PartialEq)]
pub enum Error {
#[error ("key too long")]
#[error ("mutex poisoned")]
#[error ("no such key dir")]
#[error ("value too long")]
pub struct StatusQuotas {
pub max_keys: usize,
pub max_key_bytes: usize,
pub max_value_bytes: usize,
pub max_payload_bytes: usize,
pub struct GetAfter {
pub tuples: Vec <(Vec <u8>, Vec <u8>)>,
pub sequence: u64,
impl Store {
pub fn new <I> (status_dirs: I)
-> Self
where I: Iterator <Item = (Vec <u8>, StatusQuotas)>
let status_dirs = HashMap::from_iter (
.map (|(name, quotas)| (name, StatusKeyDirectory::new (quotas)))
Self {
pub fn list_key_dirs (&self) -> Vec <Vec <u8>> {
self.status_dirs.iter ()
.map (|(k, _)| k.clone ())
.collect ()
pub fn set (&self, name: &[u8], key: Vec <u8>, value: Vec <u8>)
-> Result <(), Error>
let dir = self.status_dirs.get (name)
.ok_or (Error::NoSuchKeyDir)?;
dir.set (key, value)
pub fn get_after (&self, name: &[u8], thresh: Option <u64>)
-> Result <GetAfter, Error>
let dir = self.status_dirs.get (name)
.ok_or (Error::NoSuchKeyDir)?;
dir.get_after (thresh)
// End of public interface
impl <T> From <PoisonError <MutexGuard <'_, T>>> for Error {
fn from (_: PoisonError <MutexGuard <'_, T>>) -> Self {
enum StoreCommand {
SetStatus (SetStatusCommand),
Multi (Vec <StoreCommand>),
struct StatusKeyDirectory {
quotas: StatusQuotas,
// TODO: Make this tokio::sync::Mutex.
table: Mutex <StatusTable>,
#[derive (Default)]
struct StatusTable {
map: HashMap <Vec <u8>, StatusValue>,
sequence: u64,
struct StatusValue {
value: Vec <u8>,
sequence: u64,
struct SetStatusCommand {
key_dir: Vec <u8>,
key: Vec <u8>,
value: Vec <u8>,
impl StatusKeyDirectory {
fn new (quotas: StatusQuotas) -> Self {
Self {
table: Mutex::new (Default::default ()),
fn set (&self, key: Vec <u8>, value: Vec <u8>) -> Result <(), Error>
if key.len () > self.quotas.max_key_bytes {
return Err (Error::KeyTooLong);
if value.len () > self.quotas.max_value_bytes {
return Err (Error::ValueTooLong);
let mut guard = self.table.lock ()?;
guard.set (&self.quotas, key, value);
Ok (())
fn get_after (&self, thresh: Option <u64>) -> Result <GetAfter, Error> {
let guard = self.table.lock ()?;
Ok (guard.get_after (thresh))
impl StatusTable {
fn payload_bytes (&self) -> usize {
self.map.iter ()
.map (|(k, v)| k.len () + v.len ())
.sum ()
fn set (&mut self, quotas: &StatusQuotas, key: Vec <u8>, value: Vec <u8>) {
self.sequence += 1;
if self.map.len () > quotas.max_keys {
self.map.clear ();
let new_bytes = key.len () + value.len ();
if self.payload_bytes () + new_bytes > quotas.max_payload_bytes {
self.map.clear ();
let value = StatusValue {
sequence: self.sequence,
self.map.insert (key, value);
fn get_after (&self, thresh: Option <u64>) -> GetAfter {
let thresh = thresh.unwrap_or (0);
let tuples = self.map.iter ()
.filter_map (|(key, value)| {
if value.sequence <= thresh {
return None;
Some ((key.clone (), value.value.clone ()))
.collect ();
GetAfter {
sequence: self.sequence,
impl StatusValue {
fn len (&self) -> usize {
self.value.len ()
fn main () {
println! ("Hello, world!");
#[cfg (test)]
mod tests {
use super::*;
fn get_after_eq (a: &GetAfter, b: &GetAfter) {
assert_eq! (a.sequence, b.sequence);
let a = a.tuples.clone ();
let b = b.tuples.clone ();
let a = HashMap::<Vec <u8>, Vec <u8>>::from_iter (a.into_iter ());
let b = HashMap::from_iter (b.into_iter ());
assert_eq! (a, b);
fn store () {
let s = Store::new (vec! [
(b"key_dir".to_vec (), StatusQuotas {
max_keys: 4,
max_key_bytes: 16,
max_value_bytes: 16,
max_payload_bytes: 128,
].into_iter ());
let mut expected_sequence = 0;
assert_eq! (s.list_key_dirs (), vec! [
b"key_dir".to_vec (),
assert_eq! (
s.set (b"key_dir", b"this key is too long and will cause an error".to_vec (), b"bar".to_vec ()),
Err (Error::KeyTooLong)
assert_eq! (
s.set (b"key_dir", b"foo".to_vec (), b"this value is too long and will cause an error".to_vec ()),
Err (Error::ValueTooLong)
assert_eq! (
s.set (b"invalid_key_dir", b"foo".to_vec (), b"bar".to_vec ()),
Err (Error::NoSuchKeyDir)
let ga = s.get_after (b"key_dir", None).unwrap ();
assert_eq! (ga.sequence, expected_sequence);
assert_eq! (ga.tuples, vec! []);
s.set (b"key_dir", b"foo_1".to_vec (), b"bar_1".to_vec ()).unwrap ();
expected_sequence += 1;
let ga = s.get_after (b"key_dir", None).unwrap ();
assert_eq! (ga.sequence, expected_sequence);
assert_eq! (ga.tuples, vec! [
(b"foo_1".to_vec (), b"bar_1".to_vec ()),
get_after_eq (&ga, &GetAfter {
sequence: expected_sequence,
tuples: vec! [
(b"foo_1".to_vec (), b"bar_1".to_vec ()),
s.set (b"key_dir", b"foo_2".to_vec (), b"bar_2".to_vec ()).unwrap ();
expected_sequence += 1;
let ga = s.get_after (b"key_dir", None).unwrap ();
get_after_eq (&ga, &GetAfter {
sequence: expected_sequence,
tuples: vec! [
(b"foo_1".to_vec (), b"bar_1".to_vec ()),
(b"foo_2".to_vec (), b"bar_2".to_vec ()),
s.set (b"key_dir", b"foo_1".to_vec (), b"bar_3".to_vec ()).unwrap ();
expected_sequence += 1;
let ga = s.get_after (b"key_dir", None).unwrap ();
get_after_eq (&ga, &GetAfter {
sequence: expected_sequence,
tuples: vec! [
(b"foo_1".to_vec (), b"bar_3".to_vec ()),
(b"foo_2".to_vec (), b"bar_2".to_vec ()),
let ga = s.get_after (b"key_dir", Some (2)).unwrap ();
get_after_eq (&ga, &GetAfter {
sequence: expected_sequence,
tuples: vec! [
(b"foo_1".to_vec (), b"bar_3".to_vec ()),
let ga = s.get_after (b"key_dir", Some (3)).unwrap ();
get_after_eq (&ga, &GetAfter {
sequence: expected_sequence,
tuples: vec! []

View File

@ -2,6 +2,12 @@
## Bottom Line Up Front
A memory-only key-value store for ptth_server. Other processes on the same
system can send data to ptth_server, and it will hold the data in memory
and expose it to ptth_relay.
## Motivation
This feature allows ptth_server to act as a
@ -31,7 +37,7 @@ The client app can send data with these semantics:
Logs and status values do not share namespaces or quotas. ptth_server MUST be
configured with quotas in order to enable this feature. There is no "unlimited"
quota option, but the quotas MAY be set to any 64-bit value.
quota option, but the quotas MAY be set to any usize value.
Keys, values, and log strings are binary-safe, but SHOULD be valid UTF-8.
If they are not valid UTF-8, they MAY not appear in client apps, or they may
@ -48,7 +54,7 @@ or at all. All data is subject to quotas based on time, byte count, key count,
etc. Logs WILL be evicted in oldest-first order. Status values
replace older values written to the same key. If a key directory reaches
its quota, old values MAY be evicted in any order, including oldest-first,
alphabetical, or random.
alphabetical, random, or all at once.
Quotas: ptth_relay is the source of truth for quotas. ptth_server SHOULD NOT
store the quotas on disk, it SHOULD request them from ptth_relay when connecting,
@ -62,6 +68,7 @@ The quotas are all maximums. The quotas (with recommended value in parens) are:
- Bytes for 1 key (1,024)
- Bytes for 1 value (1,024)
- Total payload (i.e. keys and values) bytes (262,144)
- Number of log events per key (2,048)
It's hard to set a perfect limit on RAM usage, so the quotas have many
different dimensions. There is some upper bound on RAM usage, but it will