➕ start adding graceful shutdown to the end server
I think it only works if there are no streams running. So, might want to double-check this before it goes into prodmain
parent
a2d4ae81e0
commit
2b60396a26
|
@ -93,9 +93,9 @@ checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "bitflags"
|
name = "bitflags"
|
||||||
version = "1.2.1"
|
version = "1.3.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
|
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "blake3"
|
name = "blake3"
|
||||||
|
@ -257,9 +257,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ctrlc"
|
name = "ctrlc"
|
||||||
version = "3.2.0"
|
version = "3.2.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "377c9b002a72a0b2c1a18c62e2f3864bdfea4a015e3683a96e24aa45dd6c02d1"
|
checksum = "a19c6cedffdc8c03a3346d723eb20bd85a13362bb96dc2ac000842c6381ec7bf"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"nix",
|
"nix",
|
||||||
"winapi",
|
"winapi",
|
||||||
|
@ -875,9 +875,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "nix"
|
name = "nix"
|
||||||
version = "0.22.2"
|
version = "0.23.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "d3bb9a13fa32bc5aeb64150cd3f32d6cf4c748f8f8a417cce5d2eb976a8370ba"
|
checksum = "f305c2c2e4c39a82f7bf0bf65fb557f9070ce06781d4f2454295cc34b1c43188"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bitflags",
|
"bitflags",
|
||||||
"cc",
|
"cc",
|
||||||
|
@ -1364,6 +1364,7 @@ version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"base64",
|
"base64",
|
||||||
|
"ctrlc",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"hyper",
|
"hyper",
|
||||||
"quinn",
|
"quinn",
|
||||||
|
|
|
@ -10,6 +10,7 @@ license = "AGPL-3.0"
|
||||||
[dependencies]
|
[dependencies]
|
||||||
anyhow = "1.0.38"
|
anyhow = "1.0.38"
|
||||||
base64 = "0.13.0"
|
base64 = "0.13.0"
|
||||||
|
ctrlc = "3.2.1"
|
||||||
# fltk = "1.1.1"
|
# fltk = "1.1.1"
|
||||||
futures-util = "0.3.9"
|
futures-util = "0.3.9"
|
||||||
hyper = { version = "0.14.4", features = ["http1", "server", "stream", "tcp"] }
|
hyper = { version = "0.14.4", features = ["http1", "server", "stream", "tcp"] }
|
||||||
|
|
|
@ -2,11 +2,22 @@ use std::{
|
||||||
iter::FromIterator,
|
iter::FromIterator,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use tokio::sync::watch;
|
||||||
|
|
||||||
|
use quic_demo::prelude::*;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main () -> anyhow::Result <()> {
|
async fn main () -> anyhow::Result <()> {
|
||||||
tracing_subscriber::fmt::init ();
|
tracing_subscriber::fmt::init ();
|
||||||
|
|
||||||
let args = Vec::from_iter (std::env::args_os ());
|
let args = Vec::from_iter (std::env::args_os ());
|
||||||
|
|
||||||
quic_demo::executable_end_server::main (&args).await
|
let (shutdown_tx, shutdown_rx) = watch::channel (false);
|
||||||
|
|
||||||
|
ctrlc::set_handler (move || {
|
||||||
|
shutdown_tx.send (true).expect ("Couldn't forward Ctrl+C signal");
|
||||||
|
})?;
|
||||||
|
trace! ("Set Ctrl+C handler");
|
||||||
|
|
||||||
|
quic_demo::executable_end_server::main (&args, Some (shutdown_rx)).await
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,8 @@
|
||||||
use structopt::StructOpt;
|
use structopt::StructOpt;
|
||||||
use tokio::net::TcpStream;
|
use tokio::{
|
||||||
|
net::TcpStream,
|
||||||
|
sync::watch,
|
||||||
|
};
|
||||||
|
|
||||||
use crate::prelude::*;
|
use crate::prelude::*;
|
||||||
use protocol::PeerId;
|
use protocol::PeerId;
|
||||||
|
@ -18,13 +21,31 @@ struct Opt {
|
||||||
cert_url: Option <String>,
|
cert_url: Option <String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn main (args: &[OsString]) -> anyhow::Result <()> {
|
pub async fn main (args: &[OsString], shutdown_rx: Option <watch::Receiver <bool>>) -> anyhow::Result <()> {
|
||||||
let opt = Opt::from_iter (args);
|
let opt = Opt::from_iter (args);
|
||||||
let conf = opt.into_config ().await?;
|
let conf = opt.into_config ().await?;
|
||||||
|
|
||||||
let mut end_server = P4EndServer::connect (conf).await?;
|
let end_server = Arc::new (P4EndServer::connect (conf).await?);
|
||||||
|
|
||||||
|
let run_task = {
|
||||||
|
let end_server = Arc::clone (&end_server);
|
||||||
|
tokio::spawn (async move {
|
||||||
end_server.run ().await?;
|
end_server.run ().await?;
|
||||||
|
Ok::<_, anyhow::Error> (())
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some (mut shutdown_rx) = shutdown_rx {
|
||||||
|
while ! *shutdown_rx.borrow () {
|
||||||
|
shutdown_rx.changed ().await?;
|
||||||
|
}
|
||||||
|
end_server.shut_down ()?;
|
||||||
|
}
|
||||||
|
|
||||||
|
run_task.await??;
|
||||||
|
|
||||||
|
trace! ("P4 end server shut down gracefully.");
|
||||||
|
|
||||||
Ok (())
|
Ok (())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,30 +88,24 @@ impl Opt {
|
||||||
|
|
||||||
/// An end server that is connected to its relay server
|
/// An end server that is connected to its relay server
|
||||||
pub struct P4EndServer {
|
pub struct P4EndServer {
|
||||||
bi_streams: quinn::IncomingBiStreams,
|
endpoint: quinn::Endpoint,
|
||||||
conf: Arc <Config>,
|
conf: Arc <Config>,
|
||||||
|
shutdown_tx: watch::Sender <bool>,
|
||||||
|
shutdown_rx: watch::Receiver <bool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl P4EndServer {
|
impl P4EndServer {
|
||||||
/// Tries to connect to a relay server and return an end server.
|
|
||||||
/// Fails if there's a network issue while making the QUIC connection
|
|
||||||
/// to the relay server.
|
|
||||||
|
|
||||||
pub async fn connect (conf: Config) -> anyhow::Result <Self> {
|
pub async fn connect (conf: Config) -> anyhow::Result <Self> {
|
||||||
trace! ("P4 end server making its QUIC endpoint");
|
trace! ("P4 end server making its QUIC endpoint");
|
||||||
let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&conf.relay_cert])?;
|
let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&conf.relay_cert])?;
|
||||||
|
|
||||||
trace! ("P4 end server connecting to P3 relay server");
|
let (shutdown_tx, shutdown_rx) = watch::channel (false);
|
||||||
let quinn::NewConnection {
|
|
||||||
bi_streams,
|
|
||||||
..
|
|
||||||
} = protocol::p4_connect_to_p3 (&endpoint, &conf.relay_addr, &conf.id).await?;
|
|
||||||
|
|
||||||
debug! ("Connected to relay server");
|
|
||||||
|
|
||||||
Ok (P4EndServer {
|
Ok (P4EndServer {
|
||||||
bi_streams,
|
|
||||||
conf: Arc::new (conf),
|
conf: Arc::new (conf),
|
||||||
|
endpoint,
|
||||||
|
shutdown_tx,
|
||||||
|
shutdown_rx,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -98,14 +113,49 @@ impl P4EndServer {
|
||||||
&*self.conf
|
&*self.conf
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run (&mut self) -> anyhow::Result <()> {
|
pub async fn run (&self) -> anyhow::Result <()> {
|
||||||
|
trace! ("P4 end server connecting to P3 relay server");
|
||||||
|
let quinn::NewConnection {
|
||||||
|
mut bi_streams,
|
||||||
|
..
|
||||||
|
} = protocol::p4_connect_to_p3 (
|
||||||
|
&self.endpoint,
|
||||||
|
&self.conf.relay_addr,
|
||||||
|
&self.conf.id
|
||||||
|
).await?;
|
||||||
|
|
||||||
|
debug! ("Connected to relay server");
|
||||||
|
|
||||||
trace! ("Accepting bi streams from P3");
|
trace! ("Accepting bi streams from P3");
|
||||||
|
|
||||||
|
let mut shutdown_rx = self.shutdown_rx.clone ();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let (relay_send, relay_recv) = self.bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("Relay server didn't open a bi stream"))??;
|
tokio::select! {
|
||||||
|
_ = shutdown_rx.changed () => {
|
||||||
|
if *shutdown_rx.borrow () {
|
||||||
|
trace! ("P4 incoming bi streams task caught graceful shutdown");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
stream_opt = bi_streams.next () => {
|
||||||
|
let (relay_send, relay_recv) = stream_opt.ok_or_else (|| anyhow::anyhow! ("P4 ran out of incoming streams. Maybe P3 shut down or disconnected?"))??;
|
||||||
|
|
||||||
tokio::spawn (handle_bi_stream (Arc::clone (&self.conf), relay_send, relay_recv));
|
tokio::spawn (handle_bi_stream (Arc::clone (&self.conf), relay_send, relay_recv));
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok (())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn shut_down (&self) -> anyhow::Result <()> {
|
||||||
|
trace! ("P4 end server shutting down...");
|
||||||
|
Ok (self.shutdown_tx.send (true)?)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn shutting_down (&self) -> bool {
|
||||||
|
*self.shutdown_rx.borrow ()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue