From c4cd8cf1aafcd694ffcb672fd4aae646a1b2824c Mon Sep 17 00:00:00 2001 From: _ <> Date: Sat, 17 Jul 2021 02:39:08 +0000 Subject: [PATCH] modify client to accept connections from a local TCP client --- .../quic_demo/src/bin/quic_demo_client.rs | 54 ++++++++++++++----- prototypes/quic_demo/src/prelude.rs | 6 +++ 2 files changed, 48 insertions(+), 12 deletions(-) diff --git a/prototypes/quic_demo/src/bin/quic_demo_client.rs b/prototypes/quic_demo/src/bin/quic_demo_client.rs index 86fa115..5a7e765 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_client.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_client.rs @@ -1,3 +1,7 @@ +use tokio::{ + net::TcpListener, +}; + use quic_demo::prelude::*; #[tokio::main] @@ -15,21 +19,47 @@ async fn main () -> anyhow::Result <()> { .. } = endpoint.connect (&server_addr, "localhost")?.await?; + debug! ("Waiting for local TCP client to connect to us"); + + let listener = TcpListener::bind ("127.0.0.1:30381").await?; + let (tcp_socket, _) = listener.accept ().await?; + let (mut local_recv, mut local_send) = tcp_socket.into_split (); + debug! ("Connecting to end server"); - let (mut send, mut recv) = connection.open_bi ().await?; + let (mut relay_send, mut relay_recv) = connection.open_bi ().await?; - let mut buf = vec! [0u8; 65_536]; - let mut send_interval = tokio::time::interval (Duration::from_millis (1000)); - // send_interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip); - loop { - send_interval.tick ().await; + let uplink_task = tokio::spawn (async move { + // Uplink - local client to relay server - send.write_all (b"Hi!\n").await?; - let bytes_read = recv.read (&mut buf).await?.ok_or_else (|| anyhow::anyhow! ("Server or relay closed connection"))?; - let buf_slice = &buf [0..bytes_read]; + let mut buf = vec! [0u8; 65_536]; + loop { + let bytes_read = local_recv.read (&mut buf).await?; + let buf_slice = &buf [0..bytes_read]; + relay_send.write_all (buf_slice).await?; + } - let s = std::str::from_utf8 (buf_slice)?; - println! ("Received: `{}`", s); - } + debug! ("Uplink closed"); + + Ok::<_, anyhow::Error> (()) + }); + + let downlink_task = tokio::spawn (async move { + // Downlink - Relay server to local client + + let mut buf = vec! [0u8; 65_536]; + while let Some (bytes_read) = relay_recv.read (&mut buf).await? { + let buf_slice = &buf [0..bytes_read]; + local_send.write_all (buf_slice).await?; + } + + debug! ("Downlink closed"); + + Ok::<_, anyhow::Error> (()) + }); + + uplink_task.await??; + downlink_task.await??; + + Ok (()) } diff --git a/prototypes/quic_demo/src/prelude.rs b/prototypes/quic_demo/src/prelude.rs index 4aece1b..7ef2577 100644 --- a/prototypes/quic_demo/src/prelude.rs +++ b/prototypes/quic_demo/src/prelude.rs @@ -3,6 +3,12 @@ pub use std::{ }; pub use futures_util::StreamExt; +pub use tokio::{ + io::{ + AsyncReadExt, + AsyncWriteExt, + }, +}; pub use tracing::{ debug, error,