From 69c4afe456c0baa6928502f62ca3432ed0f8e0e3 Mon Sep 17 00:00:00 2001 From: _ <> Date: Sun, 18 Jul 2021 18:58:59 +0000 Subject: [PATCH] :recycle: refactor: extract TCP-QUIC relay connection --- .../quic_demo/src/bin/quic_demo_client.rs | 67 +---------------- .../quic_demo/src/bin/quic_demo_end_server.rs | 69 +---------------- prototypes/quic_demo/src/connection.rs | 74 +++++++++++++++++++ prototypes/quic_demo/src/lib.rs | 1 + 4 files changed, 80 insertions(+), 131 deletions(-) create mode 100644 prototypes/quic_demo/src/connection.rs diff --git a/prototypes/quic_demo/src/bin/quic_demo_client.rs b/prototypes/quic_demo/src/bin/quic_demo_client.rs index eebeab6..fd60706 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_client.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_client.rs @@ -54,15 +54,14 @@ async fn main () -> anyhow::Result <()> { trace! ("Relaying bytes..."); - let ptth_conn = PtthNewConnection { + let ptth_conn = quic_demo::connection::NewConnection { local_send, local_recv, relay_send, relay_recv, }.build (); - ptth_conn.uplink_task.await??; - ptth_conn.downlink_task.await??; + ptth_conn.wait_for_close ().await?; debug! ("Ended PTTH connection"); @@ -70,65 +69,3 @@ async fn main () -> anyhow::Result <()> { }); } } - -struct PtthNewConnection { - local_send: tokio::net::tcp::OwnedWriteHalf, - local_recv: tokio::net::tcp::OwnedReadHalf, - relay_send: quinn::SendStream, - relay_recv: quinn::RecvStream, -} - -struct PtthConnection { - uplink_task: JoinHandle >, - downlink_task: JoinHandle >, -} - -impl PtthNewConnection { - fn build (self) -> PtthConnection { - let Self { - mut local_send, - mut local_recv, - mut relay_send, - mut relay_recv, - } = self; - - let uplink_task = tokio::spawn (async move { - // Uplink - local client to relay server - - let mut buf = vec! [0u8; 65_536]; - loop { - let bytes_read = local_recv.read (&mut buf).await?; - if bytes_read == 0 { - break; - } - let buf_slice = &buf [0..bytes_read]; - trace! ("Uplink relaying {} bytes", bytes_read); - relay_send.write_all (buf_slice).await?; - } - - trace! ("Uplink closed"); - - Ok::<_, anyhow::Error> (()) - }); - - let downlink_task = tokio::spawn (async move { - // Downlink - Relay server to local client - - let mut buf = vec! [0u8; 65_536]; - while let Some (bytes_read) = relay_recv.read (&mut buf).await? { - let buf_slice = &buf [0..bytes_read]; - trace! ("Downlink relaying {} bytes", bytes_read); - local_send.write_all (buf_slice).await?; - } - - trace! ("Downlink closed"); - - Ok::<_, anyhow::Error> (()) - }); - - PtthConnection { - uplink_task, - downlink_task, - } - } -} diff --git a/prototypes/quic_demo/src/bin/quic_demo_end_server.rs b/prototypes/quic_demo/src/bin/quic_demo_end_server.rs index 104153e..031a240 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_end_server.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_end_server.rs @@ -84,77 +84,14 @@ async fn handle_p2_connection ( trace! ("Relaying bytes..."); - let ptth_conn = PtthNewConnection { + let ptth_conn = quic_demo::connection::NewConnection { local_send, local_recv, relay_send, relay_recv, }.build (); - ptth_conn.uplink_task.await??; - ptth_conn.downlink_task.await??; - + ptth_conn.wait_for_close ().await?; + Ok (()) } - -struct PtthNewConnection { - local_send: tokio::net::tcp::OwnedWriteHalf, - local_recv: tokio::net::tcp::OwnedReadHalf, - relay_send: quinn::SendStream, - relay_recv: quinn::RecvStream, -} - -struct PtthConnection { - uplink_task: JoinHandle >, - downlink_task: JoinHandle >, -} - -impl PtthNewConnection { - fn build (self) -> PtthConnection { - let Self { - mut local_send, - mut local_recv, - mut relay_send, - mut relay_recv, - } = self; - - let uplink_task = tokio::spawn (async move { - // Downlink - Relay server to local client - - let mut buf = vec! [0u8; 65_536]; - while let Some (bytes_read) = relay_recv.read (&mut buf).await? { - let buf_slice = &buf [0..bytes_read]; - trace! ("Uplink relaying {} bytes", bytes_read); - local_send.write_all (buf_slice).await?; - } - - trace! ("Downlink closed"); - - Ok::<_, anyhow::Error> (()) - }); - - let downlink_task = tokio::spawn (async move { - // Uplink - local client to relay server - - let mut buf = vec! [0u8; 65_536]; - loop { - let bytes_read = local_recv.read (&mut buf).await?; - if bytes_read == 0 { - break; - } - let buf_slice = &buf [0..bytes_read]; - trace! ("Downlink relaying {} bytes", bytes_read); - relay_send.write_all (buf_slice).await?; - } - - trace! ("Uplink closed"); - - Ok::<_, anyhow::Error> (()) - }); - - PtthConnection { - uplink_task, - downlink_task, - } - } -} diff --git a/prototypes/quic_demo/src/connection.rs b/prototypes/quic_demo/src/connection.rs new file mode 100644 index 0000000..1184175 --- /dev/null +++ b/prototypes/quic_demo/src/connection.rs @@ -0,0 +1,74 @@ +use crate::prelude::*; + +pub struct NewConnection { + pub local_send: tokio::net::tcp::OwnedWriteHalf, + pub local_recv: tokio::net::tcp::OwnedReadHalf, + pub relay_send: quinn::SendStream, + pub relay_recv: quinn::RecvStream, +} + +pub struct Connection { + // Blue and green because they're not necessarily uplink nor downlink. + // It depends on whether the client or server is using us. + + task_blue: JoinHandle >, + task_green: JoinHandle >, +} + +impl NewConnection { + pub fn build (self) -> Connection { + let Self { + mut local_send, + mut local_recv, + mut relay_send, + mut relay_recv, + } = self; + + let task_blue = tokio::spawn (async move { + // Downlink - Relay server to local client + + let mut buf = vec! [0u8; 65_536]; + while let Some (bytes_read) = relay_recv.read (&mut buf).await? { + let buf_slice = &buf [0..bytes_read]; + trace! ("Uplink relaying {} bytes", bytes_read); + local_send.write_all (buf_slice).await?; + } + + trace! ("Downlink closed"); + + Ok::<_, anyhow::Error> (()) + }); + + let task_green = tokio::spawn (async move { + // Uplink - local client to relay server + + let mut buf = vec! [0u8; 65_536]; + loop { + let bytes_read = local_recv.read (&mut buf).await?; + if bytes_read == 0 { + break; + } + let buf_slice = &buf [0..bytes_read]; + trace! ("Downlink relaying {} bytes", bytes_read); + relay_send.write_all (buf_slice).await?; + } + + trace! ("Uplink closed"); + + Ok::<_, anyhow::Error> (()) + }); + + Connection { + task_blue, + task_green, + } + } +} + +impl Connection { + pub async fn wait_for_close (self) -> anyhow::Result <()> { + self.task_blue.await??; + self.task_green.await??; + Ok (()) + } +} diff --git a/prototypes/quic_demo/src/lib.rs b/prototypes/quic_demo/src/lib.rs index 96c3681..83c96a2 100644 --- a/prototypes/quic_demo/src/lib.rs +++ b/prototypes/quic_demo/src/lib.rs @@ -1,3 +1,4 @@ +pub mod connection; pub mod prelude; pub mod protocol; pub mod quinn_utils;