From b344e3f8ee4d5c5f3224ea3d67877e14914f23e8 Mon Sep 17 00:00:00 2001 From: _ <> Date: Sat, 17 Jul 2021 06:26:06 +0000 Subject: [PATCH] :recycle: refactor --- .../src/bin/quic_demo_relay_server.rs | 101 ++++++++++++------ 1 file changed, 68 insertions(+), 33 deletions(-) diff --git a/prototypes/quic_demo/src/bin/quic_demo_relay_server.rs b/prototypes/quic_demo/src/bin/quic_demo_relay_server.rs index 01d934e..1e4431d 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_relay_server.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_relay_server.rs @@ -1,3 +1,5 @@ +use tokio::task::JoinHandle; + use quic_demo::prelude::*; #[tokio::main] @@ -33,46 +35,79 @@ async fn main () -> anyhow::Result <()> { debug! ("Waiting for client to open bi stream"); - let (mut client_send, mut client_recv) = client_incoming_bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("Client didn't open a bi stream"))??; + let (client_send, client_recv) = client_incoming_bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("Client didn't open a bi stream"))??; debug! ("Opening bi stream to the end server"); - let (mut server_send, mut server_recv) = end_server_conn.open_bi ().await?; + let (server_send, server_recv) = end_server_conn.open_bi ().await?; debug! ("Relaying bytes..."); - // Remember to swap tx and rx for patch cables + let ptth_conn = PtthNewConnection { + client_send, + client_recv, + server_send, + server_recv, + }.build (); - let uplink_task = tokio::spawn (async move { - // Uplink - Client to end server - - let mut buf = vec! [0u8; 65_536]; - while let Some (bytes_read) = client_recv.read (&mut buf).await? { - let buf_slice = &buf [0..bytes_read]; - server_send.write_all (buf_slice).await?; - } - - debug! ("Uplink closed"); - - Ok::<_, anyhow::Error> (()) - }); - - let downlink_task = tokio::spawn (async move { - // Downlink - End server to client - - let mut buf = vec! [0u8; 65_536]; - while let Some (bytes_read) = server_recv.read (&mut buf).await? { - let buf_slice = &buf [0..bytes_read]; - client_send.write_all (buf_slice).await?; - } - - debug! ("Downlink closed"); - - Ok::<_, anyhow::Error> (()) - }); - - uplink_task.await??; - downlink_task.await??; + ptth_conn.uplink_task.await??; + ptth_conn.downlink_task.await??; Ok (()) } + +struct PtthNewConnection { + client_send: quinn::SendStream, + client_recv: quinn::RecvStream, + server_send: quinn::SendStream, + server_recv: quinn::RecvStream, +} + +struct PtthConnection { + uplink_task: JoinHandle >, + downlink_task: JoinHandle >, +} + +impl PtthNewConnection { + fn build (self) -> PtthConnection { + let Self { + mut client_send, + mut client_recv, + mut server_send, + mut server_recv, + } = self; + + let uplink_task = tokio::spawn (async move { + // Uplink - Client to end server + + let mut buf = vec! [0u8; 65_536]; + while let Some (bytes_read) = client_recv.read (&mut buf).await? { + let buf_slice = &buf [0..bytes_read]; + server_send.write_all (buf_slice).await?; + } + + debug! ("Uplink closed"); + + Ok::<_, anyhow::Error> (()) + }); + + let downlink_task = tokio::spawn (async move { + // Downlink - End server to client + + let mut buf = vec! [0u8; 65_536]; + while let Some (bytes_read) = server_recv.read (&mut buf).await? { + let buf_slice = &buf [0..bytes_read]; + client_send.write_all (buf_slice).await?; + } + + debug! ("Downlink closed"); + + Ok::<_, anyhow::Error> (()) + }); + + PtthConnection { + uplink_task, + downlink_task, + } + } +}