test(ptth_quic): add end server --> relay server connection tests

main
(on company time) 2022-12-19 14:26:50 -06:00
parent 3f0272ed09
commit 27ed72b196
3 changed files with 105 additions and 63 deletions

View File

@ -5,12 +5,13 @@ use std::{
use tokio::sync::watch; use tokio::sync::watch;
use ptth_quic::prelude::*; use ptth_quic::prelude::*;
use ptth_quic::executable_end_server as server;
#[tokio::main] #[tokio::main]
async fn main () -> anyhow::Result <()> { async fn main () -> anyhow::Result <()> {
tracing_subscriber::fmt::init (); tracing_subscriber::fmt::init ();
let args = Vec::from_iter (std::env::args_os ()); let args: Vec <_> = std::env::args_os ().collect ();
let (shutdown_tx, shutdown_rx) = watch::channel (false); let (shutdown_tx, shutdown_rx) = watch::channel (false);
@ -19,5 +20,5 @@ async fn main () -> anyhow::Result <()> {
})?; })?;
trace! ("Set Ctrl+C handler"); trace! ("Set Ctrl+C handler");
ptth_quic::executable_end_server::main (&args, Some (shutdown_rx)).await server::main (&args, Some (shutdown_rx)).await
} }

View File

@ -10,7 +10,7 @@ use protocol::PeerId;
/// A partially-filled-out config that structopt can deal with /// A partially-filled-out config that structopt can deal with
/// Try to turn this into a Config as soon as possible. /// Try to turn this into a Config as soon as possible.
#[derive (Debug, StructOpt)] #[derive (Debug, StructOpt)]
struct Opt { pub struct Opt {
#[structopt (long)] #[structopt (long)]
relay_addr: Option <String>, relay_addr: Option <String>,
#[structopt (long)] #[structopt (long)]
@ -23,42 +23,26 @@ struct Opt {
use_udp_over_tcp: Option <bool>, use_udp_over_tcp: Option <bool>,
} }
/// A filled-out config for constructing an end server
#[derive (Clone)]
pub (crate) struct Config {
pub debug_echo: bool,
pub id: String,
pub relay_addr: SocketAddr,
pub relay_cert: Vec <u8>,
pub use_udp_over_tcp: bool,
}
pub async fn main (args: &[OsString], shutdown_rx: Option <watch::Receiver <bool>>) -> anyhow::Result <()> { pub async fn main (args: &[OsString], shutdown_rx: Option <watch::Receiver <bool>>) -> anyhow::Result <()> {
trace! ("executable_end_server::main"); trace! ("executable_end_server::main");
let opt = Opt::from_iter (args); let opt = Opt::from_iter (args);
let conf = opt.into_config ().await?; let conf = opt.into_config ().await?;
let end_server = Arc::new (P4EndServer::connect (&conf)?); let (end_server, shutdown_tx) = P4EndServer::connect (conf).await?;
let conf = if conf.use_udp_over_tcp {
let udp_sock = UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, 0)).await?;
udp_sock.connect ((Ipv4Addr::LOCALHOST, end_server.endpoint.local_addr ()?.port ())).await?;
let udp_local_server_port = udp_sock.local_addr ()?.port ();
let tcp_sock = TcpSocket::new_v4 ()?;
let tcp_conn = tcp_sock.connect (conf.relay_addr).await?;
tokio::spawn (async move {
udp_over_tcp::client::main_with_sockets (udp_sock, tcp_conn).await
});
Config {
debug_echo: conf.debug_echo,
id: conf.id,
relay_addr: SocketAddr::V4 (SocketAddrV4::new (Ipv4Addr::LOCALHOST, udp_local_server_port)),
relay_cert: conf.relay_cert,
use_udp_over_tcp: true,
}
}
else {
conf
};
let run_task = { let run_task = {
let end_server = Arc::clone (&end_server);
tokio::spawn (async move { tokio::spawn (async move {
end_server.run (conf).await?; end_server.run ().await?;
Ok::<_, anyhow::Error> (()) Ok::<_, anyhow::Error> (())
}) })
}; };
@ -67,7 +51,8 @@ pub async fn main (args: &[OsString], shutdown_rx: Option <watch::Receiver <bool
while ! *shutdown_rx.borrow () { while ! *shutdown_rx.borrow () {
shutdown_rx.changed ().await?; shutdown_rx.changed ().await?;
} }
end_server.shut_down ()?; trace! ("P4 end server shutting down...");
shutdown_tx.send (true)?
} }
run_task.await??; run_task.await??;
@ -77,22 +62,12 @@ pub async fn main (args: &[OsString], shutdown_rx: Option <watch::Receiver <bool
Ok (()) Ok (())
} }
/// A filled-out config for constructing an end server
#[derive (Clone)]
pub struct Config {
pub debug_echo: bool,
pub id: String,
pub relay_addr: SocketAddr,
pub relay_cert: Vec <u8>,
pub use_udp_over_tcp: bool,
}
impl Opt { impl Opt {
/// Converts self into a Config that the server can use. /// Converts self into a Config that the server can use.
/// Performs I/O to load the relay cert from disk or from HTTP. /// Performs I/O to load the relay cert from disk or from HTTP.
/// Fails if arguments can't be parsed or if I/O fails. /// Fails if arguments can't be parsed or if I/O fails.
pub async fn into_config (self) -> anyhow::Result <Config> { pub (crate) async fn into_config (self) -> anyhow::Result <Config> {
let id = self.server_id.clone ().unwrap_or_else (|| "bogus_server".to_string ()); let id = self.server_id.clone ().unwrap_or_else (|| "bogus_server".to_string ());
let relay_addr: SocketAddr = self.relay_addr.clone ().unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?; let relay_addr: SocketAddr = self.relay_addr.clone ().unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?;
@ -117,41 +92,68 @@ impl Opt {
} }
pub struct P4EndServer { pub struct P4EndServer {
conf: Config,
conn: quinn::Connection,
endpoint: quinn::Endpoint, endpoint: quinn::Endpoint,
shutdown_tx: watch::Sender <bool>,
shutdown_rx: watch::Receiver <bool>, shutdown_rx: watch::Receiver <bool>,
} }
impl P4EndServer { impl P4EndServer {
pub fn connect (conf: &Config) -> anyhow::Result <Self> { pub (crate) async fn connect (conf: Config) -> anyhow::Result <(Self, watch::Sender <bool>)> {
trace! ("P4 end server making its QUIC endpoint"); trace! ("P4 end server making its QUIC endpoint");
let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&conf.relay_cert])?; let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&conf.relay_cert])?;
let (shutdown_tx, shutdown_rx) = watch::channel (false); let conf = if conf.use_udp_over_tcp {
let udp_sock = UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, 0)).await?;
udp_sock.connect ((Ipv4Addr::LOCALHOST, endpoint.local_addr ()?.port ())).await?;
Ok (P4EndServer { let udp_local_server_port = udp_sock.local_addr ()?.port ();
endpoint,
shutdown_tx, let tcp_sock = TcpSocket::new_v4 ()?;
shutdown_rx, let tcp_conn = tcp_sock.connect (conf.relay_addr).await?;
})
tokio::spawn (async move {
udp_over_tcp::client::main_with_sockets (udp_sock, tcp_conn).await
});
Config {
debug_echo: conf.debug_echo,
id: conf.id,
relay_addr: SocketAddr::V4 (SocketAddrV4::new (Ipv4Addr::LOCALHOST, udp_local_server_port)),
relay_cert: conf.relay_cert,
use_udp_over_tcp: true,
} }
}
pub async fn run (&self, conf: Config) -> anyhow::Result <()> { else {
let conf = Arc::new (conf); conf
};
trace! ("P4 end server connecting to P3 relay server"); trace! ("P4 end server connecting to P3 relay server");
let conn = protocol::p4_connect_to_p3 ( let conn = protocol::p4_connect_to_p3 (
&self.endpoint, &endpoint,
conf.relay_addr, conf.relay_addr,
&conf.id &conf.id
).await?; ).await?;
debug! ("Connected to relay server"); debug! ("Connected to relay server");
let (shutdown_tx, shutdown_rx) = watch::channel (false);
Ok ((P4EndServer {
conf,
conn,
endpoint,
shutdown_rx,
}, shutdown_tx))
}
pub (crate) async fn run (self) -> anyhow::Result <()> {
trace! ("Accepting bi streams from P3"); trace! ("Accepting bi streams from P3");
let mut shutdown_rx = self.shutdown_rx.clone (); let mut shutdown_rx = self.shutdown_rx.clone ();
let conf = Arc::new (self.conf);
loop { loop {
tokio::select! { tokio::select! {
_ = shutdown_rx.changed () => { _ = shutdown_rx.changed () => {
@ -160,7 +162,7 @@ impl P4EndServer {
break; break;
} }
} }
stream_opt = conn.accept_bi () => { stream_opt = self.conn.accept_bi () => {
let (relay_send, relay_recv) = stream_opt?; let (relay_send, relay_recv) = stream_opt?;
tokio::spawn (handle_bi_stream (Arc::clone (&conf), relay_send, relay_recv)); tokio::spawn (handle_bi_stream (Arc::clone (&conf), relay_send, relay_recv));
@ -171,11 +173,6 @@ impl P4EndServer {
Ok (()) Ok (())
} }
pub fn shut_down (&self) -> anyhow::Result <()> {
trace! ("P4 end server shutting down...");
Ok (self.shutdown_tx.send (true)?)
}
pub fn shutting_down (&self) -> bool { pub fn shutting_down (&self) -> bool {
*self.shutdown_rx.borrow () *self.shutdown_rx.borrow ()
} }

View File

@ -6,6 +6,7 @@ fn end_to_end () -> anyhow::Result <()> {
} }
async fn end_to_end_async () -> anyhow::Result <()> { async fn end_to_end_async () -> anyhow::Result <()> {
use crate::executable_end_server as server;
use crate::executable_relay_server as relay; use crate::executable_relay_server as relay;
let relay_opt = relay::Opt { let relay_opt = relay::Opt {
@ -13,9 +14,52 @@ async fn end_to_end_async () -> anyhow::Result <()> {
tcp_listen_port: None, tcp_listen_port: None,
}; };
let relay_app = relay::App::new (relay_opt)?; let relay_app = relay::App::new (relay_opt)?;
let relay_quic_port = relay_app.listen_addr ().port ();
let relay_cert = Vec::from (relay_app.server_cert ());
let task_relay = tokio::spawn (async move { let task_relay = tokio::spawn (async move {
relay_app.run ().await relay_app.run ().await
}); });
// Connect with wrong port, should fail
let server_conf = server::Config {
debug_echo: false,
id: "bogus".into (),
relay_addr: "127.0.0.1:80".parse ()?,
relay_cert: relay_cert.clone (),
use_udp_over_tcp: false,
};
let server_err = server::P4EndServer::connect (server_conf).await;
assert! (server_err.is_err ());
// Connect with wrong cert, should fail
let server_conf = server::Config {
debug_echo: false,
id: "bogus".into (),
relay_addr: ([127, 0, 0, 1], relay_quic_port).into (),
relay_cert: vec! [],
use_udp_over_tcp: false,
};
let server_err = server::P4EndServer::connect (server_conf).await;
assert! (server_err.is_err ());
// Connect properly
let server_conf = server::Config {
debug_echo: false,
id: "bogus".into (),
relay_addr: ([127, 0, 0, 1], relay_quic_port).into (),
relay_cert: relay_cert.clone (),
use_udp_over_tcp: false,
};
let (server, _) = server::P4EndServer::connect (server_conf).await?;
Ok (()) Ok (())
} }