🚧 wip: adding http service to ptth_kv

main
_ 2021-01-03 20:57:12 +00:00
parent eb927ef80d
commit 29a6ad20ee
4 changed files with 181 additions and 25 deletions

2
Cargo.lock generated
View File

@ -1660,7 +1660,9 @@ dependencies = [
name = "ptth_kv" name = "ptth_kv"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow",
"base64 0.13.0", "base64 0.13.0",
"hyper",
"thiserror", "thiserror",
"tokio", "tokio",
] ]

View File

@ -5,6 +5,8 @@ authors = ["Trish"]
edition = "2018" edition = "2018"
[dependencies] [dependencies]
anyhow = "1.0.34"
base64 = "0.13.0" base64 = "0.13.0"
hyper = "0.13.8"
thiserror = "1.0.22" thiserror = "1.0.22"
tokio = { version = "0.2.22", features = ["full"] } tokio = { version = "0.2.22", features = ["full"] }

View File

@ -4,16 +4,24 @@ use std::{
}, },
iter::FromIterator, iter::FromIterator,
sync::{ sync::{
MutexGuard, Arc,
PoisonError,
}, },
}; };
use hyper::{
Body,
Request,
Response,
StatusCode,
};
use tokio::{ use tokio::{
runtime::Runtime,
sync::Mutex, sync::Mutex,
}; };
pub struct HttpService {
store: Arc <Store>,
}
pub struct Store { pub struct Store {
status_dirs: HashMap <Vec <u8>, StatusKeyDirectory>, status_dirs: HashMap <Vec <u8>, StatusKeyDirectory>,
} }
@ -22,8 +30,6 @@ pub struct Store {
pub enum Error { pub enum Error {
#[error ("key too long")] #[error ("key too long")]
KeyTooLong, KeyTooLong,
#[error ("mutex poisoned")]
MutexPoisoned,
#[error ("no such key dir")] #[error ("no such key dir")]
NoSuchKeyDir, NoSuchKeyDir,
#[error ("value too long")] #[error ("value too long")]
@ -42,6 +48,50 @@ pub struct GetAfter {
pub sequence: u64, pub sequence: u64,
} }
impl HttpService {
pub fn new (s: Store) -> Self {
Self {
store: Arc::new (s),
}
}
pub fn inner (&self) -> &Store {
&*self.store
}
pub async fn serve (&self, port: u16) -> Result <(), hyper::Error> {
use std::net::SocketAddr;
use hyper::{
server::Server,
service::{
make_service_fn,
service_fn,
},
};
let make_svc = make_service_fn (|_conn| {
let state = self.store.clone ();
async {
Ok::<_, String> (service_fn (move |req| {
let state = state.clone ();
Self::handle_all (req, state)
}))
}
});
let addr = SocketAddr::from(([127, 0, 0, 1], port));
let server = Server::bind (&addr)
.serve (make_svc)
;
server.await
}
}
impl Store { impl Store {
pub fn new <I> (status_dirs: I) pub fn new <I> (status_dirs: I)
-> Self -> Self
@ -63,7 +113,7 @@ impl Store {
.collect () .collect ()
} }
pub async fn set (&self, name: &[u8], key: Vec <u8>, value: Vec <u8>) pub async fn set (&self, name: &[u8], key: &[u8], value: Vec <u8>)
-> Result <(), Error> -> Result <(), Error>
{ {
let dir = self.status_dirs.get (name) let dir = self.status_dirs.get (name)
@ -72,6 +122,15 @@ impl Store {
dir.set (key, value).await dir.set (key, value).await
} }
async fn set_multi (&self, name: &[u8], tuples: Vec <(&[u8], Vec <u8>)>)
-> Result <(), Error>
{
let dir = self.status_dirs.get (name)
.ok_or (Error::NoSuchKeyDir)?;
dir.set_multi (tuples).await
}
pub async fn get_after (&self, name: &[u8], thresh: Option <u64>) pub async fn get_after (&self, name: &[u8], thresh: Option <u64>)
-> Result <GetAfter, Error> -> Result <GetAfter, Error>
{ {
@ -84,11 +143,7 @@ impl Store {
// End of public interface // End of public interface
impl <T> From <PoisonError <MutexGuard <'_, T>>> for Error { const SET_BATCH_SIZE: usize = 32;
fn from (_: PoisonError <MutexGuard <'_, T>>) -> Self {
Self::MutexPoisoned
}
}
enum StoreCommand { enum StoreCommand {
SetStatus (SetStatusCommand), SetStatus (SetStatusCommand),
@ -119,6 +174,15 @@ struct SetStatusCommand {
value: Vec <u8>, value: Vec <u8>,
} }
impl HttpService {
async fn handle_all (req: Request <Body>, store: Arc <Store>)
-> Result <Response <Body>, anyhow::Error>
{
Ok (Response::builder ()
.body (Body::from ("hello\n"))?)
}
}
impl StatusKeyDirectory { impl StatusKeyDirectory {
fn new (quotas: StatusQuotas) -> Self { fn new (quotas: StatusQuotas) -> Self {
Self { Self {
@ -127,7 +191,7 @@ impl StatusKeyDirectory {
} }
} }
async fn set (&self, key: Vec <u8>, value: Vec <u8>) -> Result <(), Error> async fn set (&self, key: &[u8], value: Vec <u8>) -> Result <(), Error>
{ {
if key.len () > self.quotas.max_key_bytes { if key.len () > self.quotas.max_key_bytes {
return Err (Error::KeyTooLong); return Err (Error::KeyTooLong);
@ -144,6 +208,17 @@ impl StatusKeyDirectory {
Ok (()) Ok (())
} }
async fn set_multi (&self, tuples: Vec <(&[u8], Vec <u8>)>) -> Result <(), Error>
{
{
let mut guard = self.table.lock ().await;
for (key, value) in tuples {
guard.set (&self.quotas, key, value);
}
}
Ok (())
}
async fn get_after (&self, thresh: Option <u64>) -> Result <GetAfter, Error> { async fn get_after (&self, thresh: Option <u64>) -> Result <GetAfter, Error> {
let guard = self.table.lock ().await; let guard = self.table.lock ().await;
Ok (guard.get_after (thresh)) Ok (guard.get_after (thresh))
@ -157,7 +232,7 @@ impl StatusTable {
.sum () .sum ()
} }
fn set (&mut self, quotas: &StatusQuotas, key: Vec <u8>, value: Vec <u8>) { fn set (&mut self, quotas: &StatusQuotas, key: &[u8], value: Vec <u8>) {
self.sequence += 1; self.sequence += 1;
if self.map.len () > quotas.max_keys { if self.map.len () > quotas.max_keys {
@ -175,7 +250,13 @@ impl StatusTable {
sequence: self.sequence, sequence: self.sequence,
}; };
self.map.insert (key, value); // self.map.insert (key, value);
match self.map.get_mut (key) {
None => {
self.map.insert (key.to_vec (), value);
},
Some (v) => *v = value,
}
} }
fn get_after (&self, thresh: Option <u64>) -> GetAfter { fn get_after (&self, thresh: Option <u64>) -> GetAfter {
@ -204,12 +285,24 @@ impl StatusValue {
} }
} }
fn main () { #[tokio::main]
println! ("Hello, world!"); async fn main () -> Result <(), hyper::Error> {
let service = HttpService::new (Store::new (vec! [
(b"key_dir".to_vec (), StatusQuotas {
max_keys: 4,
max_key_bytes: 16,
max_value_bytes: 16,
max_payload_bytes: 128,
}),
].into_iter ()));
service.serve (4003).await
} }
#[cfg (test)] #[cfg (test)]
mod tests { mod tests {
use tokio::runtime::Runtime;
use super::*; use super::*;
fn get_after_eq (a: &GetAfter, b: &GetAfter) { fn get_after_eq (a: &GetAfter, b: &GetAfter) {
@ -244,15 +337,15 @@ mod tests {
]); ]);
assert_eq! ( assert_eq! (
s.set (b"key_dir", b"this key is too long and will cause an error".to_vec (), b"bar".to_vec ()).await, s.set (b"key_dir", b"this key is too long and will cause an error", b"bar".to_vec ()).await,
Err (Error::KeyTooLong) Err (Error::KeyTooLong)
); );
assert_eq! ( assert_eq! (
s.set (b"key_dir", b"foo".to_vec (), b"this value is too long and will cause an error".to_vec ()).await, s.set (b"key_dir", b"foo", b"this value is too long and will cause an error".to_vec ()).await,
Err (Error::ValueTooLong) Err (Error::ValueTooLong)
); );
assert_eq! ( assert_eq! (
s.set (b"invalid_key_dir", b"foo".to_vec (), b"bar".to_vec ()).await, s.set (b"invalid_key_dir", b"foo", b"bar".to_vec ()).await,
Err (Error::NoSuchKeyDir) Err (Error::NoSuchKeyDir)
); );
@ -260,7 +353,7 @@ mod tests {
assert_eq! (ga.sequence, expected_sequence); assert_eq! (ga.sequence, expected_sequence);
assert_eq! (ga.tuples, vec! []); assert_eq! (ga.tuples, vec! []);
s.set (b"key_dir", b"foo_1".to_vec (), b"bar_1".to_vec ()).await.unwrap (); s.set (b"key_dir", b"foo_1", b"bar_1".to_vec ()).await.unwrap ();
expected_sequence += 1; expected_sequence += 1;
let ga = s.get_after (b"key_dir", None).await.unwrap (); let ga = s.get_after (b"key_dir", None).await.unwrap ();
@ -276,7 +369,7 @@ mod tests {
] ]
}); });
s.set (b"key_dir", b"foo_2".to_vec (), b"bar_2".to_vec ()).await.unwrap (); s.set (b"key_dir", b"foo_2", b"bar_2".to_vec ()).await.unwrap ();
expected_sequence += 1; expected_sequence += 1;
let ga = s.get_after (b"key_dir", None).await.unwrap (); let ga = s.get_after (b"key_dir", None).await.unwrap ();
@ -288,7 +381,7 @@ mod tests {
] ]
}); });
s.set (b"key_dir", b"foo_1".to_vec (), b"bar_3".to_vec ()).await.unwrap (); s.set (b"key_dir", b"foo_1", b"bar_3".to_vec ()).await.unwrap ();
expected_sequence += 1; expected_sequence += 1;
let ga = s.get_after (b"key_dir", None).await.unwrap (); let ga = s.get_after (b"key_dir", None).await.unwrap ();
@ -334,14 +427,14 @@ mod tests {
let num_iters = 1_000_000; let num_iters = 1_000_000;
let key = b"foo".to_vec (); let key = b"foo";
let start_time = Instant::now (); let start_time = Instant::now ();
for i in 0..num_iters { for i in 0..num_iters {
let value = format! ("{}", i); let value = format! ("{}", i);
s.set (b"key_dir", key.clone (), value.into ()).await.unwrap (); s.set (b"key_dir", key, value.into ()).await.unwrap ();
} }
let end_time = Instant::now (); let end_time = Instant::now ();
@ -349,7 +442,52 @@ mod tests {
let avg_nanos = total_dur.as_nanos () / num_iters; let avg_nanos = total_dur.as_nanos () / num_iters;
assert! (avg_nanos < 300, dbg! (avg_nanos)); assert! (avg_nanos < 250, dbg! (avg_nanos));
});
}
#[test]
#[cfg (not (debug_assertions))]
fn perf_multi () {
use std::time::Instant;
let mut rt = Runtime::new ().unwrap ();
rt.block_on (async {
let s = Store::new (vec! [
(b"key_dir".to_vec (), StatusQuotas {
max_keys: 8,
max_key_bytes: 16,
max_value_bytes: 16,
max_payload_bytes: 128,
}),
].into_iter ());
let num_iters = 1_000_000;
let start_time = Instant::now ();
for i in 0..num_iters {
let value = Vec::<u8>::from (format! ("{}", i));
let tuples = vec! [
(&b"foo_0"[..], value.clone ()),
(b"foo_1", value.clone ()),
(b"foo_2", value.clone ()),
(b"foo_3", value.clone ()),
(b"foo_4", value.clone ()),
(b"foo_5", value.clone ()),
(b"foo_6", value.clone ()),
(b"foo_7", value.clone ()),
];
s.set_multi (b"key_dir", tuples).await.unwrap ();
}
let end_time = Instant::now ();
let total_dur = end_time - start_time;
let avg_nanos = total_dur.as_nanos () / (num_iters * 8);
assert! (avg_nanos < 150, dbg! (avg_nanos));
}); });
} }
} }

View File

@ -10,3 +10,17 @@ Sizes:
- ptth_server: 14 MB - ptth_server: 14 MB
- ptth_relay gzipped: 3 MB - ptth_relay gzipped: 3 MB
- ptth_server gzipped: 5 MB - ptth_server gzipped: 5 MB
Git commit ???
Sizes:
- ptth_kv: 5.5M
- ptth_relay: 11M
- ptth_server: 14M
Gzipped sizes:
- ptth_kv: 1.6 MB
- ptth_relay: 3.3 MB
- ptth_server: 4.9 MB