From 4ba35ee7d1a3f5e8dd0b13073b3a17a6fe630826 Mon Sep 17 00:00:00 2001 From: _ <> Date: Sun, 18 Jul 2021 18:52:51 +0000 Subject: [PATCH] :recycle: refactor --- .../quic_demo/src/bin/quic_demo_end_server.rs | 105 ++++++++++-------- 1 file changed, 56 insertions(+), 49 deletions(-) diff --git a/prototypes/quic_demo/src/bin/quic_demo_end_server.rs b/prototypes/quic_demo/src/bin/quic_demo_end_server.rs index da535d0..104153e 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_end_server.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_end_server.rs @@ -37,59 +37,66 @@ async fn main () -> anyhow::Result <()> { trace! ("Accepting bi streams from P3"); loop { - let (mut relay_send, mut relay_recv) = bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("Relay server didn't open a bi stream"))??; + let (relay_send, relay_recv) = bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("Relay server didn't open a bi stream"))??; - tokio::spawn (async move { - let mut req_buf = [0, 0, 0, 0]; - relay_recv.read_exact (&mut req_buf).await?; - assert_eq! (req_buf [0], Command::CONNECT_P2_TO_P4_STEP_2.0); - - // TODO: Authorize P2 to connect to us - - let resp_buf = [ - Command::OKAY.0, - Command::CONNECT_P2_TO_P4_STEP_2.0, - 0, - 0, - ]; - relay_send.write_all (&resp_buf).await?; - - let mut req_buf = [0, 0, 0, 0]; - relay_recv.read_exact (&mut req_buf).await?; - assert_eq! (req_buf [0], Command::CONNECT_P2_TO_P5.0); - - // TODO: Authorize P2 to connect to P5 - - let resp_buf = [ - Command::OKAY.0, - Command::CONNECT_P2_TO_P5.0, - 0, - 0, - ]; - relay_send.write_all (&resp_buf).await?; - - debug! ("Started PTTH connection"); - - let stream = TcpStream::connect (("127.0.0.1", local_tcp_port)).await?; - let (local_recv, local_send) = stream.into_split (); - - trace! ("Relaying bytes..."); - - let ptth_conn = PtthNewConnection { - local_send, - local_recv, - relay_send, - relay_recv, - }.build (); - - ptth_conn.uplink_task.await??; - ptth_conn.downlink_task.await??; - - Ok::<_, anyhow::Error> (()) - }); + tokio::spawn (handle_p2_connection (relay_send, relay_recv, local_tcp_port)); } } +async fn handle_p2_connection ( + mut relay_send: quinn::SendStream, + mut relay_recv: quinn::RecvStream, + local_tcp_port: u16, +) -> anyhow::Result <()> +{ + let mut req_buf = [0, 0, 0, 0]; + relay_recv.read_exact (&mut req_buf).await?; + assert_eq! (req_buf [0], Command::CONNECT_P2_TO_P4_STEP_2.0); + + // TODO: Authorize P2 to connect to us + + let resp_buf = [ + Command::OKAY.0, + Command::CONNECT_P2_TO_P4_STEP_2.0, + 0, + 0, + ]; + relay_send.write_all (&resp_buf).await?; + + let mut req_buf = [0, 0, 0, 0]; + relay_recv.read_exact (&mut req_buf).await?; + assert_eq! (req_buf [0], Command::CONNECT_P2_TO_P5.0); + + // TODO: Authorize P2 to connect to P5 + + let resp_buf = [ + Command::OKAY.0, + Command::CONNECT_P2_TO_P5.0, + 0, + 0, + ]; + relay_send.write_all (&resp_buf).await?; + + debug! ("Started PTTH connection"); + + let stream = TcpStream::connect (("127.0.0.1", local_tcp_port)).await?; + let (local_recv, local_send) = stream.into_split (); + + trace! ("Relaying bytes..."); + + let ptth_conn = PtthNewConnection { + local_send, + local_recv, + relay_send, + relay_recv, + }.build (); + + ptth_conn.uplink_task.await??; + ptth_conn.downlink_task.await??; + + Ok (()) +} + struct PtthNewConnection { local_send: tokio::net::tcp::OwnedWriteHalf, local_recv: tokio::net::tcp::OwnedReadHalf,