💥 remove un-finished stuff and clear up `cargo check` for the PTTH_QUIC GUI

main
_ 2023-08-31 19:03:52 -05:00
parent dfc6885b8c
commit 25a8a035b9
7 changed files with 4 additions and 647 deletions

11
Cargo.lock generated
View File

@ -1317,17 +1317,6 @@ dependencies = [
"uom",
]
[[package]]
name = "ptth_kv"
version = "0.1.0"
dependencies = [
"anyhow",
"base64 0.13.1",
"hyper",
"thiserror",
"tokio",
]
[[package]]
name = "ptth_multi_call_server"
version = "1.1.1"

View File

@ -1,12 +0,0 @@
[package]
name = "ptth_kv"
version = "0.1.0"
authors = ["Trish"]
edition = "2018"
[dependencies]
anyhow = "1.0.38"
base64 = "0.13.0"
hyper = { version = "0.14.4", features = ["full"] }
thiserror = "1.0.22"
tokio = { version = "1.8.1", features = ["full"] }

View File

@ -1,499 +0,0 @@
use std::{
collections::{
HashMap,
},
iter::FromIterator,
sync::{
Arc,
},
};
use hyper::{
Body,
Request,
Response,
StatusCode,
};
use tokio::{
sync::Mutex,
};
pub struct HttpService {
store: Arc <Store>,
}
pub struct Store {
status_dirs: HashMap <Vec <u8>, StatusKeyDirectory>,
}
#[derive (thiserror::Error, Debug, PartialEq)]
pub enum Error {
#[error ("key too long")]
KeyTooLong,
#[error ("no such key dir")]
NoSuchKeyDir,
#[error ("value too long")]
ValueTooLong,
}
pub struct StatusQuotas {
pub max_keys: usize,
pub max_key_bytes: usize,
pub max_value_bytes: usize,
pub max_payload_bytes: usize,
}
pub struct GetAfter {
pub tuples: Vec <(Vec <u8>, Vec <u8>)>,
pub sequence: u64,
}
impl HttpService {
pub fn new (s: Store) -> Self {
Self {
store: Arc::new (s),
}
}
pub fn inner (&self) -> &Store {
&*self.store
}
pub async fn serve (&self, port: u16) -> Result <(), hyper::Error> {
use std::net::SocketAddr;
use hyper::{
Server,
service::{
make_service_fn,
service_fn,
},
};
let make_svc = make_service_fn (|_conn| {
let state = self.store.clone ();
async {
Ok::<_, String> (service_fn (move |req| {
let state = state.clone ();
Self::handle_all (req, state)
}))
}
});
let addr = SocketAddr::from(([127, 0, 0, 1], port));
let server = Server::bind (&addr)
.serve (make_svc)
;
server.await
}
}
impl Store {
pub fn new <I> (status_dirs: I)
-> Self
where I: Iterator <Item = (Vec <u8>, StatusQuotas)>
{
let status_dirs = status_dirs
.map (|(name, quotas)| (name, StatusKeyDirectory::new (quotas)))
.collect ();
Self {
status_dirs,
}
}
pub fn list_key_dirs (&self) -> Vec <Vec <u8>> {
self.status_dirs.iter ()
.map (|(k, _)| k.clone ())
.collect ()
}
pub async fn set (&self, name: &[u8], key: &[u8], value: Vec <u8>)
-> Result <(), Error>
{
let dir = self.status_dirs.get (name)
.ok_or (Error::NoSuchKeyDir)?;
dir.set (key, value).await
}
async fn set_multi (&self, name: &[u8], tuples: Vec <(&[u8], Vec <u8>)>)
-> Result <(), Error>
{
let dir = self.status_dirs.get (name)
.ok_or (Error::NoSuchKeyDir)?;
dir.set_multi (tuples).await
}
pub async fn get_after (&self, name: &[u8], thresh: Option <u64>)
-> Result <GetAfter, Error>
{
let dir = self.status_dirs.get (name)
.ok_or (Error::NoSuchKeyDir)?;
dir.get_after (thresh).await
}
}
// End of public interface
const SET_BATCH_SIZE: usize = 32;
enum StoreCommand {
SetStatus (SetStatusCommand),
Multi (Vec <StoreCommand>),
}
struct StatusKeyDirectory {
quotas: StatusQuotas,
// TODO: Make this tokio::sync::Mutex.
table: Mutex <StatusTable>,
}
#[derive (Default)]
struct StatusTable {
map: HashMap <Vec <u8>, StatusValue>,
sequence: u64,
}
struct StatusValue {
value: Vec <u8>,
sequence: u64,
}
struct SetStatusCommand {
key_dir: Vec <u8>,
key: Vec <u8>,
value: Vec <u8>,
}
impl HttpService {
async fn handle_all (req: Request <Body>, store: Arc <Store>)
-> Result <Response <Body>, anyhow::Error>
{
Ok (Response::builder ()
.body (Body::from ("hello\n"))?)
}
}
impl StatusKeyDirectory {
fn new (quotas: StatusQuotas) -> Self {
Self {
quotas,
table: Mutex::new (Default::default ()),
}
}
async fn set (&self, key: &[u8], value: Vec <u8>) -> Result <(), Error>
{
if key.len () > self.quotas.max_key_bytes {
return Err (Error::KeyTooLong);
}
if value.len () > self.quotas.max_value_bytes {
return Err (Error::ValueTooLong);
}
{
let mut guard = self.table.lock ().await;
guard.set (&self.quotas, key, value);
}
Ok (())
}
async fn set_multi (&self, tuples: Vec <(&[u8], Vec <u8>)>) -> Result <(), Error>
{
{
let mut guard = self.table.lock ().await;
for (key, value) in tuples {
guard.set (&self.quotas, key, value);
}
}
Ok (())
}
async fn get_after (&self, thresh: Option <u64>) -> Result <GetAfter, Error> {
let guard = self.table.lock ().await;
Ok (guard.get_after (thresh))
}
}
impl StatusTable {
fn payload_bytes (&self) -> usize {
self.map.iter ()
.map (|(k, v)| k.len () + v.len ())
.sum ()
}
fn set (&mut self, quotas: &StatusQuotas, key: &[u8], value: Vec <u8>) {
self.sequence += 1;
if self.map.len () > quotas.max_keys {
self.map.clear ();
}
let new_bytes = key.len () + value.len ();
if self.payload_bytes () + new_bytes > quotas.max_payload_bytes {
self.map.clear ();
}
let value = StatusValue {
value,
sequence: self.sequence,
};
// self.map.insert (key, value);
match self.map.get_mut (key) {
None => {
self.map.insert (key.to_vec (), value);
},
Some (v) => *v = value,
}
}
fn get_after (&self, thresh: Option <u64>) -> GetAfter {
let thresh = thresh.unwrap_or (0);
let tuples = self.map.iter ()
.filter_map (|(key, value)| {
if value.sequence <= thresh {
return None;
}
Some ((key.clone (), value.value.clone ()))
})
.collect ();
GetAfter {
tuples,
sequence: self.sequence,
}
}
}
impl StatusValue {
fn len (&self) -> usize {
self.value.len ()
}
}
#[tokio::main]
async fn main () -> Result <(), hyper::Error> {
use std::time::Duration;
use tokio::{
spawn,
time::interval,
};
let service = HttpService::new (Store::new (vec! [
(b"key_dir".to_vec (), StatusQuotas {
max_keys: 4,
max_key_bytes: 16,
max_value_bytes: 16,
max_payload_bytes: 128,
}),
].into_iter ()));
service.serve (4003).await
}
#[cfg (test)]
mod tests {
use tokio::runtime::Runtime;
use super::*;
fn get_after_eq (a: &GetAfter, b: &GetAfter) {
assert_eq! (a.sequence, b.sequence);
let a = a.tuples.clone ();
let b = b.tuples.clone ();
let a = HashMap::<Vec <u8>, Vec <u8>>::from_iter (a.into_iter ());
let b = HashMap::from_iter (b.into_iter ());
assert_eq! (a, b);
}
#[test]
fn store () {
let rt = Runtime::new ().unwrap ();
rt.block_on (async {
let s = Store::new (vec! [
(b"key_dir".to_vec (), StatusQuotas {
max_keys: 4,
max_key_bytes: 16,
max_value_bytes: 16,
max_payload_bytes: 128,
}),
].into_iter ());
let mut expected_sequence = 0;
assert_eq! (s.list_key_dirs (), vec! [
b"key_dir".to_vec (),
]);
assert_eq! (
s.set (b"key_dir", b"this key is too long and will cause an error", b"bar".to_vec ()).await,
Err (Error::KeyTooLong)
);
assert_eq! (
s.set (b"key_dir", b"foo", b"this value is too long and will cause an error".to_vec ()).await,
Err (Error::ValueTooLong)
);
assert_eq! (
s.set (b"invalid_key_dir", b"foo", b"bar".to_vec ()).await,
Err (Error::NoSuchKeyDir)
);
let ga = s.get_after (b"key_dir", None).await.unwrap ();
assert_eq! (ga.sequence, expected_sequence);
assert_eq! (ga.tuples, vec! []);
s.set (b"key_dir", b"foo_1", b"bar_1".to_vec ()).await.unwrap ();
expected_sequence += 1;
let ga = s.get_after (b"key_dir", None).await.unwrap ();
assert_eq! (ga.sequence, expected_sequence);
assert_eq! (ga.tuples, vec! [
(b"foo_1".to_vec (), b"bar_1".to_vec ()),
]);
get_after_eq (&ga, &GetAfter {
sequence: expected_sequence,
tuples: vec! [
(b"foo_1".to_vec (), b"bar_1".to_vec ()),
]
});
s.set (b"key_dir", b"foo_2", b"bar_2".to_vec ()).await.unwrap ();
expected_sequence += 1;
let ga = s.get_after (b"key_dir", None).await.unwrap ();
get_after_eq (&ga, &GetAfter {
sequence: expected_sequence,
tuples: vec! [
(b"foo_1".to_vec (), b"bar_1".to_vec ()),
(b"foo_2".to_vec (), b"bar_2".to_vec ()),
]
});
s.set (b"key_dir", b"foo_1", b"bar_3".to_vec ()).await.unwrap ();
expected_sequence += 1;
let ga = s.get_after (b"key_dir", None).await.unwrap ();
get_after_eq (&ga, &GetAfter {
sequence: expected_sequence,
tuples: vec! [
(b"foo_1".to_vec (), b"bar_3".to_vec ()),
(b"foo_2".to_vec (), b"bar_2".to_vec ()),
]
});
let ga = s.get_after (b"key_dir", Some (2)).await.unwrap ();
get_after_eq (&ga, &GetAfter {
sequence: expected_sequence,
tuples: vec! [
(b"foo_1".to_vec (), b"bar_3".to_vec ()),
]
});
let ga = s.get_after (b"key_dir", Some (3)).await.unwrap ();
get_after_eq (&ga, &GetAfter {
sequence: expected_sequence,
tuples: vec! []
});
});
}
#[test]
#[cfg (not (debug_assertions))]
fn perf () {
use std::time::Instant;
let rt = Runtime::new ().unwrap ();
rt.block_on (async {
let s = Store::new (vec! [
(b"key_dir".to_vec (), StatusQuotas {
max_keys: 4,
max_key_bytes: 16,
max_value_bytes: 16,
max_payload_bytes: 128,
}),
].into_iter ());
let num_iters = 1_000_000;
let key = b"foo";
let start_time = Instant::now ();
for i in 0..num_iters {
let value = format! ("{}", i);
s.set (b"key_dir", key, value.into ()).await.unwrap ();
}
let end_time = Instant::now ();
let total_dur = end_time - start_time;
let avg_nanos = total_dur.as_nanos () / num_iters;
assert! (avg_nanos < 250, dbg! (avg_nanos));
});
}
#[test]
#[cfg (not (debug_assertions))]
fn perf_multi () {
use std::time::Instant;
let rt = Runtime::new ().unwrap ();
rt.block_on (async {
let s = Store::new (vec! [
(b"key_dir".to_vec (), StatusQuotas {
max_keys: 8,
max_key_bytes: 16,
max_value_bytes: 16,
max_payload_bytes: 128,
}),
].into_iter ());
let num_iters = 1_000_000;
let start_time = Instant::now ();
for i in 0..num_iters {
let value = Vec::<u8>::from (format! ("{}", i));
let tuples = vec! [
(&b"foo_0"[..], value.clone ()),
(b"foo_1", value.clone ()),
(b"foo_2", value.clone ()),
(b"foo_3", value.clone ()),
(b"foo_4", value.clone ()),
(b"foo_5", value.clone ()),
(b"foo_6", value.clone ()),
(b"foo_7", value.clone ()),
];
s.set_multi (b"key_dir", tuples).await.unwrap ();
}
let end_time = Instant::now ();
let total_dur = end_time - start_time;
let avg_nanos = total_dur.as_nanos () / (num_iters * 8);
assert! (avg_nanos < 150, dbg! (avg_nanos));
});
}
}

