diff --git a/Cargo.lock b/Cargo.lock index 52fbe6c..baec8a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -93,9 +93,9 @@ checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" [[package]] name = "bitflags" -version = "1.2.1" +version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "blake3" @@ -257,9 +257,9 @@ dependencies = [ [[package]] name = "ctrlc" -version = "3.2.0" +version = "3.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "377c9b002a72a0b2c1a18c62e2f3864bdfea4a015e3683a96e24aa45dd6c02d1" +checksum = "a19c6cedffdc8c03a3346d723eb20bd85a13362bb96dc2ac000842c6381ec7bf" dependencies = [ "nix", "winapi", @@ -875,9 +875,9 @@ dependencies = [ [[package]] name = "nix" -version = "0.22.2" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3bb9a13fa32bc5aeb64150cd3f32d6cf4c748f8f8a417cce5d2eb976a8370ba" +checksum = "f305c2c2e4c39a82f7bf0bf65fb557f9070ce06781d4f2454295cc34b1c43188" dependencies = [ "bitflags", "cc", @@ -1364,6 +1364,7 @@ version = "0.1.0" dependencies = [ "anyhow", "base64", + "ctrlc", "futures-util", "hyper", "quinn", diff --git a/prototypes/quic_demo/Cargo.toml b/prototypes/quic_demo/Cargo.toml index d10125b..9e591f3 100644 --- a/prototypes/quic_demo/Cargo.toml +++ b/prototypes/quic_demo/Cargo.toml @@ -10,6 +10,7 @@ license = "AGPL-3.0" [dependencies] anyhow = "1.0.38" base64 = "0.13.0" +ctrlc = "3.2.1" # fltk = "1.1.1" futures-util = "0.3.9" hyper = { version = "0.14.4", features = ["http1", "server", "stream", "tcp"] } diff --git a/prototypes/quic_demo/src/bin/quic_demo_end_server.rs b/prototypes/quic_demo/src/bin/quic_demo_end_server.rs index f2d9f75..6453666 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_end_server.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_end_server.rs @@ -2,11 +2,22 @@ use std::{ iter::FromIterator, }; +use tokio::sync::watch; + +use quic_demo::prelude::*; + #[tokio::main] async fn main () -> anyhow::Result <()> { tracing_subscriber::fmt::init (); let args = Vec::from_iter (std::env::args_os ()); - quic_demo::executable_end_server::main (&args).await + let (shutdown_tx, shutdown_rx) = watch::channel (false); + + ctrlc::set_handler (move || { + shutdown_tx.send (true).expect ("Couldn't forward Ctrl+C signal"); + })?; + trace! ("Set Ctrl+C handler"); + + quic_demo::executable_end_server::main (&args, Some (shutdown_rx)).await } diff --git a/prototypes/quic_demo/src/executable_end_server.rs b/prototypes/quic_demo/src/executable_end_server.rs index 3c1ee9f..08a31a4 100644 --- a/prototypes/quic_demo/src/executable_end_server.rs +++ b/prototypes/quic_demo/src/executable_end_server.rs @@ -1,5 +1,8 @@ use structopt::StructOpt; -use tokio::net::TcpStream; +use tokio::{ + net::TcpStream, + sync::watch, +}; use crate::prelude::*; use protocol::PeerId; @@ -18,13 +21,31 @@ struct Opt { cert_url: Option , } -pub async fn main (args: &[OsString]) -> anyhow::Result <()> { +pub async fn main (args: &[OsString], shutdown_rx: Option >) -> anyhow::Result <()> { let opt = Opt::from_iter (args); let conf = opt.into_config ().await?; - let mut end_server = P4EndServer::connect (conf).await?; + let end_server = Arc::new (P4EndServer::connect (conf).await?); + + let run_task = { + let end_server = Arc::clone (&end_server); + tokio::spawn (async move { + end_server.run ().await?; + Ok::<_, anyhow::Error> (()) + }) + }; + + if let Some (mut shutdown_rx) = shutdown_rx { + while ! *shutdown_rx.borrow () { + shutdown_rx.changed ().await?; + } + end_server.shut_down ()?; + } + + run_task.await??; + + trace! ("P4 end server shut down gracefully."); - end_server.run ().await?; Ok (()) } @@ -67,30 +88,24 @@ impl Opt { /// An end server that is connected to its relay server pub struct P4EndServer { - bi_streams: quinn::IncomingBiStreams, + endpoint: quinn::Endpoint, conf: Arc , + shutdown_tx: watch::Sender , + shutdown_rx: watch::Receiver , } impl P4EndServer { - /// Tries to connect to a relay server and return an end server. - /// Fails if there's a network issue while making the QUIC connection - /// to the relay server. - pub async fn connect (conf: Config) -> anyhow::Result { trace! ("P4 end server making its QUIC endpoint"); let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&conf.relay_cert])?; - trace! ("P4 end server connecting to P3 relay server"); - let quinn::NewConnection { - bi_streams, - .. - } = protocol::p4_connect_to_p3 (&endpoint, &conf.relay_addr, &conf.id).await?; - - debug! ("Connected to relay server"); + let (shutdown_tx, shutdown_rx) = watch::channel (false); Ok (P4EndServer { - bi_streams, conf: Arc::new (conf), + endpoint, + shutdown_tx, + shutdown_rx, }) } @@ -98,14 +113,49 @@ impl P4EndServer { &*self.conf } - pub async fn run (&mut self) -> anyhow::Result <()> { + pub async fn run (&self) -> anyhow::Result <()> { + trace! ("P4 end server connecting to P3 relay server"); + let quinn::NewConnection { + mut bi_streams, + .. + } = protocol::p4_connect_to_p3 ( + &self.endpoint, + &self.conf.relay_addr, + &self.conf.id + ).await?; + + debug! ("Connected to relay server"); + trace! ("Accepting bi streams from P3"); + let mut shutdown_rx = self.shutdown_rx.clone (); + loop { - let (relay_send, relay_recv) = self.bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("Relay server didn't open a bi stream"))??; - - tokio::spawn (handle_bi_stream (Arc::clone (&self.conf), relay_send, relay_recv)); + tokio::select! { + _ = shutdown_rx.changed () => { + if *shutdown_rx.borrow () { + trace! ("P4 incoming bi streams task caught graceful shutdown"); + break; + } + } + stream_opt = bi_streams.next () => { + let (relay_send, relay_recv) = stream_opt.ok_or_else (|| anyhow::anyhow! ("P4 ran out of incoming streams. Maybe P3 shut down or disconnected?"))??; + + tokio::spawn (handle_bi_stream (Arc::clone (&self.conf), relay_send, relay_recv)); + } + }; } + + Ok (()) + } + + pub fn shut_down (&self) -> anyhow::Result <()> { + trace! ("P4 end server shutting down..."); + Ok (self.shutdown_tx.send (true)?) + } + + pub fn shutting_down (&self) -> bool { + *self.shutdown_rx.borrow () } }