From a3b62b012d4a0e72ae94ff32cabf2f938e666aeb Mon Sep 17 00:00:00 2001 From: _ <> Date: Sun, 10 Oct 2021 16:56:12 +0000 Subject: [PATCH] :recycle: refactor --- .../quic_demo/src/bin/quic_demo_client.rs | 115 ++++++++++++------ 1 file changed, 81 insertions(+), 34 deletions(-) diff --git a/prototypes/quic_demo/src/bin/quic_demo_client.rs b/prototypes/quic_demo/src/bin/quic_demo_client.rs index 64e252d..20bd5c1 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_client.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_client.rs @@ -1,5 +1,8 @@ use structopt::StructOpt; -use tokio::net::TcpListener; +use tokio::net::{ + TcpListener, + TcpStream, +}; use quic_demo::prelude::*; use protocol::PeerId; @@ -64,45 +67,89 @@ impl P2Client { // End of per-port stuff // Beginning of per-connection stuff - let task_tcp_server = tokio::spawn (async move { - let running = true; - while running { - let (tcp_socket, _) = listener.accept ().await?; - let connection = connection.clone (); - let server_id = conf.server_id.clone (); - let server_tcp_port = conf.server_tcp_port; - - tokio::spawn (async move { - let (local_recv, local_send) = tcp_socket.into_split (); - - debug! ("Starting PTTH connection"); - - let (relay_send, relay_recv) = protocol::p2_connect_to_p5 (&connection, &server_id, server_tcp_port).await?; - - trace! ("Relaying bytes..."); - - let ptth_conn = quic_demo::connection::NewConnection { - local_send, - local_recv, - relay_send, - relay_recv, - }.build (); - - ptth_conn.wait_for_close ().await?; - - debug! ("Ended PTTH connection"); - - Ok::<_, anyhow::Error> (()) - }); - } + let task_tcp_server = { + let connection = connection.clone (); - Ok::<_, anyhow::Error> (()) - }); + tokio::spawn (async move { + Self::run_tcp_server (listener, connection, &*conf).await + }) + }; task_tcp_server.await??; Ok (()) } + + /// Runs a TCP listen to forward a single TCP port + + async fn run_tcp_server ( + listener: TcpListener, + connection: quinn::Connection, + conf: &Config, + ) -> anyhow::Result <()> { + let running = true; + while running { + let (tcp_socket, _) = listener.accept ().await?; + let connection = connection.clone (); + let server_id = conf.server_id.clone (); + let server_tcp_port = conf.server_tcp_port; + + tokio::spawn (async move { + let (local_recv, local_send) = tcp_socket.into_split (); + + debug! ("Starting PTTH connection"); + + let (relay_send, relay_recv) = protocol::p2_connect_to_p5 (&connection, &server_id, server_tcp_port).await?; + + trace! ("Relaying bytes..."); + + let ptth_conn = quic_demo::connection::NewConnection { + local_send, + local_recv, + relay_send, + relay_recv, + }.build (); + + ptth_conn.wait_for_close ().await?; + + debug! ("Ended PTTH connection"); + + Ok::<_, anyhow::Error> (()) + }); + } + + Ok (()) + } + + /// Forwards a single TCP stream + + async fn run_tcp_stream ( + connection: quinn::Connection, + tcp_socket: TcpStream, + server_id: &str, + server_tcp_port: u16, + ) -> anyhow::Result <()> { + let (local_recv, local_send) = tcp_socket.into_split (); + + debug! ("Starting PTTH connection"); + + let (relay_send, relay_recv) = protocol::p2_connect_to_p5 (&connection, server_id, server_tcp_port).await?; + + trace! ("Relaying bytes..."); + + let ptth_conn = quic_demo::connection::NewConnection { + local_send, + local_recv, + relay_send, + relay_recv, + }.build (); + + ptth_conn.wait_for_close ().await?; + + debug! ("Ended PTTH connection"); + + Ok (()) + } } /// A filled-out config for constructing a P2 client