View File

@ -1,6 +1,5 @@
use structopt::StructOpt;
use tokio::{
net::UdpSocket,
sync::watch,
};
@ -96,34 +95,7 @@ impl P2Client {
})
};
if false {
let task_direc_connect = {
let connection = connection.clone ();
tokio::spawn (async move {
let cookie = protocol::p2_direc_to_p4 (
&connection,
"bogus_server",
).await?;
let sock = UdpSocket::bind ("0.0.0.0:0").await?;
let mut interval = tokio::time::interval (Duration::from_millis (1000));
interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Delay);
loop {
interval.tick ().await;
sock.send_to(&cookie [..], "127.0.0.1:30379").await?;
debug! ("P2 sent cookie to P3 over plain UDP");
}
Ok::<_, anyhow::Error> (())
})
};
}
task_tcp_server.await??;
//task_direc_connect.await??;
Ok (())
}

View File

@ -94,7 +94,6 @@ impl Opt {
pub struct P4EndServer {
conf: Config,
conn: quinn::Connection,
endpoint: quinn::Endpoint,
shutdown_rx: watch::Receiver <bool>,
}
@ -142,7 +141,6 @@ impl P4EndServer {
Ok ((P4EndServer {
conf,
conn,
endpoint,
shutdown_rx,
}, shutdown_tx))
}

View File

@ -10,9 +10,6 @@ use hyper::{
StatusCode,
};
use structopt::StructOpt;
use tokio::{
net::UdpSocket,
};
use crate::prelude::*;
use protocol::PeerId;
@ -87,9 +84,9 @@ impl App {
pub async fn run (self) -> anyhow::Result <()> {
let Self {
endpoint,
listen_addr,
listen_addr: _,
metrics,
server_cert,
server_cert: _,
tcp_listener,
} = self;
@ -158,35 +155,6 @@ impl App {
})
};
let task_direc_server = {
let relay_state = Arc::clone (&relay_state);
tokio::spawn (async move {
let sock = UdpSocket::bind("0.0.0.0:30379").await?;
let mut buf = [0; 2048];
loop {
let (len, addr) = sock.recv_from (&mut buf).await?;
debug! ("{:?} bytes received from {:?}", len, addr);
let packet = Vec::from_iter ((&buf [0..len]).into_iter ().map (|x| *x));
{
let mut direc_cookies = relay_state.direc_cookies.lock ().await;
if let Some (direc_state) = direc_cookies.remove (&packet) {
debug! ("Got PTTH_DIREC cookie for {}", direc_state.p2_id);
direc_state.p2_addr.send (addr).ok ();
}
else {
debug! ("UDP packet didn't match any PTTH_DIREC cookie");
}
}
}
Ok::<_, anyhow::Error> (())
})
};
let task_http_server = tokio::spawn (async move {
http_server.serve (make_svc).await?;
Ok::<_, anyhow::Error> (())
@ -225,9 +193,6 @@ impl App {
_val = task_http_server => {
eprintln! ("HTTP server exited, exiting");
},
_val = task_direc_server => {
eprintln! ("PTTH_DIREC server exited, exiting");
},
}
Ok (())
@ -256,7 +221,6 @@ async fn handle_http (_req: Request <Body>, relay_state: Arc <RelayState>)
struct RelayState {
config: arc_swap::ArcSwap <Config>,
p4_server_proxies: Mutex <HashMap <PeerId, P4State>>,
direc_cookies: Mutex <HashMap <Vec <u8>, DirecState>>,
metrics: Arc <RwLock <Metrics>>,
stats: Stats,
http_client: reqwest::Client,
@ -265,7 +229,6 @@ struct RelayState {
#[derive (Default)]
struct Config {
ip_nicknames: BTreeMap <[u8; 4], String>,
tcp_listen_port: Option <u16>,
webhook_url: Option <String>,
}
@ -273,7 +236,6 @@ impl From <ConfigFile> for Config {
fn from (x: ConfigFile) -> Self {
Self {
ip_nicknames: x.ip_nicknames.into_iter ().collect (),
tcp_listen_port: x.tcp_listen_port,
webhook_url: x.webhook_url,
}
}
@ -286,12 +248,6 @@ struct ConfigFile {
webhook_url: Option <String>,
}
struct DirecState {
start_time: Instant,
p2_id: PeerId,
p2_addr: tokio::sync::oneshot::Sender <SocketAddr>,
}
#[derive (Default)]
struct Stats {
quic: ConnectEvents,
@ -513,19 +469,7 @@ async fn handle_p2_connection (
recv
).await?
},
protocol::P2ToP3Stream::DirecP2ToP4 {
server_id,
cookie,
} => {
handle_direc_p2_to_p4 (
relay_state,
client_id,
server_id,
cookie,
send,
recv
).await?
},
_ => (),
}
debug! ("Request ended for P2");
@ -569,41 +513,6 @@ async fn handle_request_p2_to_p4 (
Ok (())
}
async fn handle_direc_p2_to_p4 (
relay_state: Arc <RelayState>,
client_id: String,
server_id: PeerId,
cookie: Vec <u8>,
mut client_send: quinn::SendStream,
client_recv: quinn::RecvStream,
) -> anyhow::Result <()>
{
debug! ("P2 {} wants a P2P connection to P4 {}", client_id, server_id);
// TODO: Check authorization
protocol::p3_authorize_p2_to_p4_direc (&mut client_send).await?;
let (tx, rx) = tokio::sync::oneshot::channel ();
{
let mut direc_cookies = relay_state.direc_cookies.lock ().await;
direc_cookies.insert (cookie, DirecState {
start_time: Instant::now (),
p2_id: client_id.clone (),
p2_addr: tx,
});
}
debug! ("Waiting to learn P2's WAN address...");
let wan_addr = rx.await?;
debug! ("And that WAN address is {}", wan_addr);
Ok (())
}
async fn handle_p4_connection (
relay_state: Arc <RelayState>,
connection: quinn::Connection,

View File

@ -9,7 +9,7 @@ use std::{
use quinn::{
ClientConfig, Endpoint,
ServerConfig, TransportConfig,
ServerConfig,
};
/// Constructs a QUIC endpoint configured for use a client only.