From c002665f6c0e5a3054aa9f341533133824185827 Mon Sep 17 00:00:00 2001 From: _ <> Date: Sun, 10 Oct 2021 17:14:32 +0000 Subject: [PATCH] :recycle: refactor: use the code from client_proxy instead --- .../quic_demo/src/bin/quic_demo_client.rs | 85 ++++++------------- prototypes/quic_demo/src/client_proxy.rs | 5 +- 2 files changed, 29 insertions(+), 61 deletions(-) diff --git a/prototypes/quic_demo/src/bin/quic_demo_client.rs b/prototypes/quic_demo/src/bin/quic_demo_client.rs index 41817e6..57e149c 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_client.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_client.rs @@ -1,10 +1,15 @@ use structopt::StructOpt; -use tokio::net::{ - TcpListener, - TcpStream, +use tokio::{ + sync::watch, }; -use quic_demo::prelude::*; +use quic_demo::{ + client_proxy::{ + ForwardingParams, + forward_port, + }, + prelude::*, +}; use protocol::PeerId; #[derive (Debug, StructOpt)] @@ -60,76 +65,36 @@ impl P2Client { .. } = protocol::p2_connect_to_p3 (&self.endpoint, &conf.relay_addr, &conf.client_id).await?; - let listener = TcpListener::bind (("127.0.0.1", conf.client_tcp_port)).await?; + let client_tcp_port = conf.client_tcp_port; - debug! ("Accepting local TCP connections from P1 at {}", conf.client_tcp_port); + debug! ("Accepting local TCP connections from P1 at {}", client_tcp_port); // End of per-port stuff // Beginning of per-connection stuff + let (_shutdown_flag_tx, shutdown_flag_rx) = watch::channel (true); + let task_tcp_server = { let connection = connection.clone (); - - tokio::spawn (async move { - Self::run_tcp_server (listener, connection, &*conf).await - }) - }; - - task_tcp_server.await??; - - Ok (()) - } - - /// Runs a TCP listen to forward a single TCP port - - async fn run_tcp_server ( - listener: TcpListener, - connection: quinn::Connection, - conf: &Config, - ) -> anyhow::Result <()> { - let running = true; - while running { - let (tcp_stream, _) = listener.accept ().await?; - let connection = connection.clone (); let server_id = conf.server_id.clone (); let server_tcp_port = conf.server_tcp_port; tokio::spawn (async move { - Self::run_tcp_stream (&connection, tcp_stream, &server_id, server_tcp_port).await?; + forward_port ( + connection, + ForwardingParams { + client_tcp_port, + server_id, + server_tcp_port, + }, + shutdown_flag_rx, + ).await?; Ok::<_, anyhow::Error> (()) - }); - } + }) + }; - Ok (()) - } - - /// Forwards a single TCP stream - - async fn run_tcp_stream ( - connection: &quinn::Connection, - tcp_stream: TcpStream, - server_id: &str, - server_tcp_port: u16, - ) -> anyhow::Result <()> { - let (local_recv, local_send) = tcp_stream.into_split (); - - debug! ("Starting PTTH connection"); - - let (relay_send, relay_recv) = protocol::p2_connect_to_p5 (connection, server_id, server_tcp_port).await?; - - trace! ("Relaying bytes..."); - - let ptth_conn = quic_demo::connection::NewConnection { - local_send, - local_recv, - relay_send, - relay_recv, - }.build (); - - ptth_conn.wait_for_close ().await?; - - debug! ("Ended PTTH connection"); + task_tcp_server.await??; Ok (()) } diff --git a/prototypes/quic_demo/src/client_proxy.rs b/prototypes/quic_demo/src/client_proxy.rs index 0701ffc..8780efc 100644 --- a/prototypes/quic_demo/src/client_proxy.rs +++ b/prototypes/quic_demo/src/client_proxy.rs @@ -44,7 +44,10 @@ pub struct ForwardingParams { pub server_tcp_port: u16, } -async fn forward_port ( +/// Starts a TCP listener that can forward any number of TCP streams to +/// the same client:server port combination + +pub async fn forward_port ( connection_p2_p3: quinn::Connection, params: ForwardingParams, shutdown_flag_rx: tokio::sync::watch::Receiver ,