From e3ff600b51f1e5ee386d83888980a3795879ffe0 Mon Sep 17 00:00:00 2001 From: "(on company time)" <_@_> Date: Mon, 19 Dec 2022 14:58:12 -0600 Subject: [PATCH] :white_check_mark: expose count of connected end servers to testing code --- .../ptth_quic/src/executable_relay_server.rs | 23 ++++++++++++++++++- crates/ptth_quic/src/prelude.rs | 1 + crates/ptth_quic/src/tests.rs | 21 ++++++++++++++++- 3 files changed, 43 insertions(+), 2 deletions(-) diff --git a/crates/ptth_quic/src/executable_relay_server.rs b/crates/ptth_quic/src/executable_relay_server.rs index ee6fb01..9e149b5 100644 --- a/crates/ptth_quic/src/executable_relay_server.rs +++ b/crates/ptth_quic/src/executable_relay_server.rs @@ -28,10 +28,16 @@ pub struct Opt { pub struct App { endpoint: quinn::Endpoint, listen_addr: SocketAddr, + pub (crate) metrics: Arc >, server_cert: Vec , tcp_listen_port: Option , } +#[derive (Default)] +pub (crate) struct Metrics { + pub (crate) connected_end_servers: usize, +} + impl App { pub fn new (opt: Opt) -> anyhow::Result { let listen_addr = opt.listen_addr.unwrap_or_else (|| String::from ("0.0.0.0:30380")).parse ()?; @@ -42,6 +48,7 @@ impl App { Ok (Self { endpoint, listen_addr, + metrics: Default::default (), server_cert, tcp_listen_port: opt.tcp_listen_port, }) @@ -59,11 +66,13 @@ impl App { let Self { endpoint, listen_addr, + metrics, server_cert, tcp_listen_port, } = self; - let relay_state = RelayState::default (); + let mut relay_state = RelayState::default (); + relay_state.metrics = metrics; if let Err (e) = relay_state.reload_config ().await { error! ("{:?}", e); } @@ -110,6 +119,7 @@ impl App { // Each new peer QUIC connection gets its own task tokio::spawn (async move { let active = relay_state.stats.quic.connect (); + debug! ("QUIC connections: {}", active); match handle_quic_connection (Arc::clone (&relay_state), conn).await { @@ -228,6 +238,7 @@ struct RelayState { config: arc_swap::ArcSwap , p4_server_proxies: Mutex >, direc_cookies: Mutex , DirecState>>, + metrics: Arc >, stats: Stats, http_client: reqwest::Client, } @@ -431,7 +442,17 @@ async fn handle_quic_connection ( // TODO: Check authorization for P4 peers protocol::p3_authorize_p4_peer (&mut send).await?; + let metrics = Arc::clone (&relay_state.metrics); + + { + let mut m = metrics.write ().await; + m.connected_end_servers += 1; + } handle_p4_connection (relay_state, conn, peer).await?; + { + let mut m = metrics.write ().await; + m.connected_end_servers -= 1; + } }, } diff --git a/crates/ptth_quic/src/prelude.rs b/crates/ptth_quic/src/prelude.rs index 9508d89..d067afa 100644 --- a/crates/ptth_quic/src/prelude.rs +++ b/crates/ptth_quic/src/prelude.rs @@ -37,6 +37,7 @@ pub use tokio::{ }, sync::{ Mutex, + RwLock, mpsc, }, task::JoinHandle, diff --git a/crates/ptth_quic/src/tests.rs b/crates/ptth_quic/src/tests.rs index dd93d29..e3308ae 100644 --- a/crates/ptth_quic/src/tests.rs +++ b/crates/ptth_quic/src/tests.rs @@ -1,3 +1,5 @@ +use crate::prelude::*; + #[test] fn end_to_end () -> anyhow::Result <()> { let rt = tokio::runtime::Runtime::new ()?; @@ -14,12 +16,20 @@ async fn end_to_end_async () -> anyhow::Result <()> { tcp_listen_port: None, }; let relay_app = relay::App::new (relay_opt)?; + let relay_quic_port = relay_app.listen_addr ().port (); let relay_cert = Vec::from (relay_app.server_cert ()); + let relay_metrics = Arc::clone (&relay_app.metrics); + let task_relay = tokio::spawn (async move { relay_app.run ().await }); + { + let m = relay_metrics.read ().await; + assert_eq! (m.connected_end_servers, 0); + } + // Connect with wrong port, should fail let server_conf = server::Config { @@ -48,6 +58,11 @@ async fn end_to_end_async () -> anyhow::Result <()> { assert! (server_err.is_err ()); + { + let m = relay_metrics.read ().await; + assert_eq! (m.connected_end_servers, 0); + } + // Connect properly let server_conf = server::Config { @@ -58,8 +73,12 @@ async fn end_to_end_async () -> anyhow::Result <()> { use_udp_over_tcp: false, }; - let (server, _) = server::P4EndServer::connect (server_conf).await?; + let (server, server_shutdown_tx) = server::P4EndServer::connect (server_conf).await?; + { + let m = relay_metrics.read ().await; + assert_eq! (m.connected_end_servers, 1); + } Ok (()) }