From f9e10e0f64ada9534b17bc37460b6b4bafef88f3 Mon Sep 17 00:00:00 2001 From: "(on company time)" <_@_> Date: Mon, 31 Oct 2022 13:50:42 -0500 Subject: [PATCH] :star: use udp_over_tcp for both PTTH_QUIC end server and relay server - `cargo run -p ptth_quic --bin ptth_quic_relay_server -- --tcp-listen-port 4440` - `cargo run -p ptth_quic --bin ptth_quic_end_server -- --use-udp-over-tcp true --relay-addr 127.0.0.1:4440 --server-id my_server` - `cargo run -p ptth_quic_client_gui -- --client-id me` - `nc -l -p 5900` - (Open my_server, 5900 in the client GUI) - `nc 127.0.0.1 50369` (or whatever port the GUI picked) --- crates/ptth_quic/src/executable_end_server.rs | 51 ++++++++++++++----- .../ptth_quic/src/executable_relay_server.rs | 19 +++---- crates/ptth_quic/src/prelude.rs | 12 ++++- crates/udp_over_tcp/src/client.rs | 6 +++ 4 files changed, 60 insertions(+), 28 deletions(-) diff --git a/crates/ptth_quic/src/executable_end_server.rs b/crates/ptth_quic/src/executable_end_server.rs index 4033d58..7d6ea0e 100644 --- a/crates/ptth_quic/src/executable_end_server.rs +++ b/crates/ptth_quic/src/executable_end_server.rs @@ -19,6 +19,8 @@ struct Opt { debug_echo: bool, #[structopt (long)] cert_url: Option , + #[structopt (long)] + use_udp_over_tcp: Option , } pub async fn main (args: &[OsString], shutdown_rx: Option >) -> anyhow::Result <()> { @@ -26,12 +28,37 @@ pub async fn main (args: &[OsString], shutdown_rx: Option (()) }) }; @@ -57,6 +84,7 @@ pub struct Config { pub id: String, pub relay_addr: SocketAddr, pub relay_cert: Vec , + pub use_udp_over_tcp: bool, } impl Opt { @@ -83,45 +111,42 @@ impl Opt { id, relay_addr, relay_cert, + use_udp_over_tcp: self.use_udp_over_tcp.unwrap_or (false), }) } } pub struct P4EndServer { endpoint: quinn::Endpoint, - conf: Arc , shutdown_tx: watch::Sender , shutdown_rx: watch::Receiver , } impl P4EndServer { - pub fn connect (conf: Config) -> anyhow::Result { + pub fn connect (conf: &Config) -> anyhow::Result { trace! ("P4 end server making its QUIC endpoint"); let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&conf.relay_cert])?; let (shutdown_tx, shutdown_rx) = watch::channel (false); Ok (P4EndServer { - conf: Arc::new (conf), endpoint, shutdown_tx, shutdown_rx, }) } - pub fn config (&self) -> &Config { - &*self.conf - } - - pub async fn run (&self) -> anyhow::Result <()> { + pub async fn run (&self, conf: Config) -> anyhow::Result <()> { + let conf = Arc::new (conf); + trace! ("P4 end server connecting to P3 relay server"); let quinn::NewConnection { mut bi_streams, .. } = protocol::p4_connect_to_p3 ( &self.endpoint, - self.conf.relay_addr, - &self.conf.id + conf.relay_addr, + &conf.id ).await?; debug! ("Connected to relay server"); @@ -141,7 +166,7 @@ impl P4EndServer { stream_opt = bi_streams.next () => { let (relay_send, relay_recv) = stream_opt.ok_or_else (|| anyhow::anyhow! ("P4 ran out of incoming streams. Maybe P3 shut down or disconnected?"))??; - tokio::spawn (handle_bi_stream (Arc::clone (&self.conf), relay_send, relay_recv)); + tokio::spawn (handle_bi_stream (Arc::clone (&conf), relay_send, relay_recv)); } }; } diff --git a/crates/ptth_quic/src/executable_relay_server.rs b/crates/ptth_quic/src/executable_relay_server.rs index 7f09e98..058a0ce 100644 --- a/crates/ptth_quic/src/executable_relay_server.rs +++ b/crates/ptth_quic/src/executable_relay_server.rs @@ -115,22 +115,18 @@ pub async fn main (opt: Opt) -> anyhow::Result <()> debug! ("Serving HTTP on {:?}", http_addr); - let task_tcp_over_udp_server = if let Some (tcp_listen_port) = opt.tcp_listen_port { + if let Some (tcp_listen_port) = opt.tcp_listen_port { tokio::spawn (async move { let cfg = udp_over_tcp::server::Config { tcp_port: tcp_listen_port, udp_port: listen_addr.port (), }; - udp_over_tcp::server::main (cfg).await - }) - } - else { - tokio::spawn (async move { - loop { - tokio::time::sleep (Duration::from_secs (3_600)).await; + + if let Err (e) = udp_over_tcp::server::main (cfg).await { + eprintln! ("udp_over_tcp::server::main exited with err {:?}", e); } - }) - }; + }); + } tokio::select! { _val = task_quic_server => { @@ -142,9 +138,6 @@ pub async fn main (opt: Opt) -> anyhow::Result <()> _val = task_direc_server => { eprintln! ("PTTH_DIREC server exited, exiting"); }, - _val = task_tcp_over_udp_server => { - eprintln! ("tcp_over_udp server exited, exiting"); - } } Ok (()) diff --git a/crates/ptth_quic/src/prelude.rs b/crates/ptth_quic/src/prelude.rs index d091c0d..589b65a 100644 --- a/crates/ptth_quic/src/prelude.rs +++ b/crates/ptth_quic/src/prelude.rs @@ -2,7 +2,11 @@ pub use std::{ collections::*, ffi::OsString, iter::FromIterator, - net::SocketAddr, + net::{ + Ipv4Addr, + SocketAddr, + SocketAddrV4, + }, sync::{ Arc, atomic::{ @@ -26,7 +30,11 @@ pub use tokio::{ AsyncReadExt, AsyncWriteExt, }, - net::TcpListener, + net::{ + TcpListener, + TcpSocket, + UdpSocket, + }, sync::{ Mutex, mpsc, diff --git a/crates/udp_over_tcp/src/client.rs b/crates/udp_over_tcp/src/client.rs index 9f188f3..dafab3b 100644 --- a/crates/udp_over_tcp/src/client.rs +++ b/crates/udp_over_tcp/src/client.rs @@ -10,6 +10,7 @@ use std::{ use tokio::{ net::{ TcpSocket, + TcpStream, UdpSocket, }, spawn, @@ -29,6 +30,11 @@ pub async fn main (cfg: Config) -> anyhow::Result <()> { let tcp_sock = TcpSocket::new_v4 ()?; let tcp_conn = tcp_sock.connect (cfg.tcp_server_addr).await?; + + main_with_sockets (udp_sock, tcp_conn).await +} + +pub async fn main_with_sockets (udp_sock: UdpSocket, tcp_conn: TcpStream) -> anyhow::Result <()> { let (tcp_read, tcp_write) = tcp_conn.into_split (); let tx_task;