From 97fc2c74d460dec50f9f7b5cf7a7082b3ee6bbb0 Mon Sep 17 00:00:00 2001 From: _ <> Date: Sat, 2 Oct 2021 17:39:03 +0000 Subject: [PATCH] :recycle: refactor: extract PTTH_QUIC end server to a module in the PTTH_QUIC lib --- .../quic_demo/src/bin/quic_demo_end_server.rs | 103 +---------------- .../quic_demo/src/executable_end_server.rs | 104 ++++++++++++++++++ prototypes/quic_demo/src/lib.rs | 1 + 3 files changed, 107 insertions(+), 101 deletions(-) create mode 100644 prototypes/quic_demo/src/executable_end_server.rs diff --git a/prototypes/quic_demo/src/bin/quic_demo_end_server.rs b/prototypes/quic_demo/src/bin/quic_demo_end_server.rs index 5f28433..86c3cc8 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_end_server.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_end_server.rs @@ -1,107 +1,8 @@ -use structopt::StructOpt; -use tokio::net::TcpStream; - -use quic_demo::prelude::*; -use protocol::PeerId; - -#[derive (Debug, StructOpt)] -struct Opt { - #[structopt (long)] - relay_addr: Option , - #[structopt (long)] - server_id: Option , - #[structopt (long)] - debug_echo: bool, - #[structopt (long)] - cert_url: Option , -} - #[tokio::main] async fn main () -> anyhow::Result <()> { tracing_subscriber::fmt::init (); - let opt = Arc::new (Opt::from_args ()); + let args = std::env::args_os (); - let server_cert = match opt.cert_url.as_ref () { - Some (url) => reqwest::get (url).await?.bytes ().await?, - None => tokio::fs::read ("quic_server.crt").await?.into (), - }; - let relay_addr = opt.relay_addr.clone ().unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?; - let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&server_cert])?; - - trace! ("Connecting to relay server"); - - let server_id = opt.server_id.clone ().unwrap_or_else (|| "bogus_server".to_string ()); - - let quinn::NewConnection { - mut bi_streams, - .. - } = protocol::p4_connect_to_p3 (&endpoint, &relay_addr, &server_id).await?; - - debug! ("Connected to relay server"); - trace! ("Accepting bi streams from P3"); - - loop { - let (relay_send, relay_recv) = bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("Relay server didn't open a bi stream"))??; - - tokio::spawn (handle_bi_stream (Arc::clone (&opt), relay_send, relay_recv)); - } -} - -async fn handle_bi_stream ( - opt: Arc , - relay_send: quinn::SendStream, - mut relay_recv: quinn::RecvStream, -) -> anyhow::Result <()> -{ - match protocol::p4_accept_p3_stream (&mut relay_recv).await? { - protocol::P3ToP4Stream::NewPtthConnection { - client_id, - .. - } => handle_new_ptth_connection (opt, relay_send, relay_recv, client_id).await?, - } - - Ok (()) -} - -async fn handle_new_ptth_connection ( - opt: Arc , - mut relay_send: quinn::SendStream, - mut relay_recv: quinn::RecvStream, - _client_id: String, -) -> anyhow::Result <()> -{ - // TODO: Check authorization for P2 --> P4 - - protocol::p4_authorize_p2_connection (&mut relay_send).await?; - let p4_to_p5_req = protocol::p4_expect_p5_request (&mut relay_recv).await?; - - // TODO: Check authorization for P1 --> P5 - - protocol::p4_authorize_p1_connection (&mut relay_send).await?; - - debug! ("Started PTTH connection"); - - if opt.debug_echo { - relay_send.write (b"Connected to P4=P5 debug echo server\n").await?; - debug! ("Relaying bytes using internal debug echo server (P4=P5)"); - tokio::io::copy (&mut relay_recv, &mut relay_send).await?; - } - else { - let stream = TcpStream::connect (("127.0.0.1", p4_to_p5_req.port)).await?; - let (local_recv, local_send) = stream.into_split (); - - trace! ("Relaying bytes..."); - - let ptth_conn = quic_demo::connection::NewConnection { - local_send, - local_recv, - relay_send, - relay_recv, - }.build (); - - ptth_conn.wait_for_close ().await?; - } - - Ok (()) + quic_demo::executable_end_server::main (args).await } diff --git a/prototypes/quic_demo/src/executable_end_server.rs b/prototypes/quic_demo/src/executable_end_server.rs new file mode 100644 index 0000000..62804ac --- /dev/null +++ b/prototypes/quic_demo/src/executable_end_server.rs @@ -0,0 +1,104 @@ +use structopt::StructOpt; +use tokio::net::TcpStream; + +use crate::prelude::*; +use protocol::PeerId; + +#[derive (Debug, StructOpt)] +struct Opt { + #[structopt (long)] + relay_addr: Option , + #[structopt (long)] + server_id: Option , + #[structopt (long)] + debug_echo: bool, + #[structopt (long)] + cert_url: Option , +} + +pub async fn main (args: std::env::ArgsOs) -> anyhow::Result <()> { + let opt = Arc::new (Opt::from_iter (args)); + + let server_cert = match opt.cert_url.as_ref () { + Some (url) => reqwest::get (url).await?.bytes ().await?, + None => tokio::fs::read ("quic_server.crt").await?.into (), + }; + let relay_addr = opt.relay_addr.clone ().unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?; + let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&server_cert])?; + + trace! ("Connecting to relay server"); + + let server_id = opt.server_id.clone ().unwrap_or_else (|| "bogus_server".to_string ()); + + let quinn::NewConnection { + mut bi_streams, + .. + } = protocol::p4_connect_to_p3 (&endpoint, &relay_addr, &server_id).await?; + + debug! ("Connected to relay server"); + trace! ("Accepting bi streams from P3"); + + loop { + let (relay_send, relay_recv) = bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("Relay server didn't open a bi stream"))??; + + tokio::spawn (handle_bi_stream (Arc::clone (&opt), relay_send, relay_recv)); + } +} + +async fn handle_bi_stream ( + opt: Arc , + relay_send: quinn::SendStream, + mut relay_recv: quinn::RecvStream, +) -> anyhow::Result <()> +{ + match protocol::p4_accept_p3_stream (&mut relay_recv).await? { + protocol::P3ToP4Stream::NewPtthConnection { + client_id, + .. + } => handle_new_ptth_connection (opt, relay_send, relay_recv, client_id).await?, + } + + Ok (()) +} + +async fn handle_new_ptth_connection ( + opt: Arc , + mut relay_send: quinn::SendStream, + mut relay_recv: quinn::RecvStream, + _client_id: String, +) -> anyhow::Result <()> +{ + // TODO: Check authorization for P2 --> P4 + + protocol::p4_authorize_p2_connection (&mut relay_send).await?; + let p4_to_p5_req = protocol::p4_expect_p5_request (&mut relay_recv).await?; + + // TODO: Check authorization for P1 --> P5 + + protocol::p4_authorize_p1_connection (&mut relay_send).await?; + + debug! ("Started PTTH connection"); + + if opt.debug_echo { + relay_send.write (b"Connected to P4=P5 debug echo server\n").await?; + debug! ("Relaying bytes using internal debug echo server (P4=P5)"); + tokio::io::copy (&mut relay_recv, &mut relay_send).await?; + } + else { + let stream = TcpStream::connect (("127.0.0.1", p4_to_p5_req.port)).await?; + let (local_recv, local_send) = stream.into_split (); + + trace! ("Relaying bytes..."); + + let ptth_conn = crate::connection::NewConnection { + local_send, + local_recv, + relay_send, + relay_recv, + }.build (); + + ptth_conn.wait_for_close ().await?; + } + + Ok (()) +} diff --git a/prototypes/quic_demo/src/lib.rs b/prototypes/quic_demo/src/lib.rs index dcbd50a..d5f0aa0 100644 --- a/prototypes/quic_demo/src/lib.rs +++ b/prototypes/quic_demo/src/lib.rs @@ -1,5 +1,6 @@ pub mod client_proxy; pub mod connection; +pub mod executable_end_server; pub mod prelude; pub mod protocol; pub mod quinn_utils;