From a2d4ae81e0f2ab6a1080958ebf10cbefa6689c15 Mon Sep 17 00:00:00 2001 From: _ <> Date: Sun, 10 Oct 2021 15:19:48 +0000 Subject: [PATCH] :recycle: refactor --- .../quic_demo/src/executable_end_server.rs | 119 +++++++++++------- 1 file changed, 77 insertions(+), 42 deletions(-) diff --git a/prototypes/quic_demo/src/executable_end_server.rs b/prototypes/quic_demo/src/executable_end_server.rs index ebfffb9..3c1ee9f 100644 --- a/prototypes/quic_demo/src/executable_end_server.rs +++ b/prototypes/quic_demo/src/executable_end_server.rs @@ -4,6 +4,8 @@ use tokio::net::TcpStream; use crate::prelude::*; use protocol::PeerId; +/// A partially-filled-out config that structopt can deal with +/// Try to turn this into a Config as soon as possible. #[derive (Debug, StructOpt)] struct Opt { #[structopt (long)] @@ -17,65 +19,98 @@ struct Opt { } pub async fn main (args: &[OsString]) -> anyhow::Result <()> { - let opt = Arc::new (Opt::from_iter (args)); + let opt = Opt::from_iter (args); + let conf = opt.into_config ().await?; - let relay_cert: Vec = match opt.cert_url.as_ref () { - Some (url) => reqwest::get (url).await?.bytes ().await?.into_iter ().collect (), - None => tokio::fs::read ("quic_server.crt").await?, - }; - let relay_addr: SocketAddr = opt.relay_addr.clone ().unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?; + let mut end_server = P4EndServer::connect (conf).await?; - let id = opt.server_id.clone ().unwrap_or_else (|| "bogus_server".to_string ()); + end_server.run ().await?; + Ok (()) +} + +/// A filled-out config for constructing an end server +#[derive (Clone)] +pub struct Config { + pub debug_echo: bool, + pub id: String, + pub relay_addr: SocketAddr, + pub relay_cert: Vec , +} + +impl Opt { + /// Converts self into a Config that the server can use. + /// Performs I/O to load the relay cert from disk or from HTTP. + /// Fails if arguments can't be parsed or if I/O fails. - let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&relay_cert])?; - - trace! ("Connecting to relay server"); - - let quinn::NewConnection { - bi_streams, - .. - } = protocol::p4_connect_to_p3 (&endpoint, &relay_addr, &id).await?; - - let mut end_server = P4EndServer { - bi_streams, - id, - relay_addr, - relay_cert, - }; - - debug! ("Connected to relay server"); - trace! ("Accepting bi streams from P3"); - - loop { - let (relay_send, relay_recv) = end_server.bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("Relay server didn't open a bi stream"))??; + pub async fn into_config (self) -> anyhow::Result { + let id = self.server_id.clone ().unwrap_or_else (|| "bogus_server".to_string ()); - tokio::spawn (handle_bi_stream (Arc::clone (&opt), relay_send, relay_recv)); + let relay_addr: SocketAddr = self.relay_addr.clone ().unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?; + + // Do I/O after all parsing is done. + // We don't want to waste a network request only to come back and error + // out on like "127.oooo.1" not parsing into a relay address. + + let relay_cert: Vec = match self.cert_url.as_ref () { + Some (url) => reqwest::get (url).await?.bytes ().await?.into_iter ().collect (), + None => tokio::fs::read ("quic_server.crt").await?, + }; + + Ok (Config { + debug_echo: self.debug_echo, + id, + relay_addr, + relay_cert, + }) } } +/// An end server that is connected to its relay server pub struct P4EndServer { bi_streams: quinn::IncomingBiStreams, - id: String, - relay_addr: SocketAddr, - relay_cert: Vec , + conf: Arc , } impl P4EndServer { - pub fn id (&self) -> &str { - &self.id + /// Tries to connect to a relay server and return an end server. + /// Fails if there's a network issue while making the QUIC connection + /// to the relay server. + + pub async fn connect (conf: Config) -> anyhow::Result { + trace! ("P4 end server making its QUIC endpoint"); + let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&conf.relay_cert])?; + + trace! ("P4 end server connecting to P3 relay server"); + let quinn::NewConnection { + bi_streams, + .. + } = protocol::p4_connect_to_p3 (&endpoint, &conf.relay_addr, &conf.id).await?; + + debug! ("Connected to relay server"); + + Ok (P4EndServer { + bi_streams, + conf: Arc::new (conf), + }) } - pub fn relay_addr (&self) -> SocketAddr { - self.relay_addr + pub fn config (&self) -> &Config { + &*self.conf } - pub fn relay_cert (&self) -> &[u8] { - &self.relay_cert + pub async fn run (&mut self) -> anyhow::Result <()> { + trace! ("Accepting bi streams from P3"); + + loop { + let (relay_send, relay_recv) = self.bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("Relay server didn't open a bi stream"))??; + + tokio::spawn (handle_bi_stream (Arc::clone (&self.conf), relay_send, relay_recv)); + } } } async fn handle_bi_stream ( - opt: Arc , + conf: Arc , relay_send: quinn::SendStream, mut relay_recv: quinn::RecvStream, ) -> anyhow::Result <()> @@ -84,14 +119,14 @@ async fn handle_bi_stream ( protocol::P3ToP4Stream::NewPtthConnection { client_id, .. - } => handle_new_ptth_connection (opt, relay_send, relay_recv, client_id).await?, + } => handle_new_ptth_connection (conf, relay_send, relay_recv, client_id).await?, } Ok (()) } async fn handle_new_ptth_connection ( - opt: Arc , + conf: Arc , mut relay_send: quinn::SendStream, mut relay_recv: quinn::RecvStream, _client_id: String, @@ -108,7 +143,7 @@ async fn handle_new_ptth_connection ( debug! ("Started PTTH connection"); - if opt.debug_echo { + if conf.debug_echo { relay_send.write (b"Connected to P4=P5 debug echo server\n").await?; debug! ("Relaying bytes using internal debug echo server (P4=P5)"); tokio::io::copy (&mut relay_recv, &mut relay_send).await?;