UDP-over-TCP passes okay

Next step is updating the relay and making sure it integrates into (our thing that uses PTTH_QUIC)
main
(on company time) 2022-12-19 16:25:50 -06:00
parent e3ff600b51
commit 3f4bce85a4
4 changed files with 88 additions and 23 deletions

View File

@ -18,7 +18,7 @@ async fn main () -> anyhow::Result <()> {
})?; })?;
trace! ("Set Ctrl+C handler"); trace! ("Set Ctrl+C handler");
let app = relay::App::new (opt)?; let app = relay::App::new (opt).await?;
println! ("Base64 cert: {}", base64::encode (app.server_cert ())); println! ("Base64 cert: {}", base64::encode (app.server_cert ()));
println! ("Listening on {}", app.listen_addr ()); println! ("Listening on {}", app.listen_addr ());

View File

@ -30,7 +30,7 @@ pub struct App {
listen_addr: SocketAddr, listen_addr: SocketAddr,
pub (crate) metrics: Arc <RwLock <Metrics>>, pub (crate) metrics: Arc <RwLock <Metrics>>,
server_cert: Vec <u8>, server_cert: Vec <u8>,
tcp_listen_port: Option <u16>, tcp_listener: Option <udp_over_tcp::server::Listener>,
} }
#[derive (Default)] #[derive (Default)]
@ -39,18 +39,30 @@ pub (crate) struct Metrics {
} }
impl App { impl App {
pub fn new (opt: Opt) -> anyhow::Result <Self> { pub async fn new (opt: Opt) -> anyhow::Result <Self> {
let listen_addr = opt.listen_addr.unwrap_or_else (|| String::from ("0.0.0.0:30380")).parse ()?; let listen_addr = opt.listen_addr.unwrap_or_else (|| String::from ("0.0.0.0:30380")).parse ()?;
let (endpoint, server_cert) = make_server_endpoint (listen_addr)?; let (endpoint, server_cert) = make_server_endpoint (listen_addr)?;
let listen_addr = endpoint.local_addr ()?; let listen_addr = endpoint.local_addr ()?;
let tcp_listener = if let Some (tcp_port) = opt.tcp_listen_port {
let cfg = udp_over_tcp::server::Config {
tcp_port,
udp_port: listen_addr.port (),
};
Some (udp_over_tcp::server::Listener::new (cfg).await?)
}
else {
None
};
Ok (Self { Ok (Self {
endpoint, endpoint,
listen_addr, listen_addr,
metrics: Default::default (), metrics: Default::default (),
server_cert, server_cert,
tcp_listen_port: opt.tcp_listen_port, tcp_listener,
}) })
} }
@ -62,13 +74,20 @@ impl App {
&self.server_cert &self.server_cert
} }
pub fn tcp_listen_port (&self) -> anyhow::Result <Option <u16>> {
match self.tcp_listener.as_ref () {
None => Ok (None),
Some (tcp_listener) => Ok (tcp_listener.tcp_port ()?.into ()),
}
}
pub async fn run (self) -> anyhow::Result <()> { pub async fn run (self) -> anyhow::Result <()> {
let Self { let Self {
endpoint, endpoint,
listen_addr, listen_addr,
metrics, metrics,
server_cert, server_cert,
tcp_listen_port, tcp_listener,
} = self; } = self;
let mut relay_state = RelayState::default (); let mut relay_state = RelayState::default ();
@ -172,16 +191,13 @@ impl App {
debug! ("Serving HTTP on {:?}", http_addr); debug! ("Serving HTTP on {:?}", http_addr);
if let Some (tcp_listen_port) = tcp_listen_port { if let Some (tcp_listener) = tcp_listener {
tokio::spawn (async move { tokio::spawn (async move {
let cfg = udp_over_tcp::server::Config { if let Err (e) = tcp_listener.run ().await {
tcp_port: tcp_listen_port,
udp_port: listen_addr.port (),
};
if let Err (e) = udp_over_tcp::server::main (cfg).await {
eprintln! ("udp_over_tcp::server::main exited with err {:?}", e); eprintln! ("udp_over_tcp::server::main exited with err {:?}", e);
} }
Ok::<_, anyhow::Error> (())
}); });
} }

View File

@ -13,13 +13,16 @@ async fn end_to_end_async () -> anyhow::Result <()> {
let relay_opt = relay::Opt { let relay_opt = relay::Opt {
listen_addr: "127.0.0.1:0".to_string ().into (), listen_addr: "127.0.0.1:0".to_string ().into (),
tcp_listen_port: None, tcp_listen_port: Some (0),
}; };
let relay_app = relay::App::new (relay_opt)?; let relay_app = relay::App::new (relay_opt).await?;
let relay_quic_port = relay_app.listen_addr ().port (); let relay_quic_port = relay_app.listen_addr ().port ();
let relay_cert = Vec::from (relay_app.server_cert ()); let relay_cert = Vec::from (relay_app.server_cert ());
let relay_metrics = Arc::clone (&relay_app.metrics); let relay_metrics = Arc::clone (&relay_app.metrics);
let tcp_listen_port = relay_app.tcp_listen_port ()?.unwrap ();
assert_ne! (tcp_listen_port, 0);
let task_relay = tokio::spawn (async move { let task_relay = tokio::spawn (async move {
relay_app.run ().await relay_app.run ().await
@ -63,22 +66,45 @@ async fn end_to_end_async () -> anyhow::Result <()> {
assert_eq! (m.connected_end_servers, 0); assert_eq! (m.connected_end_servers, 0);
} }
// Connect properly // Connect over UDP
let server_conf = server::Config { let server_conf = server::Config {
debug_echo: false, debug_echo: false,
id: "bogus".into (), id: "bogus_VZBNRUA5".into (),
relay_addr: ([127, 0, 0, 1], relay_quic_port).into (), relay_addr: ([127, 0, 0, 1], relay_quic_port).into (),
relay_cert: relay_cert.clone (), relay_cert: relay_cert.clone (),
use_udp_over_tcp: false, use_udp_over_tcp: false,
}; };
let t = Instant::now ();
let (server, server_shutdown_tx) = server::P4EndServer::connect (server_conf).await?; let (server, server_shutdown_tx) = server::P4EndServer::connect (server_conf).await?;
let dur = t.elapsed ();
assert! (dur < Duration::from_millis (1_000), "{:?}", dur);
{ {
let m = relay_metrics.read ().await; let m = relay_metrics.read ().await;
assert_eq! (m.connected_end_servers, 1); assert_eq! (m.connected_end_servers, 1);
} }
// Connect over TCP
let server_conf = server::Config {
debug_echo: false,
id: "bogus_6E5CZIAI".into (),
relay_addr: ([127, 0, 0, 1], tcp_listen_port).into (),
relay_cert: relay_cert.clone (),
use_udp_over_tcp: true,
};
let t = Instant::now ();
let (server, server_shutdown_tx) = server::P4EndServer::connect (server_conf).await?;
let dur = t.elapsed ();
assert! (dur < Duration::from_millis (1_000), "{:?}", dur);
{
let m = relay_metrics.read ().await;
assert_eq! (m.connected_end_servers, 2);
}
Ok (()) Ok (())
} }

View File

@ -26,14 +26,37 @@ pub struct Config {
pub udp_port: u16, pub udp_port: u16,
} }
pub async fn main (cfg: Config) -> anyhow::Result <()> { pub struct Listener {
let tcp_listener = TcpListener::bind ((Ipv4Addr::UNSPECIFIED, cfg.tcp_port)).await?; cfg: Config,
tcp_listener: TcpListener,
loop { }
let (conn, _peer_addr) = tcp_listener.accept ().await?;
impl Listener {
pub async fn new (cfg: Config) -> anyhow::Result <Self> {
let tcp_listener = TcpListener::bind ((Ipv4Addr::UNSPECIFIED, cfg.tcp_port)).await?;
let cfg = cfg.clone (); Ok (Self {
spawn (handle_connection (cfg, conn)); cfg,
tcp_listener,
})
}
pub fn tcp_port (&self) -> anyhow::Result <u16> {
Ok (self.tcp_listener.local_addr ()?.port ())
}
pub async fn run (self) -> anyhow::Result <()> {
let Self {
cfg,
tcp_listener,
} = self;
loop {
let (conn, _peer_addr) = tcp_listener.accept ().await?;
let cfg = cfg.clone ();
spawn (handle_connection (cfg, conn));
}
} }
} }