From 27ed72b196988521387c6c6a08ac28ee11059d49 Mon Sep 17 00:00:00 2001 From: "(on company time)" <_@_> Date: Mon, 19 Dec 2022 14:26:50 -0600 Subject: [PATCH] :white_check_mark: test(ptth_quic): add end server --> relay server connection tests --- .../ptth_quic/src/bin/ptth_quic_end_server.rs | 5 +- crates/ptth_quic/src/executable_end_server.rs | 119 +++++++++--------- crates/ptth_quic/src/tests.rs | 44 +++++++ 3 files changed, 105 insertions(+), 63 deletions(-) diff --git a/crates/ptth_quic/src/bin/ptth_quic_end_server.rs b/crates/ptth_quic/src/bin/ptth_quic_end_server.rs index bd0ffc8..048b354 100644 --- a/crates/ptth_quic/src/bin/ptth_quic_end_server.rs +++ b/crates/ptth_quic/src/bin/ptth_quic_end_server.rs @@ -5,12 +5,13 @@ use std::{ use tokio::sync::watch; use ptth_quic::prelude::*; +use ptth_quic::executable_end_server as server; #[tokio::main] async fn main () -> anyhow::Result <()> { tracing_subscriber::fmt::init (); - let args = Vec::from_iter (std::env::args_os ()); + let args: Vec <_> = std::env::args_os ().collect (); let (shutdown_tx, shutdown_rx) = watch::channel (false); @@ -19,5 +20,5 @@ async fn main () -> anyhow::Result <()> { })?; trace! ("Set Ctrl+C handler"); - ptth_quic::executable_end_server::main (&args, Some (shutdown_rx)).await + server::main (&args, Some (shutdown_rx)).await } diff --git a/crates/ptth_quic/src/executable_end_server.rs b/crates/ptth_quic/src/executable_end_server.rs index 374b295..9cfbb57 100644 --- a/crates/ptth_quic/src/executable_end_server.rs +++ b/crates/ptth_quic/src/executable_end_server.rs @@ -10,7 +10,7 @@ use protocol::PeerId; /// A partially-filled-out config that structopt can deal with /// Try to turn this into a Config as soon as possible. #[derive (Debug, StructOpt)] -struct Opt { +pub struct Opt { #[structopt (long)] relay_addr: Option , #[structopt (long)] @@ -23,42 +23,26 @@ struct Opt { use_udp_over_tcp: Option , } +/// A filled-out config for constructing an end server +#[derive (Clone)] +pub (crate) struct Config { + pub debug_echo: bool, + pub id: String, + pub relay_addr: SocketAddr, + pub relay_cert: Vec , + pub use_udp_over_tcp: bool, +} + pub async fn main (args: &[OsString], shutdown_rx: Option >) -> anyhow::Result <()> { trace! ("executable_end_server::main"); let opt = Opt::from_iter (args); let conf = opt.into_config ().await?; - let end_server = Arc::new (P4EndServer::connect (&conf)?); - - let conf = if conf.use_udp_over_tcp { - let udp_sock = UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, 0)).await?; - udp_sock.connect ((Ipv4Addr::LOCALHOST, end_server.endpoint.local_addr ()?.port ())).await?; - - let udp_local_server_port = udp_sock.local_addr ()?.port (); - - let tcp_sock = TcpSocket::new_v4 ()?; - let tcp_conn = tcp_sock.connect (conf.relay_addr).await?; - - tokio::spawn (async move { - udp_over_tcp::client::main_with_sockets (udp_sock, tcp_conn).await - }); - - Config { - debug_echo: conf.debug_echo, - id: conf.id, - relay_addr: SocketAddr::V4 (SocketAddrV4::new (Ipv4Addr::LOCALHOST, udp_local_server_port)), - relay_cert: conf.relay_cert, - use_udp_over_tcp: true, - } - } - else { - conf - }; + let (end_server, shutdown_tx) = P4EndServer::connect (conf).await?; let run_task = { - let end_server = Arc::clone (&end_server); tokio::spawn (async move { - end_server.run (conf).await?; + end_server.run ().await?; Ok::<_, anyhow::Error> (()) }) }; @@ -67,7 +51,8 @@ pub async fn main (args: &[OsString], shutdown_rx: Option , - pub use_udp_over_tcp: bool, -} - impl Opt { /// Converts self into a Config that the server can use. /// Performs I/O to load the relay cert from disk or from HTTP. /// Fails if arguments can't be parsed or if I/O fails. - pub async fn into_config (self) -> anyhow::Result { + pub (crate) async fn into_config (self) -> anyhow::Result { let id = self.server_id.clone ().unwrap_or_else (|| "bogus_server".to_string ()); let relay_addr: SocketAddr = self.relay_addr.clone ().unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?; @@ -117,41 +92,68 @@ impl Opt { } pub struct P4EndServer { + conf: Config, + conn: quinn::Connection, endpoint: quinn::Endpoint, - shutdown_tx: watch::Sender , shutdown_rx: watch::Receiver , } impl P4EndServer { - pub fn connect (conf: &Config) -> anyhow::Result { + pub (crate) async fn connect (conf: Config) -> anyhow::Result <(Self, watch::Sender )> { trace! ("P4 end server making its QUIC endpoint"); let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&conf.relay_cert])?; - let (shutdown_tx, shutdown_rx) = watch::channel (false); - - Ok (P4EndServer { - endpoint, - shutdown_tx, - shutdown_rx, - }) - } - - pub async fn run (&self, conf: Config) -> anyhow::Result <()> { - let conf = Arc::new (conf); + let conf = if conf.use_udp_over_tcp { + let udp_sock = UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, 0)).await?; + udp_sock.connect ((Ipv4Addr::LOCALHOST, endpoint.local_addr ()?.port ())).await?; + + let udp_local_server_port = udp_sock.local_addr ()?.port (); + + let tcp_sock = TcpSocket::new_v4 ()?; + let tcp_conn = tcp_sock.connect (conf.relay_addr).await?; + + tokio::spawn (async move { + udp_over_tcp::client::main_with_sockets (udp_sock, tcp_conn).await + }); + + Config { + debug_echo: conf.debug_echo, + id: conf.id, + relay_addr: SocketAddr::V4 (SocketAddrV4::new (Ipv4Addr::LOCALHOST, udp_local_server_port)), + relay_cert: conf.relay_cert, + use_udp_over_tcp: true, + } + } + else { + conf + }; trace! ("P4 end server connecting to P3 relay server"); let conn = protocol::p4_connect_to_p3 ( - &self.endpoint, + &endpoint, conf.relay_addr, &conf.id ).await?; debug! ("Connected to relay server"); + let (shutdown_tx, shutdown_rx) = watch::channel (false); + + Ok ((P4EndServer { + conf, + conn, + endpoint, + shutdown_rx, + }, shutdown_tx)) + } + + pub (crate) async fn run (self) -> anyhow::Result <()> { trace! ("Accepting bi streams from P3"); let mut shutdown_rx = self.shutdown_rx.clone (); + let conf = Arc::new (self.conf); + loop { tokio::select! { _ = shutdown_rx.changed () => { @@ -160,7 +162,7 @@ impl P4EndServer { break; } } - stream_opt = conn.accept_bi () => { + stream_opt = self.conn.accept_bi () => { let (relay_send, relay_recv) = stream_opt?; tokio::spawn (handle_bi_stream (Arc::clone (&conf), relay_send, relay_recv)); @@ -171,11 +173,6 @@ impl P4EndServer { Ok (()) } - pub fn shut_down (&self) -> anyhow::Result <()> { - trace! ("P4 end server shutting down..."); - Ok (self.shutdown_tx.send (true)?) - } - pub fn shutting_down (&self) -> bool { *self.shutdown_rx.borrow () } diff --git a/crates/ptth_quic/src/tests.rs b/crates/ptth_quic/src/tests.rs index 2ffd11e..dd93d29 100644 --- a/crates/ptth_quic/src/tests.rs +++ b/crates/ptth_quic/src/tests.rs @@ -6,6 +6,7 @@ fn end_to_end () -> anyhow::Result <()> { } async fn end_to_end_async () -> anyhow::Result <()> { + use crate::executable_end_server as server; use crate::executable_relay_server as relay; let relay_opt = relay::Opt { @@ -13,9 +14,52 @@ async fn end_to_end_async () -> anyhow::Result <()> { tcp_listen_port: None, }; let relay_app = relay::App::new (relay_opt)?; + let relay_quic_port = relay_app.listen_addr ().port (); + let relay_cert = Vec::from (relay_app.server_cert ()); let task_relay = tokio::spawn (async move { relay_app.run ().await }); + // Connect with wrong port, should fail + + let server_conf = server::Config { + debug_echo: false, + id: "bogus".into (), + relay_addr: "127.0.0.1:80".parse ()?, + relay_cert: relay_cert.clone (), + use_udp_over_tcp: false, + }; + + let server_err = server::P4EndServer::connect (server_conf).await; + + assert! (server_err.is_err ()); + + // Connect with wrong cert, should fail + + let server_conf = server::Config { + debug_echo: false, + id: "bogus".into (), + relay_addr: ([127, 0, 0, 1], relay_quic_port).into (), + relay_cert: vec! [], + use_udp_over_tcp: false, + }; + + let server_err = server::P4EndServer::connect (server_conf).await; + + assert! (server_err.is_err ()); + + // Connect properly + + let server_conf = server::Config { + debug_echo: false, + id: "bogus".into (), + relay_addr: ([127, 0, 0, 1], relay_quic_port).into (), + relay_cert: relay_cert.clone (), + use_udp_over_tcp: false, + }; + + let (server, _) = server::P4EndServer::connect (server_conf).await?; + + Ok (()) }