diff --git a/Cargo.lock b/Cargo.lock index 8952660..4547f9a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1317,17 +1317,6 @@ dependencies = [ "uom", ] -[[package]] -name = "ptth_kv" -version = "0.1.0" -dependencies = [ - "anyhow", - "base64 0.13.1", - "hyper", - "thiserror", - "tokio", -] - [[package]] name = "ptth_multi_call_server" version = "1.1.1" diff --git a/crates/ptth_kv/Cargo.toml b/crates/ptth_kv/Cargo.toml deleted file mode 100644 index cc5f6b2..0000000 --- a/crates/ptth_kv/Cargo.toml +++ /dev/null @@ -1,12 +0,0 @@ -[package] -name = "ptth_kv" -version = "0.1.0" -authors = ["Trish"] -edition = "2018" - -[dependencies] -anyhow = "1.0.38" -base64 = "0.13.0" -hyper = { version = "0.14.4", features = ["full"] } -thiserror = "1.0.22" -tokio = { version = "1.8.1", features = ["full"] } diff --git a/crates/ptth_kv/src/main.rs b/crates/ptth_kv/src/main.rs deleted file mode 100644 index c902f59..0000000 --- a/crates/ptth_kv/src/main.rs +++ /dev/null @@ -1,499 +0,0 @@ -use std::{ - collections::{ - HashMap, - }, - iter::FromIterator, - sync::{ - Arc, - }, -}; - -use hyper::{ - Body, - Request, - Response, - StatusCode, -}; -use tokio::{ - sync::Mutex, -}; - -pub struct HttpService { - store: Arc , -} - -pub struct Store { - status_dirs: HashMap , StatusKeyDirectory>, -} - -#[derive (thiserror::Error, Debug, PartialEq)] -pub enum Error { - #[error ("key too long")] - KeyTooLong, - #[error ("no such key dir")] - NoSuchKeyDir, - #[error ("value too long")] - ValueTooLong, -} - -pub struct StatusQuotas { - pub max_keys: usize, - pub max_key_bytes: usize, - pub max_value_bytes: usize, - pub max_payload_bytes: usize, -} - -pub struct GetAfter { - pub tuples: Vec <(Vec , Vec )>, - pub sequence: u64, -} - -impl HttpService { - pub fn new (s: Store) -> Self { - Self { - store: Arc::new (s), - } - } - - pub fn inner (&self) -> &Store { - &*self.store - } - - pub async fn serve (&self, port: u16) -> Result <(), hyper::Error> { - use std::net::SocketAddr; - - use hyper::{ - Server, - service::{ - make_service_fn, - service_fn, - }, - }; - - let make_svc = make_service_fn (|_conn| { - let state = self.store.clone (); - - async { - Ok::<_, String> (service_fn (move |req| { - let state = state.clone (); - - Self::handle_all (req, state) - })) - } - }); - - let addr = SocketAddr::from(([127, 0, 0, 1], port)); - - let server = Server::bind (&addr) - .serve (make_svc) - ; - - server.await - } -} - -impl Store { - pub fn new (status_dirs: I) - -> Self - where I: Iterator , StatusQuotas)> - { - let status_dirs = status_dirs - .map (|(name, quotas)| (name, StatusKeyDirectory::new (quotas))) - .collect (); - - Self { - status_dirs, - } - } - - pub fn list_key_dirs (&self) -> Vec > { - self.status_dirs.iter () - .map (|(k, _)| k.clone ()) - .collect () - } - - pub async fn set (&self, name: &[u8], key: &[u8], value: Vec ) - -> Result <(), Error> - { - let dir = self.status_dirs.get (name) - .ok_or (Error::NoSuchKeyDir)?; - - dir.set (key, value).await - } - - async fn set_multi (&self, name: &[u8], tuples: Vec <(&[u8], Vec )>) - -> Result <(), Error> - { - let dir = self.status_dirs.get (name) - .ok_or (Error::NoSuchKeyDir)?; - - dir.set_multi (tuples).await - } - - pub async fn get_after (&self, name: &[u8], thresh: Option ) - -> Result - { - let dir = self.status_dirs.get (name) - .ok_or (Error::NoSuchKeyDir)?; - - dir.get_after (thresh).await - } -} - -// End of public interface - -const SET_BATCH_SIZE: usize = 32; - -enum StoreCommand { - SetStatus (SetStatusCommand), - Multi (Vec ), -} - -struct StatusKeyDirectory { - quotas: StatusQuotas, - - // TODO: Make this tokio::sync::Mutex. - table: Mutex , -} - -#[derive (Default)] -struct StatusTable { - map: HashMap , StatusValue>, - sequence: u64, -} - -struct StatusValue { - value: Vec , - sequence: u64, -} - -struct SetStatusCommand { - key_dir: Vec , - key: Vec , - value: Vec , -} - -impl HttpService { - async fn handle_all (req: Request , store: Arc ) - -> Result , anyhow::Error> - { - Ok (Response::builder () - .body (Body::from ("hello\n"))?) - } -} - -impl StatusKeyDirectory { - fn new (quotas: StatusQuotas) -> Self { - Self { - quotas, - table: Mutex::new (Default::default ()), - } - } - - async fn set (&self, key: &[u8], value: Vec ) -> Result <(), Error> - { - if key.len () > self.quotas.max_key_bytes { - return Err (Error::KeyTooLong); - } - - if value.len () > self.quotas.max_value_bytes { - return Err (Error::ValueTooLong); - } - - { - let mut guard = self.table.lock ().await; - guard.set (&self.quotas, key, value); - } - Ok (()) - } - - async fn set_multi (&self, tuples: Vec <(&[u8], Vec )>) -> Result <(), Error> - { - { - let mut guard = self.table.lock ().await; - for (key, value) in tuples { - guard.set (&self.quotas, key, value); - } - } - Ok (()) - } - - async fn get_after (&self, thresh: Option ) -> Result { - let guard = self.table.lock ().await; - Ok (guard.get_after (thresh)) - } -} - -impl StatusTable { - fn payload_bytes (&self) -> usize { - self.map.iter () - .map (|(k, v)| k.len () + v.len ()) - .sum () - } - - fn set (&mut self, quotas: &StatusQuotas, key: &[u8], value: Vec ) { - self.sequence += 1; - - if self.map.len () > quotas.max_keys { - self.map.clear (); - } - - let new_bytes = key.len () + value.len (); - - if self.payload_bytes () + new_bytes > quotas.max_payload_bytes { - self.map.clear (); - } - - let value = StatusValue { - value, - sequence: self.sequence, - }; - - // self.map.insert (key, value); - match self.map.get_mut (key) { - None => { - self.map.insert (key.to_vec (), value); - }, - Some (v) => *v = value, - } - } - - fn get_after (&self, thresh: Option ) -> GetAfter { - let thresh = thresh.unwrap_or (0); - - let tuples = self.map.iter () - .filter_map (|(key, value)| { - if value.sequence <= thresh { - return None; - } - - Some ((key.clone (), value.value.clone ())) - }) - .collect (); - - GetAfter { - tuples, - sequence: self.sequence, - } - } -} - -impl StatusValue { - fn len (&self) -> usize { - self.value.len () - } -} - -#[tokio::main] -async fn main () -> Result <(), hyper::Error> { - use std::time::Duration; - - use tokio::{ - spawn, - time::interval, - }; - - let service = HttpService::new (Store::new (vec! [ - (b"key_dir".to_vec (), StatusQuotas { - max_keys: 4, - max_key_bytes: 16, - max_value_bytes: 16, - max_payload_bytes: 128, - }), - ].into_iter ())); - - service.serve (4003).await -} - -#[cfg (test)] -mod tests { - use tokio::runtime::Runtime; - - use super::*; - - fn get_after_eq (a: &GetAfter, b: &GetAfter) { - assert_eq! (a.sequence, b.sequence); - - let a = a.tuples.clone (); - let b = b.tuples.clone (); - - let a = HashMap::, Vec >::from_iter (a.into_iter ()); - let b = HashMap::from_iter (b.into_iter ()); - - assert_eq! (a, b); - } - - #[test] - fn store () { - let rt = Runtime::new ().unwrap (); - rt.block_on (async { - let s = Store::new (vec! [ - (b"key_dir".to_vec (), StatusQuotas { - max_keys: 4, - max_key_bytes: 16, - max_value_bytes: 16, - max_payload_bytes: 128, - }), - ].into_iter ()); - - let mut expected_sequence = 0; - - assert_eq! (s.list_key_dirs (), vec! [ - b"key_dir".to_vec (), - ]); - - assert_eq! ( - s.set (b"key_dir", b"this key is too long and will cause an error", b"bar".to_vec ()).await, - Err (Error::KeyTooLong) - ); - assert_eq! ( - s.set (b"key_dir", b"foo", b"this value is too long and will cause an error".to_vec ()).await, - Err (Error::ValueTooLong) - ); - assert_eq! ( - s.set (b"invalid_key_dir", b"foo", b"bar".to_vec ()).await, - Err (Error::NoSuchKeyDir) - ); - - let ga = s.get_after (b"key_dir", None).await.unwrap (); - assert_eq! (ga.sequence, expected_sequence); - assert_eq! (ga.tuples, vec! []); - - s.set (b"key_dir", b"foo_1", b"bar_1".to_vec ()).await.unwrap (); - expected_sequence += 1; - let ga = s.get_after (b"key_dir", None).await.unwrap (); - - assert_eq! (ga.sequence, expected_sequence); - assert_eq! (ga.tuples, vec! [ - (b"foo_1".to_vec (), b"bar_1".to_vec ()), - ]); - - get_after_eq (&ga, &GetAfter { - sequence: expected_sequence, - tuples: vec! [ - (b"foo_1".to_vec (), b"bar_1".to_vec ()), - ] - }); - - s.set (b"key_dir", b"foo_2", b"bar_2".to_vec ()).await.unwrap (); - expected_sequence += 1; - let ga = s.get_after (b"key_dir", None).await.unwrap (); - - get_after_eq (&ga, &GetAfter { - sequence: expected_sequence, - tuples: vec! [ - (b"foo_1".to_vec (), b"bar_1".to_vec ()), - (b"foo_2".to_vec (), b"bar_2".to_vec ()), - ] - }); - - s.set (b"key_dir", b"foo_1", b"bar_3".to_vec ()).await.unwrap (); - expected_sequence += 1; - let ga = s.get_after (b"key_dir", None).await.unwrap (); - - get_after_eq (&ga, &GetAfter { - sequence: expected_sequence, - tuples: vec! [ - (b"foo_1".to_vec (), b"bar_3".to_vec ()), - (b"foo_2".to_vec (), b"bar_2".to_vec ()), - ] - }); - - let ga = s.get_after (b"key_dir", Some (2)).await.unwrap (); - get_after_eq (&ga, &GetAfter { - sequence: expected_sequence, - tuples: vec! [ - (b"foo_1".to_vec (), b"bar_3".to_vec ()), - ] - }); - - let ga = s.get_after (b"key_dir", Some (3)).await.unwrap (); - get_after_eq (&ga, &GetAfter { - sequence: expected_sequence, - tuples: vec! [] - }); - }); - } - - #[test] - #[cfg (not (debug_assertions))] - fn perf () { - use std::time::Instant; - - let rt = Runtime::new ().unwrap (); - rt.block_on (async { - let s = Store::new (vec! [ - (b"key_dir".to_vec (), StatusQuotas { - max_keys: 4, - max_key_bytes: 16, - max_value_bytes: 16, - max_payload_bytes: 128, - }), - ].into_iter ()); - - let num_iters = 1_000_000; - - let key = b"foo"; - - let start_time = Instant::now (); - - for i in 0..num_iters { - let value = format! ("{}", i); - - s.set (b"key_dir", key, value.into ()).await.unwrap (); - } - - let end_time = Instant::now (); - let total_dur = end_time - start_time; - - let avg_nanos = total_dur.as_nanos () / num_iters; - - assert! (avg_nanos < 250, dbg! (avg_nanos)); - }); - } - - #[test] - #[cfg (not (debug_assertions))] - fn perf_multi () { - use std::time::Instant; - - let rt = Runtime::new ().unwrap (); - rt.block_on (async { - let s = Store::new (vec! [ - (b"key_dir".to_vec (), StatusQuotas { - max_keys: 8, - max_key_bytes: 16, - max_value_bytes: 16, - max_payload_bytes: 128, - }), - ].into_iter ()); - - let num_iters = 1_000_000; - - let start_time = Instant::now (); - - for i in 0..num_iters { - let value = Vec::::from (format! ("{}", i)); - let tuples = vec! [ - (&b"foo_0"[..], value.clone ()), - (b"foo_1", value.clone ()), - (b"foo_2", value.clone ()), - (b"foo_3", value.clone ()), - (b"foo_4", value.clone ()), - (b"foo_5", value.clone ()), - (b"foo_6", value.clone ()), - (b"foo_7", value.clone ()), - ]; - - s.set_multi (b"key_dir", tuples).await.unwrap (); - } - - let end_time = Instant::now (); - let total_dur = end_time - start_time; - - let avg_nanos = total_dur.as_nanos () / (num_iters * 8); - - assert! (avg_nanos < 150, dbg! (avg_nanos)); - }); - } -} diff --git a/crates/ptth_quic/src/bin/ptth_quic_client.rs b/crates/ptth_quic/src/bin/ptth_quic_client.rs index d0bba4f..b78a51d 100644 --- a/crates/ptth_quic/src/bin/ptth_quic_client.rs +++ b/crates/ptth_quic/src/bin/ptth_quic_client.rs @@ -1,6 +1,5 @@ use structopt::StructOpt; use tokio::{ - net::UdpSocket, sync::watch, }; @@ -96,34 +95,7 @@ impl P2Client { }) }; - if false { - let task_direc_connect = { - let connection = connection.clone (); - - tokio::spawn (async move { - let cookie = protocol::p2_direc_to_p4 ( - &connection, - "bogus_server", - ).await?; - - let sock = UdpSocket::bind ("0.0.0.0:0").await?; - - let mut interval = tokio::time::interval (Duration::from_millis (1000)); - interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Delay); - - loop { - interval.tick ().await; - sock.send_to(&cookie [..], "127.0.0.1:30379").await?; - debug! ("P2 sent cookie to P3 over plain UDP"); - } - - Ok::<_, anyhow::Error> (()) - }) - }; - } - task_tcp_server.await??; - //task_direc_connect.await??; Ok (()) } diff --git a/crates/ptth_quic/src/executable_end_server.rs b/crates/ptth_quic/src/executable_end_server.rs index e715f80..494016e 100644 --- a/crates/ptth_quic/src/executable_end_server.rs +++ b/crates/ptth_quic/src/executable_end_server.rs @@ -94,7 +94,6 @@ impl Opt { pub struct P4EndServer { conf: Config, conn: quinn::Connection, - endpoint: quinn::Endpoint, shutdown_rx: watch::Receiver , } @@ -142,7 +141,6 @@ impl P4EndServer { Ok ((P4EndServer { conf, conn, - endpoint, shutdown_rx, }, shutdown_tx)) } diff --git a/crates/ptth_quic/src/executable_relay_server.rs b/crates/ptth_quic/src/executable_relay_server.rs index 5840612..a66a35b 100644 --- a/crates/ptth_quic/src/executable_relay_server.rs +++ b/crates/ptth_quic/src/executable_relay_server.rs @@ -10,9 +10,6 @@ use hyper::{ StatusCode, }; use structopt::StructOpt; -use tokio::{ - net::UdpSocket, -}; use crate::prelude::*; use protocol::PeerId; @@ -87,9 +84,9 @@ impl App { pub async fn run (self) -> anyhow::Result <()> { let Self { endpoint, - listen_addr, + listen_addr: _, metrics, - server_cert, + server_cert: _, tcp_listener, } = self; @@ -158,35 +155,6 @@ impl App { }) }; - let task_direc_server = { - let relay_state = Arc::clone (&relay_state); - - tokio::spawn (async move { - let sock = UdpSocket::bind("0.0.0.0:30379").await?; - let mut buf = [0; 2048]; - loop { - let (len, addr) = sock.recv_from (&mut buf).await?; - debug! ("{:?} bytes received from {:?}", len, addr); - - let packet = Vec::from_iter ((&buf [0..len]).into_iter ().map (|x| *x)); - - { - let mut direc_cookies = relay_state.direc_cookies.lock ().await; - - if let Some (direc_state) = direc_cookies.remove (&packet) { - debug! ("Got PTTH_DIREC cookie for {}", direc_state.p2_id); - direc_state.p2_addr.send (addr).ok (); - } - else { - debug! ("UDP packet didn't match any PTTH_DIREC cookie"); - } - } - } - - Ok::<_, anyhow::Error> (()) - }) - }; - let task_http_server = tokio::spawn (async move { http_server.serve (make_svc).await?; Ok::<_, anyhow::Error> (()) @@ -225,9 +193,6 @@ impl App { _val = task_http_server => { eprintln! ("HTTP server exited, exiting"); }, - _val = task_direc_server => { - eprintln! ("PTTH_DIREC server exited, exiting"); - }, } Ok (()) @@ -256,7 +221,6 @@ async fn handle_http (_req: Request , relay_state: Arc ) struct RelayState { config: arc_swap::ArcSwap , p4_server_proxies: Mutex >, - direc_cookies: Mutex , DirecState>>, metrics: Arc >, stats: Stats, http_client: reqwest::Client, @@ -265,7 +229,6 @@ struct RelayState { #[derive (Default)] struct Config { ip_nicknames: BTreeMap <[u8; 4], String>, - tcp_listen_port: Option , webhook_url: Option , } @@ -273,7 +236,6 @@ impl From for Config { fn from (x: ConfigFile) -> Self { Self { ip_nicknames: x.ip_nicknames.into_iter ().collect (), - tcp_listen_port: x.tcp_listen_port, webhook_url: x.webhook_url, } } @@ -286,12 +248,6 @@ struct ConfigFile { webhook_url: Option , } -struct DirecState { - start_time: Instant, - p2_id: PeerId, - p2_addr: tokio::sync::oneshot::Sender , -} - #[derive (Default)] struct Stats { quic: ConnectEvents, @@ -513,19 +469,7 @@ async fn handle_p2_connection ( recv ).await? }, - protocol::P2ToP3Stream::DirecP2ToP4 { - server_id, - cookie, - } => { - handle_direc_p2_to_p4 ( - relay_state, - client_id, - server_id, - cookie, - send, - recv - ).await? - }, + _ => (), } debug! ("Request ended for P2"); @@ -569,41 +513,6 @@ async fn handle_request_p2_to_p4 ( Ok (()) } -async fn handle_direc_p2_to_p4 ( - relay_state: Arc , - client_id: String, - server_id: PeerId, - cookie: Vec , - mut client_send: quinn::SendStream, - client_recv: quinn::RecvStream, -) -> anyhow::Result <()> -{ - debug! ("P2 {} wants a P2P connection to P4 {}", client_id, server_id); - - // TODO: Check authorization - - protocol::p3_authorize_p2_to_p4_direc (&mut client_send).await?; - - let (tx, rx) = tokio::sync::oneshot::channel (); - - { - let mut direc_cookies = relay_state.direc_cookies.lock ().await; - direc_cookies.insert (cookie, DirecState { - start_time: Instant::now (), - p2_id: client_id.clone (), - p2_addr: tx, - }); - } - - debug! ("Waiting to learn P2's WAN address..."); - - let wan_addr = rx.await?; - - debug! ("And that WAN address is {}", wan_addr); - - Ok (()) -} - async fn handle_p4_connection ( relay_state: Arc , connection: quinn::Connection, diff --git a/crates/ptth_quic/src/quinn_utils.rs b/crates/ptth_quic/src/quinn_utils.rs index 56df875..dec1e7d 100644 --- a/crates/ptth_quic/src/quinn_utils.rs +++ b/crates/ptth_quic/src/quinn_utils.rs @@ -9,7 +9,7 @@ use std::{ use quinn::{ ClientConfig, Endpoint, - ServerConfig, TransportConfig, + ServerConfig, }; /// Constructs a QUIC endpoint configured for use a client only.