From 73d75512157dfb7d4d50cc57de944b014004d41c Mon Sep 17 00:00:00 2001 From: _ <> Date: Sat, 17 Jul 2021 03:11:01 +0000 Subject: [PATCH] modify end server to connect to a local TCP server Tested with netcat on each end and it works great. --- prototypes/quic_demo/README.md | 14 ++++++ .../quic_demo/src/bin/quic_demo_client.rs | 2 + .../quic_demo/src/bin/quic_demo_end_server.rs | 48 ++++++++++++++++--- prototypes/quic_demo/src/prelude.rs | 1 + prototypes/quic_demo/src/quinn_utils.rs | 21 +++++--- 5 files changed, 72 insertions(+), 14 deletions(-) create mode 100644 prototypes/quic_demo/README.md diff --git a/prototypes/quic_demo/README.md b/prototypes/quic_demo/README.md new file mode 100644 index 0000000..1e73d04 --- /dev/null +++ b/prototypes/quic_demo/README.md @@ -0,0 +1,14 @@ +# Testing + + + +There are 5 processes, so you'll need 5 terminal windows or screen / tmux +sessions: + +1. TCP end server: `nc -l -p 30382` +2. QUIC relay server: `RUST_LOG=quic_demo_relay_server=debug cargo run --bin quic_demo_relay_server` +3. Server-side adapter: `RUST_LOG=quic_demo_end_server=debug cargo run --bin quic_demo_end_server` +4. Client-side adapter: `RUST_LOG=quic_demo_client cargo run --bin quic_demo_client` +5. TCP end client: `nc 127.0.0.1 30381` + +The netcat processes from steps 1 and 5 should now be connected to each other. diff --git a/prototypes/quic_demo/src/bin/quic_demo_client.rs b/prototypes/quic_demo/src/bin/quic_demo_client.rs index 5a7e765..67006c9 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_client.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_client.rs @@ -29,6 +29,8 @@ async fn main () -> anyhow::Result <()> { let (mut relay_send, mut relay_recv) = connection.open_bi ().await?; + debug! ("Relaying bytes..."); + let uplink_task = tokio::spawn (async move { // Uplink - local client to relay server diff --git a/prototypes/quic_demo/src/bin/quic_demo_end_server.rs b/prototypes/quic_demo/src/bin/quic_demo_end_server.rs index 61ff175..fd63e82 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_end_server.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_end_server.rs @@ -1,9 +1,15 @@ +use tokio::net::TcpStream; + use quic_demo::prelude::*; #[tokio::main] async fn main () -> anyhow::Result <()> { tracing_subscriber::fmt::init (); + let stream = TcpStream::connect ("127.0.0.1:30382").await?; + + let (mut local_recv, mut local_send) = stream.into_split (); + let server_cert = tokio::fs::read ("quic_server.crt").await?; let server_addr = "127.0.0.1:30380".parse ()?; let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&server_cert])?; @@ -15,15 +21,43 @@ async fn main () -> anyhow::Result <()> { .. } = endpoint.connect (&server_addr, "localhost")?.await?; - debug! ("Waiting for incoming bi stream"); + debug! ("Waiting for relay server to forward a bi stream"); - let (mut send, mut recv) = bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("Relay server didn't open a bi stream"))??; + let (mut relay_send, mut relay_recv) = bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("Relay server didn't open a bi stream"))??; - let mut buf = vec! [0u8; 65_536]; - while let Some (bytes_read) = recv.read (&mut buf).await? { - let s = format! ("bytes_read: {}", bytes_read); - send.write_all (s.as_bytes ()).await?; - } + debug! ("Relaying bytes..."); + + let uplink_task = tokio::spawn (async move { + // Uplink - local client to relay server + + let mut buf = vec! [0u8; 65_536]; + loop { + let bytes_read = local_recv.read (&mut buf).await?; + let buf_slice = &buf [0..bytes_read]; + relay_send.write_all (buf_slice).await?; + } + + debug! ("Uplink closed"); + + Ok::<_, anyhow::Error> (()) + }); + + let downlink_task = tokio::spawn (async move { + // Downlink - Relay server to local client + + let mut buf = vec! [0u8; 65_536]; + while let Some (bytes_read) = relay_recv.read (&mut buf).await? { + let buf_slice = &buf [0..bytes_read]; + local_send.write_all (buf_slice).await?; + } + + debug! ("Downlink closed"); + + Ok::<_, anyhow::Error> (()) + }); + + uplink_task.await??; + downlink_task.await??; Ok (()) } diff --git a/prototypes/quic_demo/src/prelude.rs b/prototypes/quic_demo/src/prelude.rs index 7ef2577..18d6d53 100644 --- a/prototypes/quic_demo/src/prelude.rs +++ b/prototypes/quic_demo/src/prelude.rs @@ -2,6 +2,7 @@ pub use std::{ time::Duration, }; +pub use anyhow::bail; pub use futures_util::StreamExt; pub use tokio::{ io::{ diff --git a/prototypes/quic_demo/src/quinn_utils.rs b/prototypes/quic_demo/src/quinn_utils.rs index 08486f1..c99effc 100644 --- a/prototypes/quic_demo/src/quinn_utils.rs +++ b/prototypes/quic_demo/src/quinn_utils.rs @@ -1,14 +1,16 @@ // I'm not sure where I got this module from, but it's probably from the // quinn examples, so the license should be okay. +use std::{ + net::SocketAddr, + sync::Arc, + time::Duration, +}; + use quinn::{ Certificate, CertificateChain, ClientConfig, ClientConfigBuilder, Endpoint, Incoming, PrivateKey, ServerConfig, ServerConfigBuilder, TransportConfig, }; -use std::{ - net::SocketAddr, - sync::Arc, -}; /// Constructs a QUIC endpoint configured for use a client only. /// @@ -20,9 +22,14 @@ pub fn make_client_endpoint( bind_addr: SocketAddr, server_certs: &[&[u8]], ) -> anyhow::Result { - let client_cfg = configure_client(server_certs)?; - let mut endpoint_builder = Endpoint::builder(); - endpoint_builder.default_client_config(client_cfg); + let mut client_cfg = configure_client (server_certs)?; + let mut transport = quinn::TransportConfig::default (); + transport.keep_alive_interval (Some (Duration::from_millis (5_000))); + + client_cfg.transport = Arc::new (transport); + + let mut endpoint_builder = Endpoint::builder (); + endpoint_builder.default_client_config (client_cfg); let (endpoint, _incoming) = endpoint_builder.bind(&bind_addr)?; Ok(endpoint) }