From edd7e8de54fb071266d02adefb9a2b4dd11084c3 Mon Sep 17 00:00:00 2001 From: "(on company time)" <_@_> Date: Mon, 31 Oct 2022 11:24:15 -0500 Subject: [PATCH] :star: proof of concept for UDP-over-TCP --- .gitignore | 3 ++ Cargo.lock | 8 ++++ crates/udp_over_tcp/Cargo.toml | 10 +++++ crates/udp_over_tcp/src/client.rs | 47 +++++++++++++++++++++ crates/udp_over_tcp/src/loops.rs | 69 +++++++++++++++++++++++++++++++ crates/udp_over_tcp/src/main.rs | 54 ++++++++++++++++++++++++ crates/udp_over_tcp/src/server.rs | 55 ++++++++++++++++++++++++ 7 files changed, 246 insertions(+) create mode 100644 crates/udp_over_tcp/Cargo.toml create mode 100644 crates/udp_over_tcp/src/client.rs create mode 100644 crates/udp_over_tcp/src/loops.rs create mode 100644 crates/udp_over_tcp/src/main.rs create mode 100644 crates/udp_over_tcp/src/server.rs diff --git a/.gitignore b/.gitignore index c08fc3c..11ccc86 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,6 @@ # TLS certs used for QUIC experiments *.crt + +# Kate editor temp file +*.kate-swp diff --git a/Cargo.lock b/Cargo.lock index c138489..5a39f54 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2253,6 +2253,14 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56dee185309b50d1f11bfedef0fe6d036842e3fb77413abef29f8f8d1c5d4c1c" +[[package]] +name = "udp_over_tcp" +version = "0.1.0" +dependencies = [ + "anyhow", + "tokio", +] + [[package]] name = "unicase" version = "2.6.0" diff --git a/crates/udp_over_tcp/Cargo.toml b/crates/udp_over_tcp/Cargo.toml new file mode 100644 index 0000000..bae2954 --- /dev/null +++ b/crates/udp_over_tcp/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "udp_over_tcp" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0.38" +tokio = { version = "1.8.1", features = ["full"] } diff --git a/crates/udp_over_tcp/src/client.rs b/crates/udp_over_tcp/src/client.rs new file mode 100644 index 0000000..7b0fbf6 --- /dev/null +++ b/crates/udp_over_tcp/src/client.rs @@ -0,0 +1,47 @@ +use std::{ + net::{ + Ipv4Addr, + SocketAddr, + SocketAddrV4, + }, + sync::Arc, +}; + +use tokio::{ + net::{ + TcpSocket, + UdpSocket, + }, + spawn, +}; + +use crate::loops; + +pub async fn main () -> anyhow::Result <()> { + let udp_sock = UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, crate::PORT_1)).await?; + udp_sock.connect ((Ipv4Addr::LOCALHOST, crate::PORT_0)).await?; + + let tcp_sock = TcpSocket::new_v4 ()?; + let tcp_conn = tcp_sock.connect (SocketAddr::V4 (SocketAddrV4::new (Ipv4Addr::LOCALHOST, crate::PORT_2))).await?; + let (tcp_read, tcp_write) = tcp_conn.into_split (); + + let tx_task; + let rx_task; + + { + let udp_sock = Arc::new (udp_sock); + rx_task = spawn (loops::rx (Arc::clone (&udp_sock), tcp_read)); + tx_task = spawn (loops::tx (Arc::clone (&udp_sock), tcp_write)); + } + + tokio::select! { + _val = tx_task => { + println! ("client_main: tx_task exited, exiting"); + } + _val = rx_task => { + println! ("client_main: rx_task exited, exiting"); + } + } + + Ok (()) +} diff --git a/crates/udp_over_tcp/src/loops.rs b/crates/udp_over_tcp/src/loops.rs new file mode 100644 index 0000000..d71ccb5 --- /dev/null +++ b/crates/udp_over_tcp/src/loops.rs @@ -0,0 +1,69 @@ +use std::{ + sync::Arc, +}; + +use tokio::{ + io::{ + AsyncReadExt, + AsyncWriteExt, + }, + net::{ + UdpSocket, + tcp, + } +}; + +pub async fn rx ( + udp_sock: Arc , + mut tcp_read: tcp::OwnedReadHalf, +) -> anyhow::Result <()> { + loop { + let mut tag = [0u8, 0, 0, 0]; + let bytes_read = tcp_read.read (&mut tag).await?; + if bytes_read != 4 { + anyhow::anyhow! ("loops::rx: Couldn't read 4 bytes for tag"); + } + if tag != [1, 0, 0, 0] { + anyhow::anyhow! ("loops::rx: unexpected tag in framing"); + } + + let mut length = [0u8, 0, 0, 0]; + let bytes_read = tcp_read.read (&mut length).await?; + if bytes_read != 4 { + anyhow::anyhow! ("loops::rx: Couldn't read 4 bytes for tag"); + } + + let length = usize::try_from (u32::from_le_bytes (length))?; + if length >= 8_192 { + anyhow::anyhow! ("loops::rx: Length too big for UDP packets"); + } + + let mut buf = vec! [0u8; length]; + let bytes_read = tcp_read.read_exact (&mut buf).await?; + if length != bytes_read { + anyhow::anyhow! ("loops::rx: read_exact failed"); + } + buf.truncate (bytes_read); + + udp_sock.send (&buf).await?; + } +} + +pub async fn tx ( + udp_sock: Arc , + mut tcp_write: tcp::OwnedWriteHalf, +) -> anyhow::Result <()> +{ + loop { + let mut buf = vec! [0u8; 8_192]; + let bytes_read = udp_sock.recv (&mut buf).await?; + buf.truncate (bytes_read); + + let tag = [1u8, 0, 0, 0]; + let length = u32::try_from (bytes_read)?.to_le_bytes (); + + tcp_write.write_all (&tag).await?; + tcp_write.write_all (&length).await?; + tcp_write.write_all (&buf).await?; + } +} diff --git a/crates/udp_over_tcp/src/main.rs b/crates/udp_over_tcp/src/main.rs new file mode 100644 index 0000000..4018473 --- /dev/null +++ b/crates/udp_over_tcp/src/main.rs @@ -0,0 +1,54 @@ +/* + +To test manually, run this 3 commands: + +- Terminal A: `nc -l -u -p 9503` +- Terminal B: `cargo run -p udp_over_tcp` +- Terminal C: `nc -p 9500 -u 127.0.0.1 9501` + +Terminals A and C should be connected through the UDP-over-TCP connection + +*/ + +use tokio::{ + runtime, + spawn, +}; + +mod client; +mod loops; +mod server; + +// The ephemeral UDP port that the PTTH_QUIC client will bind +const PORT_0: u16 = 9500; + +// The well-known UDP port that the UDP-over-TCP client will bind +// The PTTH_QUIC client must connect to this instead of the real relay address +const PORT_1: u16 = 9501; + +// The well-known TCP port that the UDP-over-TCP server will bind +const PORT_2: u16 = 9502; + +// The well-known UDP port that the PTTH_QUIC relay will bind +const PORT_3: u16 = 9503; + +fn main () -> anyhow::Result <()> { + let rt = runtime::Runtime::new ()?; + + rt.block_on (async_main ())?; + + Ok (()) +} + +async fn async_main () -> anyhow::Result <()> { + tokio::select! { + _val = spawn (client::main ()) => { + println! ("Client exited, exiting"); + }, + _val = spawn (server::main ()) => { + println! ("Server exited, exiting"); + }, + } + + Ok (()) +} diff --git a/crates/udp_over_tcp/src/server.rs b/crates/udp_over_tcp/src/server.rs new file mode 100644 index 0000000..a35dea4 --- /dev/null +++ b/crates/udp_over_tcp/src/server.rs @@ -0,0 +1,55 @@ +use std::{ + net::{ + Ipv4Addr, + SocketAddrV4, + }, + sync::Arc, +}; + +use tokio::{ + net::{ + TcpListener, + TcpStream, + UdpSocket, + }, + spawn, +}; + +use crate::loops; + +pub async fn main () -> anyhow::Result <()> { + let tcp_listener = TcpListener::bind ((Ipv4Addr::UNSPECIFIED, crate::PORT_2)).await?; + + loop { + let (conn, _peer_addr) = tcp_listener.accept ().await?; + + spawn (handle_connection (conn)); + } +} + +async fn handle_connection (conn: TcpStream) -> anyhow::Result <()> { + let udp_sock = UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, 0)).await?; + udp_sock.connect ((Ipv4Addr::LOCALHOST, crate::PORT_3)).await?; + + let (tcp_read, tcp_write) = conn.into_split (); + + let rx_task; + let tx_task; + + { + let udp_sock = Arc::new (udp_sock); + rx_task = spawn (loops::rx (Arc::clone (&udp_sock), tcp_read)); + tx_task = spawn (loops::tx (Arc::clone (&udp_sock), tcp_write)); + } + + tokio::select! { + _val = tx_task => { + println! ("server_handle_connection: tx_task exited, exiting"); + } + _val = rx_task => { + println! ("server_handle_connection: rx_task exited, exiting"); + } + } + + Ok (()) +}