2021-01-03 18:09:00 +00:00
|
|
|
use std::{
|
|
|
|
collections::{
|
|
|
|
HashMap,
|
|
|
|
},
|
|
|
|
iter::FromIterator,
|
|
|
|
sync::{
|
|
|
|
MutexGuard,
|
|
|
|
PoisonError,
|
|
|
|
},
|
|
|
|
};
|
|
|
|
|
2021-01-03 20:05:05 +00:00
|
|
|
use tokio::{
|
|
|
|
runtime::Runtime,
|
|
|
|
sync::Mutex,
|
|
|
|
};
|
|
|
|
|
2021-01-03 18:09:00 +00:00
|
|
|
pub struct Store {
|
|
|
|
status_dirs: HashMap <Vec <u8>, StatusKeyDirectory>,
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive (thiserror::Error, Debug, PartialEq)]
|
|
|
|
pub enum Error {
|
|
|
|
#[error ("key too long")]
|
|
|
|
KeyTooLong,
|
|
|
|
#[error ("mutex poisoned")]
|
|
|
|
MutexPoisoned,
|
|
|
|
#[error ("no such key dir")]
|
|
|
|
NoSuchKeyDir,
|
|
|
|
#[error ("value too long")]
|
|
|
|
ValueTooLong,
|
|
|
|
}
|
|
|
|
|
|
|
|
pub struct StatusQuotas {
|
|
|
|
pub max_keys: usize,
|
|
|
|
pub max_key_bytes: usize,
|
|
|
|
pub max_value_bytes: usize,
|
|
|
|
pub max_payload_bytes: usize,
|
|
|
|
}
|
|
|
|
|
|
|
|
pub struct GetAfter {
|
|
|
|
pub tuples: Vec <(Vec <u8>, Vec <u8>)>,
|
|
|
|
pub sequence: u64,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Store {
|
|
|
|
pub fn new <I> (status_dirs: I)
|
|
|
|
-> Self
|
|
|
|
where I: Iterator <Item = (Vec <u8>, StatusQuotas)>
|
|
|
|
{
|
|
|
|
let status_dirs = HashMap::from_iter (
|
|
|
|
status_dirs
|
|
|
|
.map (|(name, quotas)| (name, StatusKeyDirectory::new (quotas)))
|
|
|
|
);
|
|
|
|
|
|
|
|
Self {
|
|
|
|
status_dirs,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn list_key_dirs (&self) -> Vec <Vec <u8>> {
|
|
|
|
self.status_dirs.iter ()
|
|
|
|
.map (|(k, _)| k.clone ())
|
|
|
|
.collect ()
|
|
|
|
}
|
|
|
|
|
2021-01-03 20:05:05 +00:00
|
|
|
pub async fn set (&self, name: &[u8], key: Vec <u8>, value: Vec <u8>)
|
2021-01-03 18:09:00 +00:00
|
|
|
-> Result <(), Error>
|
|
|
|
{
|
|
|
|
let dir = self.status_dirs.get (name)
|
|
|
|
.ok_or (Error::NoSuchKeyDir)?;
|
|
|
|
|
2021-01-03 20:05:05 +00:00
|
|
|
dir.set (key, value).await
|
2021-01-03 18:09:00 +00:00
|
|
|
}
|
|
|
|
|
2021-01-03 20:05:05 +00:00
|
|
|
pub async fn get_after (&self, name: &[u8], thresh: Option <u64>)
|
2021-01-03 18:09:00 +00:00
|
|
|
-> Result <GetAfter, Error>
|
|
|
|
{
|
|
|
|
let dir = self.status_dirs.get (name)
|
|
|
|
.ok_or (Error::NoSuchKeyDir)?;
|
|
|
|
|
2021-01-03 20:05:05 +00:00
|
|
|
dir.get_after (thresh).await
|
2021-01-03 18:09:00 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// End of public interface
|
|
|
|
|
|
|
|
impl <T> From <PoisonError <MutexGuard <'_, T>>> for Error {
|
|
|
|
fn from (_: PoisonError <MutexGuard <'_, T>>) -> Self {
|
|
|
|
Self::MutexPoisoned
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
enum StoreCommand {
|
|
|
|
SetStatus (SetStatusCommand),
|
|
|
|
Multi (Vec <StoreCommand>),
|
|
|
|
}
|
|
|
|
|
|
|
|
struct StatusKeyDirectory {
|
|
|
|
quotas: StatusQuotas,
|
|
|
|
|
|
|
|
// TODO: Make this tokio::sync::Mutex.
|
|
|
|
table: Mutex <StatusTable>,
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive (Default)]
|
|
|
|
struct StatusTable {
|
|
|
|
map: HashMap <Vec <u8>, StatusValue>,
|
|
|
|
sequence: u64,
|
|
|
|
}
|
|
|
|
|
|
|
|
struct StatusValue {
|
|
|
|
value: Vec <u8>,
|
|
|
|
sequence: u64,
|
|
|
|
}
|
|
|
|
|
|
|
|
struct SetStatusCommand {
|
|
|
|
key_dir: Vec <u8>,
|
|
|
|
key: Vec <u8>,
|
|
|
|
value: Vec <u8>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl StatusKeyDirectory {
|
|
|
|
fn new (quotas: StatusQuotas) -> Self {
|
|
|
|
Self {
|
|
|
|
quotas,
|
|
|
|
table: Mutex::new (Default::default ()),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-01-03 20:05:05 +00:00
|
|
|
async fn set (&self, key: Vec <u8>, value: Vec <u8>) -> Result <(), Error>
|
2021-01-03 18:09:00 +00:00
|
|
|
{
|
|
|
|
if key.len () > self.quotas.max_key_bytes {
|
|
|
|
return Err (Error::KeyTooLong);
|
|
|
|
}
|
|
|
|
|
|
|
|
if value.len () > self.quotas.max_value_bytes {
|
|
|
|
return Err (Error::ValueTooLong);
|
|
|
|
}
|
|
|
|
|
|
|
|
{
|
2021-01-03 20:05:05 +00:00
|
|
|
let mut guard = self.table.lock ().await;
|
2021-01-03 18:09:00 +00:00
|
|
|
guard.set (&self.quotas, key, value);
|
|
|
|
}
|
|
|
|
Ok (())
|
|
|
|
}
|
|
|
|
|
2021-01-03 20:05:05 +00:00
|
|
|
async fn get_after (&self, thresh: Option <u64>) -> Result <GetAfter, Error> {
|
|
|
|
let guard = self.table.lock ().await;
|
2021-01-03 18:09:00 +00:00
|
|
|
Ok (guard.get_after (thresh))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl StatusTable {
|
|
|
|
fn payload_bytes (&self) -> usize {
|
|
|
|
self.map.iter ()
|
|
|
|
.map (|(k, v)| k.len () + v.len ())
|
|
|
|
.sum ()
|
|
|
|
}
|
|
|
|
|
|
|
|
fn set (&mut self, quotas: &StatusQuotas, key: Vec <u8>, value: Vec <u8>) {
|
|
|
|
self.sequence += 1;
|
|
|
|
|
|
|
|
if self.map.len () > quotas.max_keys {
|
|
|
|
self.map.clear ();
|
|
|
|
}
|
|
|
|
|
|
|
|
let new_bytes = key.len () + value.len ();
|
|
|
|
|
|
|
|
if self.payload_bytes () + new_bytes > quotas.max_payload_bytes {
|
|
|
|
self.map.clear ();
|
|
|
|
}
|
|
|
|
|
|
|
|
let value = StatusValue {
|
|
|
|
value,
|
|
|
|
sequence: self.sequence,
|
|
|
|
};
|
|
|
|
|
|
|
|
self.map.insert (key, value);
|
|
|
|
}
|
|
|
|
|
|
|
|
fn get_after (&self, thresh: Option <u64>) -> GetAfter {
|
|
|
|
let thresh = thresh.unwrap_or (0);
|
|
|
|
|
|
|
|
let tuples = self.map.iter ()
|
|
|
|
.filter_map (|(key, value)| {
|
|
|
|
if value.sequence <= thresh {
|
|
|
|
return None;
|
|
|
|
}
|
|
|
|
|
|
|
|
Some ((key.clone (), value.value.clone ()))
|
|
|
|
})
|
|
|
|
.collect ();
|
|
|
|
|
|
|
|
GetAfter {
|
|
|
|
tuples,
|
|
|
|
sequence: self.sequence,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl StatusValue {
|
|
|
|
fn len (&self) -> usize {
|
|
|
|
self.value.len ()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn main () {
|
|
|
|
println! ("Hello, world!");
|
|
|
|
}
|
|
|
|
|
|
|
|
#[cfg (test)]
|
|
|
|
mod tests {
|
|
|
|
use super::*;
|
|
|
|
|
|
|
|
fn get_after_eq (a: &GetAfter, b: &GetAfter) {
|
|
|
|
assert_eq! (a.sequence, b.sequence);
|
|
|
|
|
|
|
|
let a = a.tuples.clone ();
|
|
|
|
let b = b.tuples.clone ();
|
|
|
|
|
|
|
|
let a = HashMap::<Vec <u8>, Vec <u8>>::from_iter (a.into_iter ());
|
|
|
|
let b = HashMap::from_iter (b.into_iter ());
|
|
|
|
|
|
|
|
assert_eq! (a, b);
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn store () {
|
2021-01-03 20:05:05 +00:00
|
|
|
let mut rt = Runtime::new ().unwrap ();
|
|
|
|
rt.block_on (async {
|
2021-01-03 18:09:00 +00:00
|
|
|
let s = Store::new (vec! [
|
|
|
|
(b"key_dir".to_vec (), StatusQuotas {
|
|
|
|
max_keys: 4,
|
|
|
|
max_key_bytes: 16,
|
|
|
|
max_value_bytes: 16,
|
|
|
|
max_payload_bytes: 128,
|
|
|
|
}),
|
|
|
|
].into_iter ());
|
|
|
|
|
|
|
|
let mut expected_sequence = 0;
|
|
|
|
|
|
|
|
assert_eq! (s.list_key_dirs (), vec! [
|
|
|
|
b"key_dir".to_vec (),
|
|
|
|
]);
|
|
|
|
|
|
|
|
assert_eq! (
|
2021-01-03 20:05:05 +00:00
|
|
|
s.set (b"key_dir", b"this key is too long and will cause an error".to_vec (), b"bar".to_vec ()).await,
|
2021-01-03 18:09:00 +00:00
|
|
|
Err (Error::KeyTooLong)
|
|
|
|
);
|
|
|
|
assert_eq! (
|
2021-01-03 20:05:05 +00:00
|
|
|
s.set (b"key_dir", b"foo".to_vec (), b"this value is too long and will cause an error".to_vec ()).await,
|
2021-01-03 18:09:00 +00:00
|
|
|
Err (Error::ValueTooLong)
|
|
|
|
);
|
|
|
|
assert_eq! (
|
2021-01-03 20:05:05 +00:00
|
|
|
s.set (b"invalid_key_dir", b"foo".to_vec (), b"bar".to_vec ()).await,
|
2021-01-03 18:09:00 +00:00
|
|
|
Err (Error::NoSuchKeyDir)
|
|
|
|
);
|
|
|
|
|
2021-01-03 20:05:05 +00:00
|
|
|
let ga = s.get_after (b"key_dir", None).await.unwrap ();
|
2021-01-03 18:09:00 +00:00
|
|
|
assert_eq! (ga.sequence, expected_sequence);
|
|
|
|
assert_eq! (ga.tuples, vec! []);
|
|
|
|
|
2021-01-03 20:05:05 +00:00
|
|
|
s.set (b"key_dir", b"foo_1".to_vec (), b"bar_1".to_vec ()).await.unwrap ();
|
2021-01-03 18:09:00 +00:00
|
|
|
expected_sequence += 1;
|
2021-01-03 20:05:05 +00:00
|
|
|
let ga = s.get_after (b"key_dir", None).await.unwrap ();
|
2021-01-03 18:09:00 +00:00
|
|
|
|
|
|
|
assert_eq! (ga.sequence, expected_sequence);
|
|
|
|
assert_eq! (ga.tuples, vec! [
|
|
|
|
(b"foo_1".to_vec (), b"bar_1".to_vec ()),
|
|
|
|
]);
|
|
|
|
|
|
|
|
get_after_eq (&ga, &GetAfter {
|
|
|
|
sequence: expected_sequence,
|
|
|
|
tuples: vec! [
|
|
|
|
(b"foo_1".to_vec (), b"bar_1".to_vec ()),
|
|
|
|
]
|
|
|
|
});
|
|
|
|
|
2021-01-03 20:05:05 +00:00
|
|
|
s.set (b"key_dir", b"foo_2".to_vec (), b"bar_2".to_vec ()).await.unwrap ();
|
2021-01-03 18:09:00 +00:00
|
|
|
expected_sequence += 1;
|
2021-01-03 20:05:05 +00:00
|
|
|
let ga = s.get_after (b"key_dir", None).await.unwrap ();
|
2021-01-03 18:09:00 +00:00
|
|
|
|
|
|
|
get_after_eq (&ga, &GetAfter {
|
|
|
|
sequence: expected_sequence,
|
|
|
|
tuples: vec! [
|
|
|
|
(b"foo_1".to_vec (), b"bar_1".to_vec ()),
|
|
|
|
(b"foo_2".to_vec (), b"bar_2".to_vec ()),
|
|
|
|
]
|
|
|
|
});
|
|
|
|
|
2021-01-03 20:05:05 +00:00
|
|
|
s.set (b"key_dir", b"foo_1".to_vec (), b"bar_3".to_vec ()).await.unwrap ();
|
2021-01-03 18:09:00 +00:00
|
|
|
expected_sequence += 1;
|
2021-01-03 20:05:05 +00:00
|
|
|
let ga = s.get_after (b"key_dir", None).await.unwrap ();
|
2021-01-03 18:09:00 +00:00
|
|
|
|
|
|
|
get_after_eq (&ga, &GetAfter {
|
|
|
|
sequence: expected_sequence,
|
|
|
|
tuples: vec! [
|
|
|
|
(b"foo_1".to_vec (), b"bar_3".to_vec ()),
|
|
|
|
(b"foo_2".to_vec (), b"bar_2".to_vec ()),
|
|
|
|
]
|
|
|
|
});
|
|
|
|
|
2021-01-03 20:05:05 +00:00
|
|
|
let ga = s.get_after (b"key_dir", Some (2)).await.unwrap ();
|
2021-01-03 18:09:00 +00:00
|
|
|
get_after_eq (&ga, &GetAfter {
|
|
|
|
sequence: expected_sequence,
|
|
|
|
tuples: vec! [
|
|
|
|
(b"foo_1".to_vec (), b"bar_3".to_vec ()),
|
|
|
|
]
|
|
|
|
});
|
|
|
|
|
2021-01-03 20:05:05 +00:00
|
|
|
let ga = s.get_after (b"key_dir", Some (3)).await.unwrap ();
|
2021-01-03 18:09:00 +00:00
|
|
|
get_after_eq (&ga, &GetAfter {
|
|
|
|
sequence: expected_sequence,
|
|
|
|
tuples: vec! []
|
|
|
|
});
|
2021-01-03 20:05:05 +00:00
|
|
|
});
|
2021-01-03 18:09:00 +00:00
|
|
|
}
|
2021-01-03 19:55:45 +00:00
|
|
|
|
|
|
|
#[test]
|
|
|
|
#[cfg (not (debug_assertions))]
|
|
|
|
fn perf () {
|
|
|
|
use std::time::Instant;
|
|
|
|
|
2021-01-03 20:05:05 +00:00
|
|
|
let mut rt = Runtime::new ().unwrap ();
|
|
|
|
rt.block_on (async {
|
2021-01-03 19:55:45 +00:00
|
|
|
let s = Store::new (vec! [
|
|
|
|
(b"key_dir".to_vec (), StatusQuotas {
|
|
|
|
max_keys: 4,
|
|
|
|
max_key_bytes: 16,
|
|
|
|
max_value_bytes: 16,
|
|
|
|
max_payload_bytes: 128,
|
|
|
|
}),
|
|
|
|
].into_iter ());
|
|
|
|
|
|
|
|
let num_iters = 1_000_000;
|
|
|
|
|
|
|
|
let key = b"foo".to_vec ();
|
|
|
|
|
|
|
|
let start_time = Instant::now ();
|
|
|
|
|
|
|
|
for i in 0..num_iters {
|
|
|
|
let value = format! ("{}", i);
|
|
|
|
|
2021-01-03 20:05:05 +00:00
|
|
|
s.set (b"key_dir", key.clone (), value.into ()).await.unwrap ();
|
2021-01-03 19:55:45 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
let end_time = Instant::now ();
|
|
|
|
let total_dur = end_time - start_time;
|
|
|
|
|
|
|
|
let avg_nanos = total_dur.as_nanos () / num_iters;
|
|
|
|
|
2021-01-03 20:05:05 +00:00
|
|
|
assert! (avg_nanos < 300, dbg! (avg_nanos));
|
|
|
|
});
|
2021-01-03 19:55:45 +00:00
|
|
|
}
|
2021-01-03 18:09:00 +00:00
|
|
|
}
|