From fe25ff3d34bd8e75750675c22141a241e5cb10d5 Mon Sep 17 00:00:00 2001 From: _ <> Date: Sat, 17 Jul 2021 20:02:16 +0000 Subject: [PATCH] :recycle: refactor --- .../quic_demo/src/bin/quic_demo_client.rs | 106 ++++++++++++------ .../quic_demo/src/bin/quic_demo_end_server.rs | 32 +++--- 2 files changed, 87 insertions(+), 51 deletions(-) diff --git a/prototypes/quic_demo/src/bin/quic_demo_client.rs b/prototypes/quic_demo/src/bin/quic_demo_client.rs index 511af4e..79fafcd 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_client.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_client.rs @@ -31,7 +31,7 @@ async fn main () -> anyhow::Result <()> { let listener = TcpListener::bind ("127.0.0.1:30381").await?; let (tcp_socket, _) = listener.accept ().await?; - let (mut local_recv, mut local_send) = tcp_socket.into_split (); + let (local_recv, local_send) = tcp_socket.into_split (); debug! ("Connecting to end server"); @@ -45,41 +45,77 @@ async fn main () -> anyhow::Result <()> { debug! ("Relaying bytes..."); - let uplink_task = tokio::spawn (async move { - // Uplink - local client to relay server - - let mut buf = vec! [0u8; 65_536]; - loop { - let bytes_read = local_recv.read (&mut buf).await?; - if bytes_read == 0 { - break; - } - let buf_slice = &buf [0..bytes_read]; - debug! ("Uplink relaying {} bytes", bytes_read); - relay_send.write_all (buf_slice).await?; - } - - debug! ("Uplink closed"); - - Ok::<_, anyhow::Error> (()) - }); + let ptth_conn = PtthNewConnection { + local_send, + local_recv, + relay_send, + relay_recv, + }.build (); - let downlink_task = tokio::spawn (async move { - // Downlink - Relay server to local client - - let mut buf = vec! [0u8; 65_536]; - while let Some (bytes_read) = relay_recv.read (&mut buf).await? { - let buf_slice = &buf [0..bytes_read]; - debug! ("Downlink relaying {} bytes", bytes_read); - local_send.write_all (buf_slice).await?; - } - - debug! ("Downlink closed"); - - Ok::<_, anyhow::Error> (()) - }); + ptth_conn.uplink_task.await??; + ptth_conn.downlink_task.await??; - uplink_task.await??; - downlink_task.await??; Ok (()) } + +struct PtthNewConnection { + local_send: tokio::net::tcp::OwnedWriteHalf, + local_recv: tokio::net::tcp::OwnedReadHalf, + relay_send: quinn::SendStream, + relay_recv: quinn::RecvStream, +} + +struct PtthConnection { + uplink_task: JoinHandle >, + downlink_task: JoinHandle >, +} + +impl PtthNewConnection { + fn build (self) -> PtthConnection { + let Self { + mut local_send, + mut local_recv, + mut relay_send, + mut relay_recv, + } = self; + + let uplink_task = tokio::spawn (async move { + // Uplink - local client to relay server + + let mut buf = vec! [0u8; 65_536]; + loop { + let bytes_read = local_recv.read (&mut buf).await?; + if bytes_read == 0 { + break; + } + let buf_slice = &buf [0..bytes_read]; + debug! ("Uplink relaying {} bytes", bytes_read); + relay_send.write_all (buf_slice).await?; + } + + debug! ("Uplink closed"); + + Ok::<_, anyhow::Error> (()) + }); + + let downlink_task = tokio::spawn (async move { + // Downlink - Relay server to local client + + let mut buf = vec! [0u8; 65_536]; + while let Some (bytes_read) = relay_recv.read (&mut buf).await? { + let buf_slice = &buf [0..bytes_read]; + debug! ("Downlink relaying {} bytes", bytes_read); + local_send.write_all (buf_slice).await?; + } + + debug! ("Downlink closed"); + + Ok::<_, anyhow::Error> (()) + }); + + PtthConnection { + uplink_task, + downlink_task, + } + } +} diff --git a/prototypes/quic_demo/src/bin/quic_demo_end_server.rs b/prototypes/quic_demo/src/bin/quic_demo_end_server.rs index 1ca5371..25b6483 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_end_server.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_end_server.rs @@ -47,6 +47,21 @@ async fn main () -> anyhow::Result <()> { debug! ("Relaying bytes..."); let uplink_task = tokio::spawn (async move { + // Downlink - Relay server to local client + + let mut buf = vec! [0u8; 65_536]; + while let Some (bytes_read) = relay_recv.read (&mut buf).await? { + let buf_slice = &buf [0..bytes_read]; + debug! ("Uplink relaying {} bytes", bytes_read); + local_send.write_all (buf_slice).await?; + } + + debug! ("Downlink closed"); + + Ok::<_, anyhow::Error> (()) + }); + + let downlink_task = tokio::spawn (async move { // Uplink - local client to relay server let mut buf = vec! [0u8; 65_536]; @@ -56,7 +71,7 @@ async fn main () -> anyhow::Result <()> { break; } let buf_slice = &buf [0..bytes_read]; - debug! ("Uplink relaying {} bytes", bytes_read); + debug! ("Downlink relaying {} bytes", bytes_read); relay_send.write_all (buf_slice).await?; } @@ -65,21 +80,6 @@ async fn main () -> anyhow::Result <()> { Ok::<_, anyhow::Error> (()) }); - let downlink_task = tokio::spawn (async move { - // Downlink - Relay server to local client - - let mut buf = vec! [0u8; 65_536]; - while let Some (bytes_read) = relay_recv.read (&mut buf).await? { - let buf_slice = &buf [0..bytes_read]; - debug! ("Downlink relaying {} bytes", bytes_read); - local_send.write_all (buf_slice).await?; - } - - debug! ("Downlink closed"); - - Ok::<_, anyhow::Error> (()) - }); - uplink_task.await??; downlink_task.await??;