diff --git a/prototypes/quic_demo/src/bin/quic_demo_end_server.rs b/prototypes/quic_demo/src/bin/quic_demo_end_server.rs index fcf2743..f30b717 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_end_server.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_end_server.rs @@ -13,52 +13,63 @@ async fn main () -> anyhow::Result <()> { debug! ("Connecting to relay server"); let quinn::NewConnection { + connection, mut bi_streams, .. } = endpoint.connect (&server_addr, "localhost")?.await?; - debug! ("Waiting for relay server to forward a bi stream"); + let (mut send, mut recv) = connection.open_bi ().await?; - let (mut relay_send, mut relay_recv) = bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("Relay server didn't open a bi stream"))??; + let req_buf = [4u8, 0, 0, 0]; + send.write_all (&req_buf).await?; - debug! ("Connecting to TCP end server"); + let mut resp_buf = [0u8, 0, 0, 0]; + recv.read_exact (&mut resp_buf).await?; - let stream = TcpStream::connect ("127.0.0.1:30382").await?; - let (mut local_recv, mut local_send) = stream.into_split (); - - debug! ("Relaying bytes..."); - - let uplink_task = tokio::spawn (async move { - // Uplink - local client to relay server + if false { + debug! ("Waiting for relay server to forward a bi stream"); - let mut buf = vec! [0u8; 65_536]; - loop { - let bytes_read = local_recv.read (&mut buf).await?; - let buf_slice = &buf [0..bytes_read]; - relay_send.write_all (buf_slice).await?; - } + let (mut relay_send, mut relay_recv) = bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("Relay server didn't open a bi stream"))??; - debug! ("Uplink closed"); + debug! ("Connecting to TCP end server"); - Ok::<_, anyhow::Error> (()) - }); - - let downlink_task = tokio::spawn (async move { - // Downlink - Relay server to local client + let stream = TcpStream::connect ("127.0.0.1:30382").await?; + let (mut local_recv, mut local_send) = stream.into_split (); - let mut buf = vec! [0u8; 65_536]; - while let Some (bytes_read) = relay_recv.read (&mut buf).await? { - let buf_slice = &buf [0..bytes_read]; - local_send.write_all (buf_slice).await?; - } + debug! ("Relaying bytes..."); - debug! ("Downlink closed"); + let uplink_task = tokio::spawn (async move { + // Uplink - local client to relay server + + let mut buf = vec! [0u8; 65_536]; + loop { + let bytes_read = local_recv.read (&mut buf).await?; + let buf_slice = &buf [0..bytes_read]; + relay_send.write_all (buf_slice).await?; + } + + debug! ("Uplink closed"); + + Ok::<_, anyhow::Error> (()) + }); - Ok::<_, anyhow::Error> (()) - }); - - uplink_task.await??; - downlink_task.await??; + let downlink_task = tokio::spawn (async move { + // Downlink - Relay server to local client + + let mut buf = vec! [0u8; 65_536]; + while let Some (bytes_read) = relay_recv.read (&mut buf).await? { + let buf_slice = &buf [0..bytes_read]; + local_send.write_all (buf_slice).await?; + } + + debug! ("Downlink closed"); + + Ok::<_, anyhow::Error> (()) + }); + + uplink_task.await??; + downlink_task.await??; + } Ok (()) } diff --git a/prototypes/quic_demo/src/bin/quic_demo_relay_server.rs b/prototypes/quic_demo/src/bin/quic_demo_relay_server.rs index 1e4431d..a7d4a10 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_relay_server.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_relay_server.rs @@ -1,5 +1,3 @@ -use tokio::task::JoinHandle; - use quic_demo::prelude::*; #[tokio::main] @@ -10,52 +8,99 @@ async fn main () -> anyhow::Result <()> { let (mut incoming, server_cert) = make_server_endpoint (server_addr)?; tokio::fs::write ("quic_server.crt", &server_cert).await?; - debug! ("Waiting for end server to connect"); + let relay_state = RelayState::default (); + let relay_state = Arc::new (Mutex::new (relay_state)); - let end_server_conn = incoming.next ().await.ok_or_else (|| anyhow::anyhow! ("No end server connection"))?; + while let Some (conn) = incoming.next ().await { + let relay_state = Arc::clone (&relay_state); + + // Each new connection gets its own task + tokio::spawn (async move { + let quinn::NewConnection { + connection, + mut bi_streams, + .. + } = conn.await?; + + // Everyone who connects must identify themselves with the first + // bi stream + // TODO: Timeout + + let (mut send, mut recv) = bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("QUIC client didn't identify itself"))??; + + let mut req_buf = [0u8; 4]; + recv.read_exact (&mut req_buf).await?; + + match req_buf [0] { + 4 => debug! ("Server-side proxy (P4) connected"), + 2 => debug! ("Client-side proxy (P2) connected"), + _ => bail! ("Unknown QUIC client type"), + } + + let resp_buf = [20u8, 0, 0, 0]; + send.write_all (&resp_buf).await?; + + Ok::<_, anyhow::Error> (()) + }); + } - let end_server_conn = end_server_conn.await?; - - let quinn::NewConnection { - connection: end_server_conn, - .. - } = end_server_conn; - - debug! ("Waiting for client to connect"); - - let client_conn = incoming.next ().await.ok_or_else (|| anyhow::anyhow! ("No client connection"))?; - - let client_conn = client_conn.await?; - - let quinn::NewConnection { - connection: _client_conn, - bi_streams: mut client_incoming_bi_streams, - .. - } = client_conn; - - debug! ("Waiting for client to open bi stream"); - - let (client_send, client_recv) = client_incoming_bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("Client didn't open a bi stream"))??; - - debug! ("Opening bi stream to the end server"); - - let (server_send, server_recv) = end_server_conn.open_bi ().await?; - - debug! ("Relaying bytes..."); - - let ptth_conn = PtthNewConnection { - client_send, - client_recv, - server_send, - server_recv, - }.build (); - - ptth_conn.uplink_task.await??; - ptth_conn.downlink_task.await??; + if false { + debug! ("Waiting for end server to connect"); + + let end_server_conn = incoming.next ().await.ok_or_else (|| anyhow::anyhow! ("No end server connection"))?; + + let end_server_conn = end_server_conn.await?; + + let quinn::NewConnection { + connection: end_server_conn, + .. + } = end_server_conn; + + debug! ("Waiting for client to connect"); + + let client_conn = incoming.next ().await.ok_or_else (|| anyhow::anyhow! ("No client connection"))?; + + let client_conn = client_conn.await?; + + let quinn::NewConnection { + connection: _client_conn, + bi_streams: mut client_incoming_bi_streams, + .. + } = client_conn; + + debug! ("Waiting for client to open bi stream"); + + let (client_send, client_recv) = client_incoming_bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("Client didn't open a bi stream"))??; + + debug! ("Opening bi stream to the end server"); + + let (server_send, server_recv) = end_server_conn.open_bi ().await?; + + debug! ("Relaying bytes..."); + + let ptth_conn = PtthNewConnection { + client_send, + client_recv, + server_send, + server_recv, + }.build (); + + ptth_conn.uplink_task.await??; + ptth_conn.downlink_task.await??; + } Ok (()) } +#[derive (Default)] +struct RelayState { + +} + +impl RelayState { + +} + struct PtthNewConnection { client_send: quinn::SendStream, client_recv: quinn::RecvStream, diff --git a/prototypes/quic_demo/src/prelude.rs b/prototypes/quic_demo/src/prelude.rs index 18d6d53..13562f4 100644 --- a/prototypes/quic_demo/src/prelude.rs +++ b/prototypes/quic_demo/src/prelude.rs @@ -1,4 +1,5 @@ pub use std::{ + sync::Arc, time::Duration, }; @@ -9,6 +10,8 @@ pub use tokio::{ AsyncReadExt, AsyncWriteExt, }, + sync::Mutex, + task::JoinHandle, }; pub use tracing::{ debug,