diff --git a/prototypes/quic_demo/src/bin/quic_demo_client.rs b/prototypes/quic_demo/src/bin/quic_demo_client.rs index 275cc6b..bf01694 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_client.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_client.rs @@ -29,7 +29,7 @@ async fn main () -> anyhow::Result <()> { let listener = TcpListener::bind ("127.0.0.1:30381").await?; - debug! ("Accepting local TCP connections"); + debug! ("Accepting local TCP connections from P1"); loop { let (tcp_socket, _) = listener.accept ().await?; diff --git a/prototypes/quic_demo/src/bin/quic_demo_end_server.rs b/prototypes/quic_demo/src/bin/quic_demo_end_server.rs index 25b6483..a34a5d7 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_end_server.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_end_server.rs @@ -26,62 +26,101 @@ async fn main () -> anyhow::Result <()> { let mut resp_buf = [0u8, 0, 0, 0]; recv.read_exact (&mut resp_buf).await?; - debug! ("Waiting for relay server to forward a bi stream"); + debug! ("Accepting bi streams from P3"); - let (mut relay_send, mut relay_recv) = bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("Relay server didn't open a bi stream"))??; - - let mut req_buf = [0, 0, 0, 0]; - relay_recv.read_exact (&mut req_buf).await?; - - // TODO: Auth stuff - - debug! ("Connecting to TCP end server"); - - let stream = TcpStream::connect ("127.0.0.1:30382").await?; - let (mut local_recv, mut local_send) = stream.into_split (); - - let resp_buf = [20, 0, 0, 0]; - relay_send.write_all (&resp_buf).await?; - relay_send.write_all (&resp_buf).await?; - - debug! ("Relaying bytes..."); - - let uplink_task = tokio::spawn (async move { - // Downlink - Relay server to local client + loop { + let (mut relay_send, mut relay_recv) = bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("Relay server didn't open a bi stream"))??; - let mut buf = vec! [0u8; 65_536]; - while let Some (bytes_read) = relay_recv.read (&mut buf).await? { - let buf_slice = &buf [0..bytes_read]; - debug! ("Uplink relaying {} bytes", bytes_read); - local_send.write_all (buf_slice).await?; - } - - debug! ("Downlink closed"); - - Ok::<_, anyhow::Error> (()) - }); - - let downlink_task = tokio::spawn (async move { - // Uplink - local client to relay server - - let mut buf = vec! [0u8; 65_536]; - loop { - let bytes_read = local_recv.read (&mut buf).await?; - if bytes_read == 0 { - break; - } - let buf_slice = &buf [0..bytes_read]; - debug! ("Downlink relaying {} bytes", bytes_read); - relay_send.write_all (buf_slice).await?; - } - - debug! ("Uplink closed"); - - Ok::<_, anyhow::Error> (()) - }); - - uplink_task.await??; - downlink_task.await??; - - Ok (()) + tokio::spawn (async move { + let mut req_buf = [0, 0, 0, 0]; + relay_recv.read_exact (&mut req_buf).await?; + + // TODO: Auth stuff + + debug! ("Connecting to TCP end server"); + + let stream = TcpStream::connect ("127.0.0.1:30382").await?; + let (local_recv, local_send) = stream.into_split (); + + let resp_buf = [20, 0, 0, 0]; + relay_send.write_all (&resp_buf).await?; + relay_send.write_all (&resp_buf).await?; + + debug! ("Relaying bytes..."); + + let ptth_conn = PtthNewConnection { + local_send, + local_recv, + relay_send, + relay_recv, + }.build (); + + ptth_conn.uplink_task.await??; + ptth_conn.downlink_task.await??; + + Ok::<_, anyhow::Error> (()) + }); + } +} + +struct PtthNewConnection { + local_send: tokio::net::tcp::OwnedWriteHalf, + local_recv: tokio::net::tcp::OwnedReadHalf, + relay_send: quinn::SendStream, + relay_recv: quinn::RecvStream, +} + +struct PtthConnection { + uplink_task: JoinHandle >, + downlink_task: JoinHandle >, +} + +impl PtthNewConnection { + fn build (self) -> PtthConnection { + let Self { + mut local_send, + mut local_recv, + mut relay_send, + mut relay_recv, + } = self; + + let uplink_task = tokio::spawn (async move { + // Downlink - Relay server to local client + + let mut buf = vec! [0u8; 65_536]; + while let Some (bytes_read) = relay_recv.read (&mut buf).await? { + let buf_slice = &buf [0..bytes_read]; + debug! ("Uplink relaying {} bytes", bytes_read); + local_send.write_all (buf_slice).await?; + } + + debug! ("Downlink closed"); + + Ok::<_, anyhow::Error> (()) + }); + + let downlink_task = tokio::spawn (async move { + // Uplink - local client to relay server + + let mut buf = vec! [0u8; 65_536]; + loop { + let bytes_read = local_recv.read (&mut buf).await?; + if bytes_read == 0 { + break; + } + let buf_slice = &buf [0..bytes_read]; + debug! ("Downlink relaying {} bytes", bytes_read); + relay_send.write_all (buf_slice).await?; + } + + debug! ("Uplink closed"); + + Ok::<_, anyhow::Error> (()) + }); + + PtthConnection { + uplink_task, + downlink_task, + } + } }