Merge remote-tracking branch 'github/main'

main
2021-01-17 20:52:50 -06:00
commit 9de30a0dca
6 changed files with 419 additions and 2 deletions

Cargo.lock (generated)

@@ -1612,7 +1612,7 @@ dependencies = [
 name = "ptth"
 version = "0.1.0"
 dependencies = [
- "base64 0.12.3",
+ "base64 0.13.0",
 "blake3",
 "chrono",
 "ptth_relay",
@@ -1656,6 +1656,14 @@ dependencies = [
 "uom",
 ]
 
+[[package]]
+name = "ptth_kv"
+version = "0.1.0"
+dependencies = [
+ "base64 0.13.0",
+ "thiserror",
+]
+
 [[package]]
 name = "ptth_relay"
 version = "0.1.0"

Cargo.toml

@@ -10,7 +10,7 @@ license = "AGPL-3.0"
 [dev-dependencies]
-base64 = "0.12.3"
+base64 = "0.13.0"
 blake3 = "0.3.7"
 chrono = {version = "0.4.19", features = ["serde"]}
 reqwest = { version = "0.10.8", features = ["stream"] }

crates/ptth_kv/Cargo.toml Normal file

@@ -0,0 +1,9 @@
[package]
name = "ptth_kv"
version = "0.1.0"
authors = ["Trish"]
edition = "2018"

[dependencies]
base64 = "0.13.0"
thiserror = "1.0.22"

crates/ptth_kv/src/main.rs Normal file

@@ -0,0 +1,311 @@
use std::{
	collections::{
		HashMap,
	},
	iter::FromIterator,
	sync::{
		Mutex,
		MutexGuard,
		PoisonError,
	},
};

pub struct Store {
	status_dirs: HashMap <Vec <u8>, StatusKeyDirectory>,
}

#[derive (thiserror::Error, Debug, PartialEq)]
pub enum Error {
	#[error ("key too long")]
	KeyTooLong,
	#[error ("mutex poisoned")]
	MutexPoisoned,
	#[error ("no such key dir")]
	NoSuchKeyDir,
	#[error ("value too long")]
	ValueTooLong,
}

pub struct StatusQuotas {
	pub max_keys: usize,
	pub max_key_bytes: usize,
	pub max_value_bytes: usize,
	pub max_payload_bytes: usize,
}

pub struct GetAfter {
	pub tuples: Vec <(Vec <u8>, Vec <u8>)>,
	pub sequence: u64,
}

impl Store {
	pub fn new <I> (status_dirs: I)
	-> Self
	where I: Iterator <Item = (Vec <u8>, StatusQuotas)>
	{
		let status_dirs = HashMap::from_iter (
			status_dirs
			.map (|(name, quotas)| (name, StatusKeyDirectory::new (quotas)))
		);
		
		Self {
			status_dirs,
		}
	}
	
	pub fn list_key_dirs (&self) -> Vec <Vec <u8>> {
		self.status_dirs.iter ()
		.map (|(k, _)| k.clone ())
		.collect ()
	}
	
	pub fn set (&self, name: &[u8], key: Vec <u8>, value: Vec <u8>)
	-> Result <(), Error>
	{
		let dir = self.status_dirs.get (name)
		.ok_or (Error::NoSuchKeyDir)?;
		
		dir.set (key, value)
	}
	
	pub fn get_after (&self, name: &[u8], thresh: Option <u64>)
	-> Result <GetAfter, Error>
	{
		let dir = self.status_dirs.get (name)
		.ok_or (Error::NoSuchKeyDir)?;
		
		dir.get_after (thresh)
	}
}

// End of public interface

impl <T> From <PoisonError <MutexGuard <'_, T>>> for Error {
	fn from (_: PoisonError <MutexGuard <'_, T>>) -> Self {
		Self::MutexPoisoned
	}
}
// Not used yet as of this commit
enum StoreCommand {
	SetStatus (SetStatusCommand),
	Multi (Vec <StoreCommand>),
}

struct StatusKeyDirectory {
	quotas: StatusQuotas,
	
	// TODO: Make this tokio::sync::Mutex.
	table: Mutex <StatusTable>,
}

#[derive (Default)]
struct StatusTable {
	map: HashMap <Vec <u8>, StatusValue>,
	// Monotonic write counter, also stamped on each value
	sequence: u64,
}

struct StatusValue {
	value: Vec <u8>,
	sequence: u64,
}

struct SetStatusCommand {
	key_dir: Vec <u8>,
	key: Vec <u8>,
	value: Vec <u8>,
}

impl StatusKeyDirectory {
	fn new (quotas: StatusQuotas) -> Self {
		Self {
			quotas,
			table: Mutex::new (Default::default ()),
		}
	}
	
	fn set (&self, key: Vec <u8>, value: Vec <u8>) -> Result <(), Error>
	{
		if key.len () > self.quotas.max_key_bytes {
			return Err (Error::KeyTooLong);
		}
		if value.len () > self.quotas.max_value_bytes {
			return Err (Error::ValueTooLong);
		}
		
		{
			let mut guard = self.table.lock ()?;
			guard.set (&self.quotas, key, value);
		}
		
		Ok (())
	}
	
	fn get_after (&self, thresh: Option <u64>) -> Result <GetAfter, Error> {
		let guard = self.table.lock ()?;
		Ok (guard.get_after (thresh))
	}
}
impl StatusTable {
	fn payload_bytes (&self) -> usize {
		self.map.iter ()
		.map (|(k, v)| k.len () + v.len ())
		.sum ()
	}
	
	fn set (&mut self, quotas: &StatusQuotas, key: Vec <u8>, value: Vec <u8>) {
		self.sequence += 1;
		
		// Quotas are enforced by clearing the whole table, which the
		// design doc allows ("all at once")
		if self.map.len () > quotas.max_keys {
			self.map.clear ();
		}
		
		let new_bytes = key.len () + value.len ();
		if self.payload_bytes () + new_bytes > quotas.max_payload_bytes {
			self.map.clear ();
		}
		
		let value = StatusValue {
			value,
			sequence: self.sequence,
		};
		self.map.insert (key, value);
	}
	
	fn get_after (&self, thresh: Option <u64>) -> GetAfter {
		let thresh = thresh.unwrap_or (0);
		
		// Only return tuples written strictly after the threshold
		let tuples = self.map.iter ()
		.filter_map (|(key, value)| {
			if value.sequence <= thresh {
				return None;
			}
			Some ((key.clone (), value.value.clone ()))
		})
		.collect ();
		
		GetAfter {
			tuples,
			sequence: self.sequence,
		}
	}
}

impl StatusValue {
	fn len (&self) -> usize {
		self.value.len ()
	}
}

fn main () {
	println! ("Hello, world!");
}
#[cfg (test)]
mod tests {
	use super::*;
	
	fn get_after_eq (a: &GetAfter, b: &GetAfter) {
		assert_eq! (a.sequence, b.sequence);
		
		// Compare as maps so tuple order doesn't matter
		let a = a.tuples.clone ();
		let b = b.tuples.clone ();
		
		let a = HashMap::<Vec <u8>, Vec <u8>>::from_iter (a.into_iter ());
		let b = HashMap::from_iter (b.into_iter ());
		
		assert_eq! (a, b);
	}
	
	#[test]
	fn store () {
		let s = Store::new (vec! [
			(b"key_dir".to_vec (), StatusQuotas {
				max_keys: 4,
				max_key_bytes: 16,
				max_value_bytes: 16,
				max_payload_bytes: 128,
			}),
		].into_iter ());
		
		let mut expected_sequence = 0;
		
		assert_eq! (s.list_key_dirs (), vec! [
			b"key_dir".to_vec (),
		]);
		
		assert_eq! (
			s.set (b"key_dir", b"this key is too long and will cause an error".to_vec (), b"bar".to_vec ()),
			Err (Error::KeyTooLong)
		);
		
		assert_eq! (
			s.set (b"key_dir", b"foo".to_vec (), b"this value is too long and will cause an error".to_vec ()),
			Err (Error::ValueTooLong)
		);
		
		assert_eq! (
			s.set (b"invalid_key_dir", b"foo".to_vec (), b"bar".to_vec ()),
			Err (Error::NoSuchKeyDir)
		);
		
		let ga = s.get_after (b"key_dir", None).unwrap ();
		assert_eq! (ga.sequence, expected_sequence);
		assert_eq! (ga.tuples, vec! []);
		
		s.set (b"key_dir", b"foo_1".to_vec (), b"bar_1".to_vec ()).unwrap ();
		expected_sequence += 1;
		
		let ga = s.get_after (b"key_dir", None).unwrap ();
		assert_eq! (ga.sequence, expected_sequence);
		assert_eq! (ga.tuples, vec! [
			(b"foo_1".to_vec (), b"bar_1".to_vec ()),
		]);
		
		get_after_eq (&ga, &GetAfter {
			sequence: expected_sequence,
			tuples: vec! [
				(b"foo_1".to_vec (), b"bar_1".to_vec ()),
			]
		});
		
		s.set (b"key_dir", b"foo_2".to_vec (), b"bar_2".to_vec ()).unwrap ();
		expected_sequence += 1;
		
		let ga = s.get_after (b"key_dir", None).unwrap ();
		get_after_eq (&ga, &GetAfter {
			sequence: expected_sequence,
			tuples: vec! [
				(b"foo_1".to_vec (), b"bar_1".to_vec ()),
				(b"foo_2".to_vec (), b"bar_2".to_vec ()),
			]
		});
		
		s.set (b"key_dir", b"foo_1".to_vec (), b"bar_3".to_vec ()).unwrap ();
		expected_sequence += 1;
		
		let ga = s.get_after (b"key_dir", None).unwrap ();
		get_after_eq (&ga, &GetAfter {
			sequence: expected_sequence,
			tuples: vec! [
				(b"foo_1".to_vec (), b"bar_3".to_vec ()),
				(b"foo_2".to_vec (), b"bar_2".to_vec ()),
			]
		});
		
		// Sequence numbers act as a cursor for incremental polling
		let ga = s.get_after (b"key_dir", Some (2)).unwrap ();
		get_after_eq (&ga, &GetAfter {
			sequence: expected_sequence,
			tuples: vec! [
				(b"foo_1".to_vec (), b"bar_3".to_vec ()),
			]
		});
		
		let ga = s.get_after (b"key_dir", Some (3)).unwrap ();
		get_after_eq (&ga, &GetAfter {
			sequence: expected_sequence,
			tuples: vec! []
		});
	}
}

issues/2021-01Jan/status-DMX6CO4G.md Normal file

@@ -0,0 +1,87 @@
# Fire-and-forget logs / status key-values

(DMX6CO4G)

## Bottom Line Up Front

A memory-only key-value store for ptth_server. Other processes on the same
system can send data to ptth_server, and it will hold the data in memory
and expose it to ptth_relay.

## Motivation

This feature allows ptth_server to act as a
[sidecar](https://docs.microsoft.com/en-us/azure/architecture/patterns/sidecar)
process to apps and scripts written in C, C++, Bash, or other languages
where networking is difficult.

ptth_server will hold volatile data such as high-detail logs or status data
that is not worth saving to files. This data can then be retrieved by human
users or scrapers through ptth_relay.

This feature is similar to Redis, in some ways.
## Interface with client apps

The lifecycle is this:

- A client app connects to ptth_server via TCP
- The client app sends data to ptth_server
- The client app MAY react to messages from ptth_server
- The TCP connection is closed by ptth_server OR the client app

The client app can send data with these semantics:

- "Status" i.e. "In this key directory, in this key, store this value"
- "Log" i.e. "In this log directory, in this key, store this log string"
Logs and status values do not share namespaces or quotas. ptth_server MUST be
configured with quotas in order to enable this feature. There is no "unlimited"
quota option, but the quotas MAY be set to any usize value.

Keys, values, and log strings are binary-safe, but SHOULD be valid UTF-8.
If they are not valid UTF-8, they MAY not appear in client apps, or they MAY
appear as errors. ptth_server MUST behave correctly if it receives non-UTF-8
data.

The data should be considered volatile. ptth_server MUST NOT save the data
to disk. The client app SHOULD NOT buffer data internally if ptth_server is
refusing connections or unavailable. Client apps will see the data as
write-only: they cannot retrieve data from ptth_server.
ptth_server does not guarantee that data will be stored for any length of time,
or at all. All data is subject to quotas based on time, byte count, key count,
etc. Logs WILL be evicted in oldest-first order. Status values
replace older values written to the same key. If a key directory reaches
its quota, old values MAY be evicted in any order, including oldest-first,
alphabetical, random, or all at once.

Quotas: ptth_relay is the source of truth for quotas. ptth_server SHOULD NOT
store the quotas on disk; it SHOULD request them from ptth_relay when connecting,
keep them in memory, and periodically refresh them. All quotas are set
per-key-directory. ptth_relay is the source of truth for the list of key
directories.
The quotas are all maximums. The quotas (with recommended values in parens) are:

- Keys in the directory (1,024)
- Bytes for 1 key (1,024)
- Bytes for 1 value (1,024)
- Total payload (i.e. keys and values) bytes (262,144)
- Number of log events per key (2,048)

It's hard to set a perfect limit on RAM usage, so the quotas have many
different dimensions. There is some upper bound on RAM usage, but it will be
somewhere above the number of bytes needed to store keys and values alone.
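
For illustration, the recommended values map onto the `StatusQuotas` struct
from `crates/ptth_kv` like this. Note the struct has no field for the per-key
log-event quota yet, since this commit only implements status key-values:

```rust
// Recommended per-key-directory quotas from the list above
const RECOMMENDED_QUOTAS: StatusQuotas = StatusQuotas {
	max_keys: 1024,
	max_key_bytes: 1024,
	max_value_bytes: 1024,
	max_payload_bytes: 262_144,
};
```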
ptth_server MAY send these messages to a client app:

- "Go" - i.e. "Everything is good on my end. If you send data I will try to
store it."
- "Stop" - i.e. "Something is wrong. If you send data I will ignore you."

Unlike HTTP, there is no synchronization or dependency between the two
halves of the TCP connection: ptth_server can send a message at any time,
and the client app can send data at any time. "Go" does not mean that the
server will store the next data sent from the client, because a "Stop" message
could be in-flight.
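
A hypothetical sketch of these messages as a Rust type. This commit does not
implement the wire protocol, so the name `ServerMessage` and any encoding are
illustrative only:

```rust
// Hypothetical server-to-client messages. The client treats these as
// advisory hints, not acknowledgements: a Stop may already be in-flight
// when the client sends its next write.
enum ServerMessage {
	// "Everything is good on my end. If you send data I will try to store it."
	Go,
	// "Something is wrong. If you send data I will ignore you."
	Stop,
}
```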

@@ -1,8 +1,10 @@
 Interesting issues will get a unique ID with
 `dd if=/dev/urandom bs=5 count=1 | base32`
+- [DMX6CO4G](issues/2021-01Jan/status-DMX6CO4G.md) fire-and-forget logs / key-value status data
 - ptth_tail
 - Scraper `rsync -ru` example
+- 'fax' like system similar to station307
 - (WIP) Dark mode?
 - [K5NPHQHP](issues/2020-12Dec/metrics-K5NPHQHP.md) API for metrics + instance data + recent logs on ptth_server
 - API for remote mtime