Compare commits

..

21 Commits

Author SHA1 Message Date
_ 1a775622cf 🐛 bug: fix compile bugs for tests 2021-10-10 18:24:35 +00:00
_ b21f8a7c50 ♻️ refactor: move `main` up 2021-10-10 18:20:08 +00:00
_ e4285ec17d 🐛 bug: fix crash when trying to open the same port twice 2021-10-10 18:17:50 +00:00
_ baa5044186 ♻️ refactor: remove redundant var 2021-10-10 18:12:36 +00:00
_ e8bb7ab098 🔊 improve error handling when opening / closing ports 2021-10-10 18:08:25 +00:00
_ de4da749f3 🐛 bug (ptth_quic_client_gui): display the correct number of forwarded ports 2021-10-10 18:00:20 +00:00
_ 11ee6292ba ♻️ refactor: extract GuiClient struct 2021-10-10 17:56:13 +00:00
_ 07fa3b97c3 update client GUI default cert path for easier testing 2021-10-10 17:39:51 +00:00
_ 68eb6f911b 🚨 clean up clippy warnings 2021-10-10 17:22:04 +00:00
_ c002665f6c ♻️ refactor: use the code from client_proxy instead 2021-10-10 17:14:32 +00:00
_ c2caeb405c ♻️ refactor 2021-10-10 16:58:12 +00:00
_ a3b62b012d ♻️ refactor 2021-10-10 16:56:12 +00:00
_ 6fbe35379b ♻️ refactor 2021-10-10 16:48:25 +00:00
_ 88fab23871 ♻️ refactor 2021-10-10 16:36:06 +00:00
_ a906472add ♻️ refactor: clean up client to match the end server style 2021-10-10 16:33:14 +00:00
_ fc6a9c9e1e fix P2 and P4 to use the default cert path and print the client port for easier testing 2021-10-10 16:23:21 +00:00
_ 2b60396a26 start adding graceful shutdown to the end server
I think it only works if there are no streams running. So, might want to
double-check this before it goes into prod
2021-10-10 16:18:02 +00:00
_ a2d4ae81e0 ♻️ refactor 2021-10-10 15:19:48 +00:00
_ 953254e550 ♻️ refactor: continue extracting end server struct 2021-10-10 14:49:14 +00:00
_ ee31d105c9 🚨 fix cargo check warnings 2021-10-10 14:40:18 +00:00
_ 35cc1d49b7 ♻️ refactor: begin extacting a struct for the end server 2021-10-10 14:36:01 +00:00
12 changed files with 407 additions and 156 deletions

14
Cargo.lock generated
View File

@ -93,9 +93,9 @@ checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
[[package]] [[package]]
name = "bitflags" name = "bitflags"
version = "1.2.1" version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]] [[package]]
name = "blake3" name = "blake3"
@ -257,9 +257,9 @@ dependencies = [
[[package]] [[package]]
name = "ctrlc" name = "ctrlc"
version = "3.2.0" version = "3.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "377c9b002a72a0b2c1a18c62e2f3864bdfea4a015e3683a96e24aa45dd6c02d1" checksum = "a19c6cedffdc8c03a3346d723eb20bd85a13362bb96dc2ac000842c6381ec7bf"
dependencies = [ dependencies = [
"nix", "nix",
"winapi", "winapi",
@ -875,9 +875,9 @@ dependencies = [
[[package]] [[package]]
name = "nix" name = "nix"
version = "0.22.2" version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3bb9a13fa32bc5aeb64150cd3f32d6cf4c748f8f8a417cce5d2eb976a8370ba" checksum = "f305c2c2e4c39a82f7bf0bf65fb557f9070ce06781d4f2454295cc34b1c43188"
dependencies = [ dependencies = [
"bitflags", "bitflags",
"cc", "cc",
@ -1228,6 +1228,7 @@ name = "ptth_multi_call_server"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"ctrlc",
"ptth_file_server", "ptth_file_server",
"ptth_server", "ptth_server",
"quic_demo", "quic_demo",
@ -1364,6 +1365,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",
"ctrlc",
"futures-util", "futures-util",
"hyper", "hyper",
"quinn", "quinn",

View File

@ -7,6 +7,7 @@ license = "AGPL-3.0"
[dependencies] [dependencies]
anyhow = "1.0.38" anyhow = "1.0.38"
ctrlc = "3.2.1"
ptth_file_server = { path = "../ptth_file_server_bin" } ptth_file_server = { path = "../ptth_file_server_bin" }
ptth_server = { path = "../ptth_server" } ptth_server = { path = "../ptth_server" }
quic_demo = { path = "../../prototypes/quic_demo" } quic_demo = { path = "../../prototypes/quic_demo" }

View File

@ -3,6 +3,8 @@ use std::{
iter::FromIterator, iter::FromIterator,
}; };
use tokio::sync::watch;
#[derive (Clone, Copy, Debug, PartialEq)] #[derive (Clone, Copy, Debug, PartialEq)]
enum Subcommand { enum Subcommand {
PtthServer, PtthServer,
@ -10,6 +12,30 @@ enum Subcommand {
PtthQuicEndServer, PtthQuicEndServer,
} }
#[tokio::main]
async fn main () -> anyhow::Result <()> {
tracing_subscriber::fmt::init ();
let args = Vec::from_iter (std::env::args_os ());
let (subcommand, args) = parse_args (&args)?;
match subcommand {
Subcommand::PtthServer => ptth_server::executable::main (&args).await,
Subcommand::PtthFileServer => ptth_file_server::main (&args).await,
Subcommand::PtthQuicEndServer => {
let (shutdown_tx, shutdown_rx) = watch::channel (false);
ctrlc::set_handler (move || {
shutdown_tx.send (true).expect ("Couldn't forward Ctrl+C signal");
})?;
tracing::trace! ("Set Ctrl+C handler");
quic_demo::executable_end_server::main (&args, Some (shutdown_rx)).await?;
Ok (())
}
}
}
fn parse_subcommand (arg: &str) -> Option <Subcommand> fn parse_subcommand (arg: &str) -> Option <Subcommand>
{ {
use Subcommand::*; use Subcommand::*;
@ -58,20 +84,6 @@ fn parse_args (args: &[OsString]) -> anyhow::Result <(Subcommand, &[OsString])>
anyhow::bail! ("Subcommand must be either arg 0 (exe name) or arg 1") anyhow::bail! ("Subcommand must be either arg 0 (exe name) or arg 1")
} }
#[tokio::main]
async fn main () -> anyhow::Result <()> {
tracing_subscriber::fmt::init ();
let args = Vec::from_iter (std::env::args_os ());
let (subcommand, args) = parse_args (&args)?;
match subcommand {
Subcommand::PtthServer => ptth_server::executable::main (&args).await,
Subcommand::PtthFileServer => ptth_file_server::main (&args).await,
Subcommand::PtthQuicEndServer => quic_demo::executable_end_server::main (&args).await,
}
}
#[cfg (test)] #[cfg (test)]
mod tests { mod tests {
use super::*; use super::*;

View File

@ -10,6 +10,7 @@ use fltk::{
window::Window window::Window
}; };
use structopt::StructOpt; use structopt::StructOpt;
use tokio::runtime::Runtime;
use quic_demo::{ use quic_demo::{
client_proxy::*, client_proxy::*,
@ -35,9 +36,69 @@ enum Message {
ClosePort (usize), ClosePort (usize),
} }
struct GuiClient <'a> {
rt: &'a Runtime,
frame_status: Frame,
forwarding_instances: Vec <Option <ForwardingInstance>>,
gui_ports: Vec <GuiPort>,
}
impl GuiClient <'_> {
pub fn open_port (
&mut self,
connection_p2_p3: quinn::Connection,
port_idx: usize,
) -> anyhow::Result <()>
{
let params = self.gui_ports [port_idx].get_params ()?;
let _guard = self.rt.enter ();
let forwarding_instance = self.rt.block_on (ForwardingInstance::new (
connection_p2_p3,
params,
))?;
self.forwarding_instances [port_idx].replace (forwarding_instance);
self.gui_ports [port_idx].set_forwarding (true);
self.sync_status ();
Ok (())
}
pub fn close_port (&mut self, port_idx: usize) -> anyhow::Result <()> {
if let Some (old_instance) = self.forwarding_instances [port_idx].take () {
self.rt.block_on (async {
old_instance.close ()
.await
.context ("closing ForwardingInstance")?;
Ok::<_, anyhow::Error> (())
})?;
}
self.gui_ports [port_idx].set_forwarding (false);
self.sync_status ();
Ok (())
}
fn open_ports (&self) -> usize {
self.forwarding_instances.iter ()
.map (|x| if x.is_some () { 1 } else { 0 })
.sum ()
}
fn sync_status (&mut self) {
let open_ports = self.open_ports ();
self.frame_status.set_label (&format! ("Forwarding {} ports", open_ports));
}
}
fn main () -> anyhow::Result <()> { fn main () -> anyhow::Result <()> {
tracing_subscriber::fmt::init (); tracing_subscriber::fmt::init ();
let rt = tokio::runtime::Runtime::new ()?; let rt = Runtime::new ()?;
let opt = Opt::from_args (); let opt = Opt::from_args ();
@ -53,7 +114,7 @@ fn main () -> anyhow::Result <()> {
let mut x = margin; let mut x = margin;
let mut y = margin; let mut y = margin;
let mut frame_status = Frame::new (x, y, 800 - 20, h, "Forwarding 0 ports"); let frame_status = Frame::new (x, y, 800 - 20, h, "Forwarding 0 ports");
y += h + margin; y += h + margin;
x = margin; x = margin;
@ -87,18 +148,25 @@ fn main () -> anyhow::Result <()> {
// y += h + margin; // y += h + margin;
// x = margin; // x = margin;
let mut gui_ports = vec! [ let gui_ports = vec! [
gui_port_0, gui_port_0,
gui_port_1, gui_port_1,
gui_port_2, gui_port_2,
]; ];
let mut forwarding_instances = vec! [ let forwarding_instances = vec! [
None, None,
None, None,
None, None,
]; ];
let mut gui_client = GuiClient {
rt: &rt,
frame_status,
forwarding_instances,
gui_ports,
};
// y += h + margin; // y += h + margin;
wind.end (); wind.end ();
@ -107,7 +175,7 @@ fn main () -> anyhow::Result <()> {
let connection_p2_p3 = rt.block_on (async move { let connection_p2_p3 = rt.block_on (async move {
let server_cert = match opt.cert_url.as_ref () { let server_cert = match opt.cert_url.as_ref () {
Some (url) => reqwest::get (url).await?.bytes ().await?, Some (url) => reqwest::get (url).await?.bytes ().await?,
None => tokio::fs::read ("quic_server.crt").await?.into (), None => tokio::fs::read ("ptth_quic_output/quic_server.crt").await?.into (),
}; };
let relay_addr = opt.relay_addr let relay_addr = opt.relay_addr
@ -132,27 +200,13 @@ fn main () -> anyhow::Result <()> {
while app.wait () { while app.wait () {
match fltk_rx.recv () { match fltk_rx.recv () {
Some (Message::OpenPort (port_idx)) => { Some (Message::OpenPort (port_idx)) => {
if let Ok (params) = gui_ports [port_idx].get_params () { match gui_client.open_port (connection_p2_p3.clone (), port_idx) {
let connection_p2_p3 = connection_p2_p3.clone (); Err (e) => error! ("{:?}", e),
_ => (),
let _guard = rt.enter (); };
forwarding_instances [port_idx].replace (ForwardingInstance::new (
connection_p2_p3,
params,
));
gui_ports [port_idx].set_forwarding (true);
frame_status.set_label ("Forwarding 1 port");
}
}, },
Some (Message::ClosePort (port_idx)) => { Some (Message::ClosePort (port_idx)) => {
if let Some (old_instance) = forwarding_instances [port_idx].take () { gui_client.close_port (port_idx)?;
rt.block_on (old_instance.close ())?;
}
gui_ports [port_idx].set_forwarding (false);
frame_status.set_label ("Forwarding 0 ports");
}, },
None => (), None => (),
} }
@ -207,9 +261,9 @@ impl GuiPort {
input_server_id.set_value ("bogus_server"); input_server_id.set_value ("bogus_server");
input_server_port.set_value ("5900"); input_server_port.set_value ("5900");
but_open.set_trigger (CallbackTrigger::Changed); but_open.set_trigger (CallbackTrigger::Release);
but_open.emit (fltk_tx, Message::OpenPort (port_idx)); but_open.emit (fltk_tx, Message::OpenPort (port_idx));
but_close.set_trigger (CallbackTrigger::Changed); but_close.set_trigger (CallbackTrigger::Release);
but_close.emit (fltk_tx, Message::ClosePort (port_idx)); but_close.emit (fltk_tx, Message::ClosePort (port_idx));
set_active (&mut but_open, true); set_active (&mut but_open, true);

View File

@ -10,6 +10,7 @@ license = "AGPL-3.0"
[dependencies] [dependencies]
anyhow = "1.0.38" anyhow = "1.0.38"
base64 = "0.13.0" base64 = "0.13.0"
ctrlc = "3.2.1"
# fltk = "1.1.1" # fltk = "1.1.1"
futures-util = "0.3.9" futures-util = "0.3.9"
hyper = { version = "0.14.4", features = ["http1", "server", "stream", "tcp"] } hyper = { version = "0.14.4", features = ["http1", "server", "stream", "tcp"] }

View File

@ -1,7 +1,15 @@
use structopt::StructOpt; use structopt::StructOpt;
use tokio::net::TcpListener; use tokio::{
sync::watch,
};
use quic_demo::prelude::*; use quic_demo::{
client_proxy::{
ForwardingParams,
forward_port,
},
prelude::*,
};
use protocol::PeerId; use protocol::PeerId;
#[derive (Debug, StructOpt)] #[derive (Debug, StructOpt)]
@ -23,68 +31,109 @@ async fn main () -> anyhow::Result <()> {
tracing_subscriber::fmt::init (); tracing_subscriber::fmt::init ();
let opt = Opt::from_args (); let opt = Opt::from_args ();
let conf = opt.into_config ().await?;
let server_cert = tokio::fs::read ("quic_server.crt").await?; let client = P2Client::connect (conf)?;
let relay_addr = opt.relay_addr.unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?; client.run ().await?;
let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&server_cert])?;
debug! ("Connecting to relay server");
let client_id = opt.client_id.unwrap_or_else (|| "bogus_client".to_string ());
let quinn::NewConnection {
connection,
..
} = protocol::p2_connect_to_p3 (&endpoint, &relay_addr, &client_id).await?;
// End of per-client stuff
// Beginning of per-port stuff
let server_id = opt.server_id.unwrap_or_else (|| "bogus_server".to_string ());
let client_tcp_port = opt.client_tcp_port.unwrap_or (30381);
let server_tcp_port = opt.server_tcp_port.unwrap_or (30382);
let listener = TcpListener::bind (("127.0.0.1", client_tcp_port)).await?;
// End of per-port stuff
// Beginning of per-connection stuff
let task_tcp_server = tokio::spawn (async move {
loop {
let (tcp_socket, _) = listener.accept ().await?;
let connection = connection.clone ();
let server_id = server_id.clone ();
tokio::spawn (async move {
let (local_recv, local_send) = tcp_socket.into_split ();
debug! ("Starting PTTH connection");
let (relay_send, relay_recv) = protocol::p2_connect_to_p5 (&connection, &server_id, server_tcp_port).await?;
trace! ("Relaying bytes...");
let ptth_conn = quic_demo::connection::NewConnection {
local_send,
local_recv,
relay_send,
relay_recv,
}.build ();
ptth_conn.wait_for_close ().await?;
debug! ("Ended PTTH connection");
Ok::<_, anyhow::Error> (())
});
}
Ok::<_, anyhow::Error> (())
});
debug! ("Accepting local TCP connections from P1");
task_tcp_server.await??;
Ok (()) Ok (())
} }
pub struct P2Client {
endpoint: quinn::Endpoint,
conf: Arc <Config>,
}
impl P2Client {
pub fn connect (conf: Config) -> anyhow::Result <Self> {
let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&conf.relay_cert])?;
let conf = Arc::new (conf);
Ok (Self {
endpoint,
conf,
})
}
pub async fn run (&self) -> anyhow::Result <()> {
debug! ("P2 client connecting to P3 relay server");
let conf = Arc::clone (&self.conf);
let quinn::NewConnection {
connection,
..
} = protocol::p2_connect_to_p3 (&self.endpoint, &conf.relay_addr, &conf.client_id).await?;
let client_tcp_port = conf.client_tcp_port;
debug! ("Accepting local TCP connections from P1 at {}", client_tcp_port);
// End of per-port stuff
// Beginning of per-connection stuff
let (_shutdown_flag_tx, shutdown_flag_rx) = watch::channel (true);
let task_tcp_server = {
let connection = connection.clone ();
let server_id = conf.server_id.clone ();
let server_tcp_port = conf.server_tcp_port;
let listener = TcpListener::bind (("127.0.0.1", client_tcp_port)).await?;
trace! ("Accepting local TCP connections from P1 on {}", client_tcp_port);
tokio::spawn (async move {
forward_port (
listener,
connection,
ForwardingParams {
client_tcp_port,
server_id,
server_tcp_port,
},
shutdown_flag_rx,
).await?;
Ok::<_, anyhow::Error> (())
})
};
task_tcp_server.await??;
Ok (())
}
}
/// A filled-out config for constructing a P2 client
#[derive (Clone)]
pub struct Config {
client_tcp_port: u16,
server_tcp_port: u16,
client_id: String,
server_id: String,
relay_addr: SocketAddr,
relay_cert: Vec <u8>,
}
impl Opt {
pub async fn into_config (self) -> anyhow::Result <Config> {
let client_tcp_port = self.client_tcp_port.unwrap_or (30381);
let server_tcp_port = self.server_tcp_port.unwrap_or (30382);
let client_id = self.client_id.unwrap_or_else (|| "bogus_client".to_string ());
let server_id = self.server_id.unwrap_or_else (|| "bogus_server".to_string ());
let relay_addr = self.relay_addr.unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?;
// Begin I/O
let relay_cert = tokio::fs::read ("ptth_quic_output/quic_server.crt").await?;
Ok (Config {
client_tcp_port,
server_tcp_port,
client_id,
server_id,
relay_addr,
relay_cert,
})
}
}

View File

@ -2,11 +2,22 @@ use std::{
iter::FromIterator, iter::FromIterator,
}; };
use tokio::sync::watch;
use quic_demo::prelude::*;
#[tokio::main] #[tokio::main]
async fn main () -> anyhow::Result <()> { async fn main () -> anyhow::Result <()> {
tracing_subscriber::fmt::init (); tracing_subscriber::fmt::init ();
let args = Vec::from_iter (std::env::args_os ()); let args = Vec::from_iter (std::env::args_os ());
quic_demo::executable_end_server::main (&args).await let (shutdown_tx, shutdown_rx) = watch::channel (false);
ctrlc::set_handler (move || {
shutdown_tx.send (true).expect ("Couldn't forward Ctrl+C signal");
})?;
trace! ("Set Ctrl+C handler");
quic_demo::executable_end_server::main (&args, Some (shutdown_rx)).await
} }

View File

@ -10,6 +10,7 @@ use hyper::{
StatusCode, StatusCode,
}; };
use structopt::StructOpt; use structopt::StructOpt;
use tokio::sync::watch;
use quic_demo::prelude::*; use quic_demo::prelude::*;
use protocol::PeerId; use protocol::PeerId;
@ -56,6 +57,8 @@ async fn main () -> anyhow::Result <()> {
let tcp_port = 30382; let tcp_port = 30382;
let tcp_listener = TcpListener::bind (("127.0.0.1", tcp_port)).await?; let tcp_listener = TcpListener::bind (("127.0.0.1", tcp_port)).await?;
let (_running_tx, running_rx) = watch::channel (true);
let task_quic_server = { let task_quic_server = {
let relay_state = Arc::clone (&relay_state); let relay_state = Arc::clone (&relay_state);
tokio::spawn (async move { tokio::spawn (async move {
@ -89,24 +92,22 @@ async fn main () -> anyhow::Result <()> {
let task_tcp_server = { let task_tcp_server = {
let relay_state = Arc::clone (&relay_state); let relay_state = Arc::clone (&relay_state);
tokio::spawn (async move { tokio::spawn (async move {
loop { while *running_rx.borrow () {
let (tcp_socket, _) = tcp_listener.accept ().await?; let (tcp_socket, _) = tcp_listener.accept ().await?;
let server_id = "bogus_server".to_string ();
let relay_state = Arc::clone (&relay_state); let relay_state = Arc::clone (&relay_state);
tokio::spawn (async move { tokio::spawn (async move {
let (client_recv, client_send) = tcp_socket.into_split (); let (_client_recv, _client_send) = tcp_socket.into_split ();
debug! ("Accepted direct TCP connection P1 --> P3"); debug! ("Accepted direct TCP connection P1 --> P3");
let p4_server_proxies = relay_state.p4_server_proxies.lock ().await; let p4_server_proxies = relay_state.p4_server_proxies.lock ().await;
let p4 = match p4_server_proxies.get ("bogus_server") { let _p4 = match p4_server_proxies.get ("bogus_server") {
Some (x) => x, Some (x) => x,
None => bail! ("That server isn't connected"), None => bail! ("That server isn't connected"),
}; };
unimplemented! (); // unimplemented! ();
/* /*
p4.req_channel.send (RequestP2ToP4 { p4.req_channel.send (RequestP2ToP4 {
client_send, client_send,

View File

@ -12,28 +12,37 @@ pub struct ForwardingInstance {
} }
impl ForwardingInstance { impl ForwardingInstance {
pub fn new ( pub async fn new (
connection_p2_p3: quinn::Connection, connection_p2_p3: quinn::Connection,
params: ForwardingParams, params: ForwardingParams,
) -> Self ) -> anyhow::Result <Self>
{ {
let (shutdown_flag, shutdown_flag_rx) = tokio::sync::watch::channel (true); let (shutdown_flag, shutdown_flag_rx) = tokio::sync::watch::channel (true);
let listener = TcpListener::bind (("127.0.0.1", params.client_tcp_port)).await?;
trace! ("Accepting local TCP connections from P1 on {}", params.client_tcp_port);
let task = tokio::spawn (forward_port ( let task = tokio::spawn (forward_port (
listener,
connection_p2_p3, connection_p2_p3,
params, params,
shutdown_flag_rx shutdown_flag_rx
)); ));
Self { Ok (Self {
task, task,
shutdown_flag, shutdown_flag,
} })
} }
pub async fn close (self) -> anyhow::Result <()> { pub async fn close (self) -> anyhow::Result <()> {
self.shutdown_flag.send (false)?; match self.shutdown_flag.send (false) {
self.task.await??; Err (_) => warn! ("Trying to gracefully shutdown forwarding task but it appears to already be shut down"),
_ => (),
}
self.task.await
.context ("awaiting ForwardingInstance task")?
.context ("inside ForwardingInstance task")?;
Ok (()) Ok (())
} }
} }
@ -44,10 +53,14 @@ pub struct ForwardingParams {
pub server_tcp_port: u16, pub server_tcp_port: u16,
} }
async fn forward_port ( /// Starts a TCP listener that can forward any number of TCP streams to
/// the same client:server port combination
pub async fn forward_port (
listener: TcpListener,
connection_p2_p3: quinn::Connection, connection_p2_p3: quinn::Connection,
params: ForwardingParams, params: ForwardingParams,
shutdown_flag_rx: tokio::sync::watch::Receiver <bool>, mut shutdown_flag_rx: tokio::sync::watch::Receiver <bool>,
) -> anyhow::Result <()> ) -> anyhow::Result <()>
{ {
let ForwardingParams { let ForwardingParams {
@ -56,13 +69,7 @@ async fn forward_port (
server_tcp_port, server_tcp_port,
} = params; } = params;
let listener = TcpListener::bind (("127.0.0.1", client_tcp_port)).await?;
trace! ("Accepting local TCP connections from P1 on {}", client_tcp_port);
while *shutdown_flag_rx.borrow () { while *shutdown_flag_rx.borrow () {
let mut shutdown_flag_rx_2 = shutdown_flag_rx.clone ();
tokio::select! { tokio::select! {
x = listener.accept () => { x = listener.accept () => {
let (tcp_socket, _) = x?; let (tcp_socket, _) = x?;
@ -72,7 +79,7 @@ async fn forward_port (
tokio::spawn (handle_p1 (connection, server_id, server_tcp_port, tcp_socket, shutdown_flag_rx)); tokio::spawn (handle_p1 (connection, server_id, server_tcp_port, tcp_socket, shutdown_flag_rx));
}, },
_ = shutdown_flag_rx_2.changed () => (), _ = shutdown_flag_rx.changed () => (),
}; };
} }

View File

@ -1,9 +1,14 @@
use structopt::StructOpt; use structopt::StructOpt;
use tokio::net::TcpStream; use tokio::{
net::TcpStream,
sync::watch,
};
use crate::prelude::*; use crate::prelude::*;
use protocol::PeerId; use protocol::PeerId;
/// A partially-filled-out config that structopt can deal with
/// Try to turn this into a Config as soon as possible.
#[derive (Debug, StructOpt)] #[derive (Debug, StructOpt)]
struct Opt { struct Opt {
#[structopt (long)] #[structopt (long)]
@ -16,37 +21,145 @@ struct Opt {
cert_url: Option <String>, cert_url: Option <String>,
} }
pub async fn main (args: &[OsString]) -> anyhow::Result <()> { pub async fn main (args: &[OsString], shutdown_rx: Option <watch::Receiver <bool>>) -> anyhow::Result <()> {
let opt = Arc::new (Opt::from_iter (args)); let opt = Opt::from_iter (args);
let conf = opt.into_config ().await?;
let server_cert = match opt.cert_url.as_ref () { let end_server = Arc::new (P4EndServer::connect (conf)?);
Some (url) => reqwest::get (url).await?.bytes ().await?,
None => tokio::fs::read ("quic_server.crt").await?.into (), let run_task = {
let end_server = Arc::clone (&end_server);
tokio::spawn (async move {
end_server.run ().await?;
Ok::<_, anyhow::Error> (())
})
}; };
let relay_addr = opt.relay_addr.clone ().unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?;
let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&server_cert])?;
trace! ("Connecting to relay server"); if let Some (mut shutdown_rx) = shutdown_rx {
while ! *shutdown_rx.borrow () {
shutdown_rx.changed ().await?;
}
end_server.shut_down ()?;
}
let server_id = opt.server_id.clone ().unwrap_or_else (|| "bogus_server".to_string ()); run_task.await??;
let quinn::NewConnection { trace! ("P4 end server shut down gracefully.");
mut bi_streams,
..
} = protocol::p4_connect_to_p3 (&endpoint, &relay_addr, &server_id).await?;
debug! ("Connected to relay server"); Ok (())
trace! ("Accepting bi streams from P3"); }
loop { /// A filled-out config for constructing an end server
let (relay_send, relay_recv) = bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("Relay server didn't open a bi stream"))??; #[derive (Clone)]
pub struct Config {
pub debug_echo: bool,
pub id: String,
pub relay_addr: SocketAddr,
pub relay_cert: Vec <u8>,
}
tokio::spawn (handle_bi_stream (Arc::clone (&opt), relay_send, relay_recv)); impl Opt {
/// Converts self into a Config that the server can use.
/// Performs I/O to load the relay cert from disk or from HTTP.
/// Fails if arguments can't be parsed or if I/O fails.
pub async fn into_config (self) -> anyhow::Result <Config> {
let id = self.server_id.clone ().unwrap_or_else (|| "bogus_server".to_string ());
let relay_addr: SocketAddr = self.relay_addr.clone ().unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?;
// Do I/O after all parsing is done.
// We don't want to waste a network request only to come back and error
// out on like "127.oooo.1" not parsing into a relay address.
let relay_cert: Vec <u8> = match self.cert_url.as_ref () {
Some (url) => reqwest::get (url).await?.bytes ().await?.into_iter ().collect (),
None => tokio::fs::read ("ptth_quic_output/quic_server.crt").await?,
};
Ok (Config {
debug_echo: self.debug_echo,
id,
relay_addr,
relay_cert,
})
}
}
pub struct P4EndServer {
endpoint: quinn::Endpoint,
conf: Arc <Config>,
shutdown_tx: watch::Sender <bool>,
shutdown_rx: watch::Receiver <bool>,
}
impl P4EndServer {
pub fn connect (conf: Config) -> anyhow::Result <Self> {
trace! ("P4 end server making its QUIC endpoint");
let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&conf.relay_cert])?;
let (shutdown_tx, shutdown_rx) = watch::channel (false);
Ok (P4EndServer {
conf: Arc::new (conf),
endpoint,
shutdown_tx,
shutdown_rx,
})
}
pub fn config (&self) -> &Config {
&*self.conf
}
pub async fn run (&self) -> anyhow::Result <()> {
trace! ("P4 end server connecting to P3 relay server");
let quinn::NewConnection {
mut bi_streams,
..
} = protocol::p4_connect_to_p3 (
&self.endpoint,
&self.conf.relay_addr,
&self.conf.id
).await?;
debug! ("Connected to relay server");
trace! ("Accepting bi streams from P3");
let mut shutdown_rx = self.shutdown_rx.clone ();
loop {
tokio::select! {
_ = shutdown_rx.changed () => {
if *shutdown_rx.borrow () {
trace! ("P4 incoming bi streams task caught graceful shutdown");
break;
}
}
stream_opt = bi_streams.next () => {
let (relay_send, relay_recv) = stream_opt.ok_or_else (|| anyhow::anyhow! ("P4 ran out of incoming streams. Maybe P3 shut down or disconnected?"))??;
tokio::spawn (handle_bi_stream (Arc::clone (&self.conf), relay_send, relay_recv));
}
};
}
Ok (())
}
pub fn shut_down (&self) -> anyhow::Result <()> {
trace! ("P4 end server shutting down...");
Ok (self.shutdown_tx.send (true)?)
}
pub fn shutting_down (&self) -> bool {
*self.shutdown_rx.borrow ()
} }
} }
async fn handle_bi_stream ( async fn handle_bi_stream (
opt: Arc <Opt>, conf: Arc <Config>,
relay_send: quinn::SendStream, relay_send: quinn::SendStream,
mut relay_recv: quinn::RecvStream, mut relay_recv: quinn::RecvStream,
) -> anyhow::Result <()> ) -> anyhow::Result <()>
@ -55,14 +168,14 @@ async fn handle_bi_stream (
protocol::P3ToP4Stream::NewPtthConnection { protocol::P3ToP4Stream::NewPtthConnection {
client_id, client_id,
.. ..
} => handle_new_ptth_connection (opt, relay_send, relay_recv, client_id).await?, } => handle_new_ptth_connection (conf, relay_send, relay_recv, client_id).await?,
} }
Ok (()) Ok (())
} }
async fn handle_new_ptth_connection ( async fn handle_new_ptth_connection (
opt: Arc <Opt>, conf: Arc <Config>,
mut relay_send: quinn::SendStream, mut relay_send: quinn::SendStream,
mut relay_recv: quinn::RecvStream, mut relay_recv: quinn::RecvStream,
_client_id: String, _client_id: String,
@ -79,7 +192,7 @@ async fn handle_new_ptth_connection (
debug! ("Started PTTH connection"); debug! ("Started PTTH connection");
if opt.debug_echo { if conf.debug_echo {
relay_send.write (b"Connected to P4=P5 debug echo server\n").await?; relay_send.write (b"Connected to P4=P5 debug echo server\n").await?;
debug! ("Relaying bytes using internal debug echo server (P4=P5)"); debug! ("Relaying bytes using internal debug echo server (P4=P5)");
tokio::io::copy (&mut relay_recv, &mut relay_send).await?; tokio::io::copy (&mut relay_recv, &mut relay_send).await?;

View File

@ -61,7 +61,7 @@ pub async fn p2_connect_to_p5 (
let cmd_type = Command::CONNECT_P2_TO_P4.0; let cmd_type = Command::CONNECT_P2_TO_P4.0;
send.write_all (&[cmd_type, 0, 0, 0]).await?; send.write_all (&[cmd_type, 0, 0, 0]).await?;
send_lv_string (&mut send, &server_id).await?; send_lv_string (&mut send, server_id).await?;
expect_exact_response (&mut recv, [Command::OKAY.0, cmd_type, 0, 0]).await expect_exact_response (&mut recv, [Command::OKAY.0, cmd_type, 0, 0]).await
.context ("P2 didn't get OK response when asking P3 to connect P2 to P4")?; .context ("P2 didn't get OK response when asking P3 to connect P2 to P4")?;

View File

@ -58,7 +58,7 @@ pub fn make_server_endpoint(bind_addr: SocketAddr) -> anyhow::Result<(Incoming,
fn configure_client(server_certs: &[&[u8]]) -> anyhow::Result<ClientConfig> { fn configure_client(server_certs: &[&[u8]]) -> anyhow::Result<ClientConfig> {
let mut cfg_builder = ClientConfigBuilder::default(); let mut cfg_builder = ClientConfigBuilder::default();
for cert in server_certs { for cert in server_certs {
cfg_builder.add_certificate_authority(Certificate::from_der(&cert)?)?; cfg_builder.add_certificate_authority(Certificate::from_der(cert)?)?;
} }
Ok(cfg_builder.build()) Ok(cfg_builder.build())
} }