Compare commits
21 Commits
61a74c29a6
...
1a775622cf
Author | SHA1 | Date |
---|---|---|
_ | 1a775622cf | |
_ | b21f8a7c50 | |
_ | e4285ec17d | |
_ | baa5044186 | |
_ | e8bb7ab098 | |
_ | de4da749f3 | |
_ | 11ee6292ba | |
_ | 07fa3b97c3 | |
_ | 68eb6f911b | |
_ | c002665f6c | |
_ | c2caeb405c | |
_ | a3b62b012d | |
_ | 6fbe35379b | |
_ | 88fab23871 | |
_ | a906472add | |
_ | fc6a9c9e1e | |
_ | 2b60396a26 | |
_ | a2d4ae81e0 | |
_ | 953254e550 | |
_ | ee31d105c9 | |
_ | 35cc1d49b7 |
|
@ -93,9 +93,9 @@ checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "bitflags"
|
name = "bitflags"
|
||||||
version = "1.2.1"
|
version = "1.3.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
|
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "blake3"
|
name = "blake3"
|
||||||
|
@ -257,9 +257,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ctrlc"
|
name = "ctrlc"
|
||||||
version = "3.2.0"
|
version = "3.2.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "377c9b002a72a0b2c1a18c62e2f3864bdfea4a015e3683a96e24aa45dd6c02d1"
|
checksum = "a19c6cedffdc8c03a3346d723eb20bd85a13362bb96dc2ac000842c6381ec7bf"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"nix",
|
"nix",
|
||||||
"winapi",
|
"winapi",
|
||||||
|
@ -875,9 +875,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "nix"
|
name = "nix"
|
||||||
version = "0.22.2"
|
version = "0.23.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "d3bb9a13fa32bc5aeb64150cd3f32d6cf4c748f8f8a417cce5d2eb976a8370ba"
|
checksum = "f305c2c2e4c39a82f7bf0bf65fb557f9070ce06781d4f2454295cc34b1c43188"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bitflags",
|
"bitflags",
|
||||||
"cc",
|
"cc",
|
||||||
|
@ -1228,6 +1228,7 @@ name = "ptth_multi_call_server"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
|
"ctrlc",
|
||||||
"ptth_file_server",
|
"ptth_file_server",
|
||||||
"ptth_server",
|
"ptth_server",
|
||||||
"quic_demo",
|
"quic_demo",
|
||||||
|
@ -1364,6 +1365,7 @@ version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"base64",
|
"base64",
|
||||||
|
"ctrlc",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"hyper",
|
"hyper",
|
||||||
"quinn",
|
"quinn",
|
||||||
|
|
|
@ -7,6 +7,7 @@ license = "AGPL-3.0"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
anyhow = "1.0.38"
|
anyhow = "1.0.38"
|
||||||
|
ctrlc = "3.2.1"
|
||||||
ptth_file_server = { path = "../ptth_file_server_bin" }
|
ptth_file_server = { path = "../ptth_file_server_bin" }
|
||||||
ptth_server = { path = "../ptth_server" }
|
ptth_server = { path = "../ptth_server" }
|
||||||
quic_demo = { path = "../../prototypes/quic_demo" }
|
quic_demo = { path = "../../prototypes/quic_demo" }
|
||||||
|
|
|
@ -3,6 +3,8 @@ use std::{
|
||||||
iter::FromIterator,
|
iter::FromIterator,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use tokio::sync::watch;
|
||||||
|
|
||||||
#[derive (Clone, Copy, Debug, PartialEq)]
|
#[derive (Clone, Copy, Debug, PartialEq)]
|
||||||
enum Subcommand {
|
enum Subcommand {
|
||||||
PtthServer,
|
PtthServer,
|
||||||
|
@ -10,6 +12,30 @@ enum Subcommand {
|
||||||
PtthQuicEndServer,
|
PtthQuicEndServer,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main () -> anyhow::Result <()> {
|
||||||
|
tracing_subscriber::fmt::init ();
|
||||||
|
|
||||||
|
let args = Vec::from_iter (std::env::args_os ());
|
||||||
|
|
||||||
|
let (subcommand, args) = parse_args (&args)?;
|
||||||
|
match subcommand {
|
||||||
|
Subcommand::PtthServer => ptth_server::executable::main (&args).await,
|
||||||
|
Subcommand::PtthFileServer => ptth_file_server::main (&args).await,
|
||||||
|
Subcommand::PtthQuicEndServer => {
|
||||||
|
let (shutdown_tx, shutdown_rx) = watch::channel (false);
|
||||||
|
|
||||||
|
ctrlc::set_handler (move || {
|
||||||
|
shutdown_tx.send (true).expect ("Couldn't forward Ctrl+C signal");
|
||||||
|
})?;
|
||||||
|
tracing::trace! ("Set Ctrl+C handler");
|
||||||
|
quic_demo::executable_end_server::main (&args, Some (shutdown_rx)).await?;
|
||||||
|
|
||||||
|
Ok (())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn parse_subcommand (arg: &str) -> Option <Subcommand>
|
fn parse_subcommand (arg: &str) -> Option <Subcommand>
|
||||||
{
|
{
|
||||||
use Subcommand::*;
|
use Subcommand::*;
|
||||||
|
@ -58,20 +84,6 @@ fn parse_args (args: &[OsString]) -> anyhow::Result <(Subcommand, &[OsString])>
|
||||||
anyhow::bail! ("Subcommand must be either arg 0 (exe name) or arg 1")
|
anyhow::bail! ("Subcommand must be either arg 0 (exe name) or arg 1")
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
|
||||||
async fn main () -> anyhow::Result <()> {
|
|
||||||
tracing_subscriber::fmt::init ();
|
|
||||||
|
|
||||||
let args = Vec::from_iter (std::env::args_os ());
|
|
||||||
|
|
||||||
let (subcommand, args) = parse_args (&args)?;
|
|
||||||
match subcommand {
|
|
||||||
Subcommand::PtthServer => ptth_server::executable::main (&args).await,
|
|
||||||
Subcommand::PtthFileServer => ptth_file_server::main (&args).await,
|
|
||||||
Subcommand::PtthQuicEndServer => quic_demo::executable_end_server::main (&args).await,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg (test)]
|
#[cfg (test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
|
@ -10,6 +10,7 @@ use fltk::{
|
||||||
window::Window
|
window::Window
|
||||||
};
|
};
|
||||||
use structopt::StructOpt;
|
use structopt::StructOpt;
|
||||||
|
use tokio::runtime::Runtime;
|
||||||
|
|
||||||
use quic_demo::{
|
use quic_demo::{
|
||||||
client_proxy::*,
|
client_proxy::*,
|
||||||
|
@ -35,9 +36,69 @@ enum Message {
|
||||||
ClosePort (usize),
|
ClosePort (usize),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct GuiClient <'a> {
|
||||||
|
rt: &'a Runtime,
|
||||||
|
frame_status: Frame,
|
||||||
|
forwarding_instances: Vec <Option <ForwardingInstance>>,
|
||||||
|
gui_ports: Vec <GuiPort>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl GuiClient <'_> {
|
||||||
|
pub fn open_port (
|
||||||
|
&mut self,
|
||||||
|
connection_p2_p3: quinn::Connection,
|
||||||
|
port_idx: usize,
|
||||||
|
) -> anyhow::Result <()>
|
||||||
|
{
|
||||||
|
let params = self.gui_ports [port_idx].get_params ()?;
|
||||||
|
|
||||||
|
let _guard = self.rt.enter ();
|
||||||
|
let forwarding_instance = self.rt.block_on (ForwardingInstance::new (
|
||||||
|
connection_p2_p3,
|
||||||
|
params,
|
||||||
|
))?;
|
||||||
|
|
||||||
|
self.forwarding_instances [port_idx].replace (forwarding_instance);
|
||||||
|
|
||||||
|
self.gui_ports [port_idx].set_forwarding (true);
|
||||||
|
self.sync_status ();
|
||||||
|
|
||||||
|
Ok (())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn close_port (&mut self, port_idx: usize) -> anyhow::Result <()> {
|
||||||
|
if let Some (old_instance) = self.forwarding_instances [port_idx].take () {
|
||||||
|
self.rt.block_on (async {
|
||||||
|
old_instance.close ()
|
||||||
|
.await
|
||||||
|
.context ("closing ForwardingInstance")?;
|
||||||
|
|
||||||
|
Ok::<_, anyhow::Error> (())
|
||||||
|
})?;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.gui_ports [port_idx].set_forwarding (false);
|
||||||
|
self.sync_status ();
|
||||||
|
|
||||||
|
Ok (())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn open_ports (&self) -> usize {
|
||||||
|
self.forwarding_instances.iter ()
|
||||||
|
.map (|x| if x.is_some () { 1 } else { 0 })
|
||||||
|
.sum ()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn sync_status (&mut self) {
|
||||||
|
let open_ports = self.open_ports ();
|
||||||
|
|
||||||
|
self.frame_status.set_label (&format! ("Forwarding {} ports", open_ports));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn main () -> anyhow::Result <()> {
|
fn main () -> anyhow::Result <()> {
|
||||||
tracing_subscriber::fmt::init ();
|
tracing_subscriber::fmt::init ();
|
||||||
let rt = tokio::runtime::Runtime::new ()?;
|
let rt = Runtime::new ()?;
|
||||||
|
|
||||||
let opt = Opt::from_args ();
|
let opt = Opt::from_args ();
|
||||||
|
|
||||||
|
@ -53,7 +114,7 @@ fn main () -> anyhow::Result <()> {
|
||||||
let mut x = margin;
|
let mut x = margin;
|
||||||
let mut y = margin;
|
let mut y = margin;
|
||||||
|
|
||||||
let mut frame_status = Frame::new (x, y, 800 - 20, h, "Forwarding 0 ports");
|
let frame_status = Frame::new (x, y, 800 - 20, h, "Forwarding 0 ports");
|
||||||
|
|
||||||
y += h + margin;
|
y += h + margin;
|
||||||
x = margin;
|
x = margin;
|
||||||
|
@ -87,18 +148,25 @@ fn main () -> anyhow::Result <()> {
|
||||||
// y += h + margin;
|
// y += h + margin;
|
||||||
// x = margin;
|
// x = margin;
|
||||||
|
|
||||||
let mut gui_ports = vec! [
|
let gui_ports = vec! [
|
||||||
gui_port_0,
|
gui_port_0,
|
||||||
gui_port_1,
|
gui_port_1,
|
||||||
gui_port_2,
|
gui_port_2,
|
||||||
];
|
];
|
||||||
|
|
||||||
let mut forwarding_instances = vec! [
|
let forwarding_instances = vec! [
|
||||||
None,
|
None,
|
||||||
None,
|
None,
|
||||||
None,
|
None,
|
||||||
];
|
];
|
||||||
|
|
||||||
|
let mut gui_client = GuiClient {
|
||||||
|
rt: &rt,
|
||||||
|
frame_status,
|
||||||
|
forwarding_instances,
|
||||||
|
gui_ports,
|
||||||
|
};
|
||||||
|
|
||||||
// y += h + margin;
|
// y += h + margin;
|
||||||
|
|
||||||
wind.end ();
|
wind.end ();
|
||||||
|
@ -107,7 +175,7 @@ fn main () -> anyhow::Result <()> {
|
||||||
let connection_p2_p3 = rt.block_on (async move {
|
let connection_p2_p3 = rt.block_on (async move {
|
||||||
let server_cert = match opt.cert_url.as_ref () {
|
let server_cert = match opt.cert_url.as_ref () {
|
||||||
Some (url) => reqwest::get (url).await?.bytes ().await?,
|
Some (url) => reqwest::get (url).await?.bytes ().await?,
|
||||||
None => tokio::fs::read ("quic_server.crt").await?.into (),
|
None => tokio::fs::read ("ptth_quic_output/quic_server.crt").await?.into (),
|
||||||
};
|
};
|
||||||
|
|
||||||
let relay_addr = opt.relay_addr
|
let relay_addr = opt.relay_addr
|
||||||
|
@ -132,27 +200,13 @@ fn main () -> anyhow::Result <()> {
|
||||||
while app.wait () {
|
while app.wait () {
|
||||||
match fltk_rx.recv () {
|
match fltk_rx.recv () {
|
||||||
Some (Message::OpenPort (port_idx)) => {
|
Some (Message::OpenPort (port_idx)) => {
|
||||||
if let Ok (params) = gui_ports [port_idx].get_params () {
|
match gui_client.open_port (connection_p2_p3.clone (), port_idx) {
|
||||||
let connection_p2_p3 = connection_p2_p3.clone ();
|
Err (e) => error! ("{:?}", e),
|
||||||
|
_ => (),
|
||||||
let _guard = rt.enter ();
|
};
|
||||||
forwarding_instances [port_idx].replace (ForwardingInstance::new (
|
|
||||||
connection_p2_p3,
|
|
||||||
params,
|
|
||||||
));
|
|
||||||
|
|
||||||
gui_ports [port_idx].set_forwarding (true);
|
|
||||||
|
|
||||||
frame_status.set_label ("Forwarding 1 port");
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
Some (Message::ClosePort (port_idx)) => {
|
Some (Message::ClosePort (port_idx)) => {
|
||||||
if let Some (old_instance) = forwarding_instances [port_idx].take () {
|
gui_client.close_port (port_idx)?;
|
||||||
rt.block_on (old_instance.close ())?;
|
|
||||||
}
|
|
||||||
|
|
||||||
gui_ports [port_idx].set_forwarding (false);
|
|
||||||
frame_status.set_label ("Forwarding 0 ports");
|
|
||||||
},
|
},
|
||||||
None => (),
|
None => (),
|
||||||
}
|
}
|
||||||
|
@ -207,9 +261,9 @@ impl GuiPort {
|
||||||
input_server_id.set_value ("bogus_server");
|
input_server_id.set_value ("bogus_server");
|
||||||
input_server_port.set_value ("5900");
|
input_server_port.set_value ("5900");
|
||||||
|
|
||||||
but_open.set_trigger (CallbackTrigger::Changed);
|
but_open.set_trigger (CallbackTrigger::Release);
|
||||||
but_open.emit (fltk_tx, Message::OpenPort (port_idx));
|
but_open.emit (fltk_tx, Message::OpenPort (port_idx));
|
||||||
but_close.set_trigger (CallbackTrigger::Changed);
|
but_close.set_trigger (CallbackTrigger::Release);
|
||||||
but_close.emit (fltk_tx, Message::ClosePort (port_idx));
|
but_close.emit (fltk_tx, Message::ClosePort (port_idx));
|
||||||
|
|
||||||
set_active (&mut but_open, true);
|
set_active (&mut but_open, true);
|
||||||
|
|
|
@ -10,6 +10,7 @@ license = "AGPL-3.0"
|
||||||
[dependencies]
|
[dependencies]
|
||||||
anyhow = "1.0.38"
|
anyhow = "1.0.38"
|
||||||
base64 = "0.13.0"
|
base64 = "0.13.0"
|
||||||
|
ctrlc = "3.2.1"
|
||||||
# fltk = "1.1.1"
|
# fltk = "1.1.1"
|
||||||
futures-util = "0.3.9"
|
futures-util = "0.3.9"
|
||||||
hyper = { version = "0.14.4", features = ["http1", "server", "stream", "tcp"] }
|
hyper = { version = "0.14.4", features = ["http1", "server", "stream", "tcp"] }
|
||||||
|
|
|
@ -1,7 +1,15 @@
|
||||||
use structopt::StructOpt;
|
use structopt::StructOpt;
|
||||||
use tokio::net::TcpListener;
|
use tokio::{
|
||||||
|
sync::watch,
|
||||||
|
};
|
||||||
|
|
||||||
use quic_demo::prelude::*;
|
use quic_demo::{
|
||||||
|
client_proxy::{
|
||||||
|
ForwardingParams,
|
||||||
|
forward_port,
|
||||||
|
},
|
||||||
|
prelude::*,
|
||||||
|
};
|
||||||
use protocol::PeerId;
|
use protocol::PeerId;
|
||||||
|
|
||||||
#[derive (Debug, StructOpt)]
|
#[derive (Debug, StructOpt)]
|
||||||
|
@ -23,68 +31,109 @@ async fn main () -> anyhow::Result <()> {
|
||||||
tracing_subscriber::fmt::init ();
|
tracing_subscriber::fmt::init ();
|
||||||
|
|
||||||
let opt = Opt::from_args ();
|
let opt = Opt::from_args ();
|
||||||
|
let conf = opt.into_config ().await?;
|
||||||
|
|
||||||
let server_cert = tokio::fs::read ("quic_server.crt").await?;
|
let client = P2Client::connect (conf)?;
|
||||||
let relay_addr = opt.relay_addr.unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?;
|
client.run ().await?;
|
||||||
let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&server_cert])?;
|
|
||||||
|
|
||||||
debug! ("Connecting to relay server");
|
Ok (())
|
||||||
|
}
|
||||||
|
|
||||||
let client_id = opt.client_id.unwrap_or_else (|| "bogus_client".to_string ());
|
pub struct P2Client {
|
||||||
|
endpoint: quinn::Endpoint,
|
||||||
|
conf: Arc <Config>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl P2Client {
|
||||||
|
pub fn connect (conf: Config) -> anyhow::Result <Self> {
|
||||||
|
let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&conf.relay_cert])?;
|
||||||
|
let conf = Arc::new (conf);
|
||||||
|
|
||||||
|
Ok (Self {
|
||||||
|
endpoint,
|
||||||
|
conf,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn run (&self) -> anyhow::Result <()> {
|
||||||
|
debug! ("P2 client connecting to P3 relay server");
|
||||||
|
|
||||||
|
let conf = Arc::clone (&self.conf);
|
||||||
|
|
||||||
let quinn::NewConnection {
|
let quinn::NewConnection {
|
||||||
connection,
|
connection,
|
||||||
..
|
..
|
||||||
} = protocol::p2_connect_to_p3 (&endpoint, &relay_addr, &client_id).await?;
|
} = protocol::p2_connect_to_p3 (&self.endpoint, &conf.relay_addr, &conf.client_id).await?;
|
||||||
|
|
||||||
// End of per-client stuff
|
let client_tcp_port = conf.client_tcp_port;
|
||||||
// Beginning of per-port stuff
|
|
||||||
|
|
||||||
let server_id = opt.server_id.unwrap_or_else (|| "bogus_server".to_string ());
|
debug! ("Accepting local TCP connections from P1 at {}", client_tcp_port);
|
||||||
|
|
||||||
let client_tcp_port = opt.client_tcp_port.unwrap_or (30381);
|
|
||||||
let server_tcp_port = opt.server_tcp_port.unwrap_or (30382);
|
|
||||||
let listener = TcpListener::bind (("127.0.0.1", client_tcp_port)).await?;
|
|
||||||
|
|
||||||
// End of per-port stuff
|
// End of per-port stuff
|
||||||
// Beginning of per-connection stuff
|
// Beginning of per-connection stuff
|
||||||
|
|
||||||
let task_tcp_server = tokio::spawn (async move {
|
let (_shutdown_flag_tx, shutdown_flag_rx) = watch::channel (true);
|
||||||
loop {
|
|
||||||
let (tcp_socket, _) = listener.accept ().await?;
|
let task_tcp_server = {
|
||||||
let connection = connection.clone ();
|
let connection = connection.clone ();
|
||||||
let server_id = server_id.clone ();
|
let server_id = conf.server_id.clone ();
|
||||||
|
let server_tcp_port = conf.server_tcp_port;
|
||||||
|
|
||||||
|
let listener = TcpListener::bind (("127.0.0.1", client_tcp_port)).await?;
|
||||||
|
trace! ("Accepting local TCP connections from P1 on {}", client_tcp_port);
|
||||||
|
|
||||||
tokio::spawn (async move {
|
tokio::spawn (async move {
|
||||||
let (local_recv, local_send) = tcp_socket.into_split ();
|
forward_port (
|
||||||
|
listener,
|
||||||
debug! ("Starting PTTH connection");
|
connection,
|
||||||
|
ForwardingParams {
|
||||||
let (relay_send, relay_recv) = protocol::p2_connect_to_p5 (&connection, &server_id, server_tcp_port).await?;
|
client_tcp_port,
|
||||||
|
server_id,
|
||||||
trace! ("Relaying bytes...");
|
server_tcp_port,
|
||||||
|
},
|
||||||
let ptth_conn = quic_demo::connection::NewConnection {
|
shutdown_flag_rx,
|
||||||
local_send,
|
).await?;
|
||||||
local_recv,
|
|
||||||
relay_send,
|
|
||||||
relay_recv,
|
|
||||||
}.build ();
|
|
||||||
|
|
||||||
ptth_conn.wait_for_close ().await?;
|
|
||||||
|
|
||||||
debug! ("Ended PTTH connection");
|
|
||||||
|
|
||||||
Ok::<_, anyhow::Error> (())
|
Ok::<_, anyhow::Error> (())
|
||||||
});
|
})
|
||||||
}
|
};
|
||||||
|
|
||||||
Ok::<_, anyhow::Error> (())
|
|
||||||
});
|
|
||||||
|
|
||||||
debug! ("Accepting local TCP connections from P1");
|
|
||||||
|
|
||||||
task_tcp_server.await??;
|
task_tcp_server.await??;
|
||||||
|
|
||||||
Ok (())
|
Ok (())
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A filled-out config for constructing a P2 client
|
||||||
|
#[derive (Clone)]
|
||||||
|
pub struct Config {
|
||||||
|
client_tcp_port: u16,
|
||||||
|
server_tcp_port: u16,
|
||||||
|
client_id: String,
|
||||||
|
server_id: String,
|
||||||
|
relay_addr: SocketAddr,
|
||||||
|
relay_cert: Vec <u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Opt {
|
||||||
|
pub async fn into_config (self) -> anyhow::Result <Config> {
|
||||||
|
let client_tcp_port = self.client_tcp_port.unwrap_or (30381);
|
||||||
|
let server_tcp_port = self.server_tcp_port.unwrap_or (30382);
|
||||||
|
let client_id = self.client_id.unwrap_or_else (|| "bogus_client".to_string ());
|
||||||
|
let server_id = self.server_id.unwrap_or_else (|| "bogus_server".to_string ());
|
||||||
|
let relay_addr = self.relay_addr.unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?;
|
||||||
|
|
||||||
|
// Begin I/O
|
||||||
|
|
||||||
|
let relay_cert = tokio::fs::read ("ptth_quic_output/quic_server.crt").await?;
|
||||||
|
|
||||||
|
Ok (Config {
|
||||||
|
client_tcp_port,
|
||||||
|
server_tcp_port,
|
||||||
|
client_id,
|
||||||
|
server_id,
|
||||||
|
relay_addr,
|
||||||
|
relay_cert,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -2,11 +2,22 @@ use std::{
|
||||||
iter::FromIterator,
|
iter::FromIterator,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use tokio::sync::watch;
|
||||||
|
|
||||||
|
use quic_demo::prelude::*;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main () -> anyhow::Result <()> {
|
async fn main () -> anyhow::Result <()> {
|
||||||
tracing_subscriber::fmt::init ();
|
tracing_subscriber::fmt::init ();
|
||||||
|
|
||||||
let args = Vec::from_iter (std::env::args_os ());
|
let args = Vec::from_iter (std::env::args_os ());
|
||||||
|
|
||||||
quic_demo::executable_end_server::main (&args).await
|
let (shutdown_tx, shutdown_rx) = watch::channel (false);
|
||||||
|
|
||||||
|
ctrlc::set_handler (move || {
|
||||||
|
shutdown_tx.send (true).expect ("Couldn't forward Ctrl+C signal");
|
||||||
|
})?;
|
||||||
|
trace! ("Set Ctrl+C handler");
|
||||||
|
|
||||||
|
quic_demo::executable_end_server::main (&args, Some (shutdown_rx)).await
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,6 +10,7 @@ use hyper::{
|
||||||
StatusCode,
|
StatusCode,
|
||||||
};
|
};
|
||||||
use structopt::StructOpt;
|
use structopt::StructOpt;
|
||||||
|
use tokio::sync::watch;
|
||||||
|
|
||||||
use quic_demo::prelude::*;
|
use quic_demo::prelude::*;
|
||||||
use protocol::PeerId;
|
use protocol::PeerId;
|
||||||
|
@ -56,6 +57,8 @@ async fn main () -> anyhow::Result <()> {
|
||||||
let tcp_port = 30382;
|
let tcp_port = 30382;
|
||||||
let tcp_listener = TcpListener::bind (("127.0.0.1", tcp_port)).await?;
|
let tcp_listener = TcpListener::bind (("127.0.0.1", tcp_port)).await?;
|
||||||
|
|
||||||
|
let (_running_tx, running_rx) = watch::channel (true);
|
||||||
|
|
||||||
let task_quic_server = {
|
let task_quic_server = {
|
||||||
let relay_state = Arc::clone (&relay_state);
|
let relay_state = Arc::clone (&relay_state);
|
||||||
tokio::spawn (async move {
|
tokio::spawn (async move {
|
||||||
|
@ -89,24 +92,22 @@ async fn main () -> anyhow::Result <()> {
|
||||||
let task_tcp_server = {
|
let task_tcp_server = {
|
||||||
let relay_state = Arc::clone (&relay_state);
|
let relay_state = Arc::clone (&relay_state);
|
||||||
tokio::spawn (async move {
|
tokio::spawn (async move {
|
||||||
loop {
|
while *running_rx.borrow () {
|
||||||
let (tcp_socket, _) = tcp_listener.accept ().await?;
|
let (tcp_socket, _) = tcp_listener.accept ().await?;
|
||||||
|
|
||||||
let server_id = "bogus_server".to_string ();
|
|
||||||
|
|
||||||
let relay_state = Arc::clone (&relay_state);
|
let relay_state = Arc::clone (&relay_state);
|
||||||
tokio::spawn (async move {
|
tokio::spawn (async move {
|
||||||
let (client_recv, client_send) = tcp_socket.into_split ();
|
let (_client_recv, _client_send) = tcp_socket.into_split ();
|
||||||
|
|
||||||
debug! ("Accepted direct TCP connection P1 --> P3");
|
debug! ("Accepted direct TCP connection P1 --> P3");
|
||||||
|
|
||||||
let p4_server_proxies = relay_state.p4_server_proxies.lock ().await;
|
let p4_server_proxies = relay_state.p4_server_proxies.lock ().await;
|
||||||
let p4 = match p4_server_proxies.get ("bogus_server") {
|
let _p4 = match p4_server_proxies.get ("bogus_server") {
|
||||||
Some (x) => x,
|
Some (x) => x,
|
||||||
None => bail! ("That server isn't connected"),
|
None => bail! ("That server isn't connected"),
|
||||||
};
|
};
|
||||||
|
|
||||||
unimplemented! ();
|
// unimplemented! ();
|
||||||
/*
|
/*
|
||||||
p4.req_channel.send (RequestP2ToP4 {
|
p4.req_channel.send (RequestP2ToP4 {
|
||||||
client_send,
|
client_send,
|
||||||
|
|
|
@ -12,28 +12,37 @@ pub struct ForwardingInstance {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ForwardingInstance {
|
impl ForwardingInstance {
|
||||||
pub fn new (
|
pub async fn new (
|
||||||
connection_p2_p3: quinn::Connection,
|
connection_p2_p3: quinn::Connection,
|
||||||
params: ForwardingParams,
|
params: ForwardingParams,
|
||||||
) -> Self
|
) -> anyhow::Result <Self>
|
||||||
{
|
{
|
||||||
let (shutdown_flag, shutdown_flag_rx) = tokio::sync::watch::channel (true);
|
let (shutdown_flag, shutdown_flag_rx) = tokio::sync::watch::channel (true);
|
||||||
|
|
||||||
|
let listener = TcpListener::bind (("127.0.0.1", params.client_tcp_port)).await?;
|
||||||
|
trace! ("Accepting local TCP connections from P1 on {}", params.client_tcp_port);
|
||||||
|
|
||||||
let task = tokio::spawn (forward_port (
|
let task = tokio::spawn (forward_port (
|
||||||
|
listener,
|
||||||
connection_p2_p3,
|
connection_p2_p3,
|
||||||
params,
|
params,
|
||||||
shutdown_flag_rx
|
shutdown_flag_rx
|
||||||
));
|
));
|
||||||
|
|
||||||
Self {
|
Ok (Self {
|
||||||
task,
|
task,
|
||||||
shutdown_flag,
|
shutdown_flag,
|
||||||
}
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn close (self) -> anyhow::Result <()> {
|
pub async fn close (self) -> anyhow::Result <()> {
|
||||||
self.shutdown_flag.send (false)?;
|
match self.shutdown_flag.send (false) {
|
||||||
self.task.await??;
|
Err (_) => warn! ("Trying to gracefully shutdown forwarding task but it appears to already be shut down"),
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
|
self.task.await
|
||||||
|
.context ("awaiting ForwardingInstance task")?
|
||||||
|
.context ("inside ForwardingInstance task")?;
|
||||||
Ok (())
|
Ok (())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -44,10 +53,14 @@ pub struct ForwardingParams {
|
||||||
pub server_tcp_port: u16,
|
pub server_tcp_port: u16,
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn forward_port (
|
/// Starts a TCP listener that can forward any number of TCP streams to
|
||||||
|
/// the same client:server port combination
|
||||||
|
|
||||||
|
pub async fn forward_port (
|
||||||
|
listener: TcpListener,
|
||||||
connection_p2_p3: quinn::Connection,
|
connection_p2_p3: quinn::Connection,
|
||||||
params: ForwardingParams,
|
params: ForwardingParams,
|
||||||
shutdown_flag_rx: tokio::sync::watch::Receiver <bool>,
|
mut shutdown_flag_rx: tokio::sync::watch::Receiver <bool>,
|
||||||
) -> anyhow::Result <()>
|
) -> anyhow::Result <()>
|
||||||
{
|
{
|
||||||
let ForwardingParams {
|
let ForwardingParams {
|
||||||
|
@ -56,13 +69,7 @@ async fn forward_port (
|
||||||
server_tcp_port,
|
server_tcp_port,
|
||||||
} = params;
|
} = params;
|
||||||
|
|
||||||
let listener = TcpListener::bind (("127.0.0.1", client_tcp_port)).await?;
|
|
||||||
|
|
||||||
trace! ("Accepting local TCP connections from P1 on {}", client_tcp_port);
|
|
||||||
|
|
||||||
while *shutdown_flag_rx.borrow () {
|
while *shutdown_flag_rx.borrow () {
|
||||||
let mut shutdown_flag_rx_2 = shutdown_flag_rx.clone ();
|
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
x = listener.accept () => {
|
x = listener.accept () => {
|
||||||
let (tcp_socket, _) = x?;
|
let (tcp_socket, _) = x?;
|
||||||
|
@ -72,7 +79,7 @@ async fn forward_port (
|
||||||
|
|
||||||
tokio::spawn (handle_p1 (connection, server_id, server_tcp_port, tcp_socket, shutdown_flag_rx));
|
tokio::spawn (handle_p1 (connection, server_id, server_tcp_port, tcp_socket, shutdown_flag_rx));
|
||||||
},
|
},
|
||||||
_ = shutdown_flag_rx_2.changed () => (),
|
_ = shutdown_flag_rx.changed () => (),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,9 +1,14 @@
|
||||||
use structopt::StructOpt;
|
use structopt::StructOpt;
|
||||||
use tokio::net::TcpStream;
|
use tokio::{
|
||||||
|
net::TcpStream,
|
||||||
|
sync::watch,
|
||||||
|
};
|
||||||
|
|
||||||
use crate::prelude::*;
|
use crate::prelude::*;
|
||||||
use protocol::PeerId;
|
use protocol::PeerId;
|
||||||
|
|
||||||
|
/// A partially-filled-out config that structopt can deal with
|
||||||
|
/// Try to turn this into a Config as soon as possible.
|
||||||
#[derive (Debug, StructOpt)]
|
#[derive (Debug, StructOpt)]
|
||||||
struct Opt {
|
struct Opt {
|
||||||
#[structopt (long)]
|
#[structopt (long)]
|
||||||
|
@ -16,37 +21,145 @@ struct Opt {
|
||||||
cert_url: Option <String>,
|
cert_url: Option <String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn main (args: &[OsString]) -> anyhow::Result <()> {
|
pub async fn main (args: &[OsString], shutdown_rx: Option <watch::Receiver <bool>>) -> anyhow::Result <()> {
|
||||||
let opt = Arc::new (Opt::from_iter (args));
|
let opt = Opt::from_iter (args);
|
||||||
|
let conf = opt.into_config ().await?;
|
||||||
|
|
||||||
let server_cert = match opt.cert_url.as_ref () {
|
let end_server = Arc::new (P4EndServer::connect (conf)?);
|
||||||
Some (url) => reqwest::get (url).await?.bytes ().await?,
|
|
||||||
None => tokio::fs::read ("quic_server.crt").await?.into (),
|
let run_task = {
|
||||||
|
let end_server = Arc::clone (&end_server);
|
||||||
|
tokio::spawn (async move {
|
||||||
|
end_server.run ().await?;
|
||||||
|
Ok::<_, anyhow::Error> (())
|
||||||
|
})
|
||||||
};
|
};
|
||||||
let relay_addr = opt.relay_addr.clone ().unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?;
|
|
||||||
let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&server_cert])?;
|
|
||||||
|
|
||||||
trace! ("Connecting to relay server");
|
if let Some (mut shutdown_rx) = shutdown_rx {
|
||||||
|
while ! *shutdown_rx.borrow () {
|
||||||
|
shutdown_rx.changed ().await?;
|
||||||
|
}
|
||||||
|
end_server.shut_down ()?;
|
||||||
|
}
|
||||||
|
|
||||||
let server_id = opt.server_id.clone ().unwrap_or_else (|| "bogus_server".to_string ());
|
run_task.await??;
|
||||||
|
|
||||||
|
trace! ("P4 end server shut down gracefully.");
|
||||||
|
|
||||||
|
Ok (())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A filled-out config for constructing an end server
|
||||||
|
#[derive (Clone)]
|
||||||
|
pub struct Config {
|
||||||
|
pub debug_echo: bool,
|
||||||
|
pub id: String,
|
||||||
|
pub relay_addr: SocketAddr,
|
||||||
|
pub relay_cert: Vec <u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Opt {
|
||||||
|
/// Converts self into a Config that the server can use.
|
||||||
|
/// Performs I/O to load the relay cert from disk or from HTTP.
|
||||||
|
/// Fails if arguments can't be parsed or if I/O fails.
|
||||||
|
|
||||||
|
pub async fn into_config (self) -> anyhow::Result <Config> {
|
||||||
|
let id = self.server_id.clone ().unwrap_or_else (|| "bogus_server".to_string ());
|
||||||
|
|
||||||
|
let relay_addr: SocketAddr = self.relay_addr.clone ().unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?;
|
||||||
|
|
||||||
|
// Do I/O after all parsing is done.
|
||||||
|
// We don't want to waste a network request only to come back and error
|
||||||
|
// out on like "127.oooo.1" not parsing into a relay address.
|
||||||
|
|
||||||
|
let relay_cert: Vec <u8> = match self.cert_url.as_ref () {
|
||||||
|
Some (url) => reqwest::get (url).await?.bytes ().await?.into_iter ().collect (),
|
||||||
|
None => tokio::fs::read ("ptth_quic_output/quic_server.crt").await?,
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok (Config {
|
||||||
|
debug_echo: self.debug_echo,
|
||||||
|
id,
|
||||||
|
relay_addr,
|
||||||
|
relay_cert,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct P4EndServer {
|
||||||
|
endpoint: quinn::Endpoint,
|
||||||
|
conf: Arc <Config>,
|
||||||
|
shutdown_tx: watch::Sender <bool>,
|
||||||
|
shutdown_rx: watch::Receiver <bool>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl P4EndServer {
|
||||||
|
pub fn connect (conf: Config) -> anyhow::Result <Self> {
|
||||||
|
trace! ("P4 end server making its QUIC endpoint");
|
||||||
|
let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&conf.relay_cert])?;
|
||||||
|
|
||||||
|
let (shutdown_tx, shutdown_rx) = watch::channel (false);
|
||||||
|
|
||||||
|
Ok (P4EndServer {
|
||||||
|
conf: Arc::new (conf),
|
||||||
|
endpoint,
|
||||||
|
shutdown_tx,
|
||||||
|
shutdown_rx,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn config (&self) -> &Config {
|
||||||
|
&*self.conf
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn run (&self) -> anyhow::Result <()> {
|
||||||
|
trace! ("P4 end server connecting to P3 relay server");
|
||||||
let quinn::NewConnection {
|
let quinn::NewConnection {
|
||||||
mut bi_streams,
|
mut bi_streams,
|
||||||
..
|
..
|
||||||
} = protocol::p4_connect_to_p3 (&endpoint, &relay_addr, &server_id).await?;
|
} = protocol::p4_connect_to_p3 (
|
||||||
|
&self.endpoint,
|
||||||
|
&self.conf.relay_addr,
|
||||||
|
&self.conf.id
|
||||||
|
).await?;
|
||||||
|
|
||||||
debug! ("Connected to relay server");
|
debug! ("Connected to relay server");
|
||||||
|
|
||||||
trace! ("Accepting bi streams from P3");
|
trace! ("Accepting bi streams from P3");
|
||||||
|
|
||||||
loop {
|
let mut shutdown_rx = self.shutdown_rx.clone ();
|
||||||
let (relay_send, relay_recv) = bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("Relay server didn't open a bi stream"))??;
|
|
||||||
|
|
||||||
tokio::spawn (handle_bi_stream (Arc::clone (&opt), relay_send, relay_recv));
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
_ = shutdown_rx.changed () => {
|
||||||
|
if *shutdown_rx.borrow () {
|
||||||
|
trace! ("P4 incoming bi streams task caught graceful shutdown");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
stream_opt = bi_streams.next () => {
|
||||||
|
let (relay_send, relay_recv) = stream_opt.ok_or_else (|| anyhow::anyhow! ("P4 ran out of incoming streams. Maybe P3 shut down or disconnected?"))??;
|
||||||
|
|
||||||
|
tokio::spawn (handle_bi_stream (Arc::clone (&self.conf), relay_send, relay_recv));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok (())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn shut_down (&self) -> anyhow::Result <()> {
|
||||||
|
trace! ("P4 end server shutting down...");
|
||||||
|
Ok (self.shutdown_tx.send (true)?)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn shutting_down (&self) -> bool {
|
||||||
|
*self.shutdown_rx.borrow ()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_bi_stream (
|
async fn handle_bi_stream (
|
||||||
opt: Arc <Opt>,
|
conf: Arc <Config>,
|
||||||
relay_send: quinn::SendStream,
|
relay_send: quinn::SendStream,
|
||||||
mut relay_recv: quinn::RecvStream,
|
mut relay_recv: quinn::RecvStream,
|
||||||
) -> anyhow::Result <()>
|
) -> anyhow::Result <()>
|
||||||
|
@ -55,14 +168,14 @@ async fn handle_bi_stream (
|
||||||
protocol::P3ToP4Stream::NewPtthConnection {
|
protocol::P3ToP4Stream::NewPtthConnection {
|
||||||
client_id,
|
client_id,
|
||||||
..
|
..
|
||||||
} => handle_new_ptth_connection (opt, relay_send, relay_recv, client_id).await?,
|
} => handle_new_ptth_connection (conf, relay_send, relay_recv, client_id).await?,
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok (())
|
Ok (())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_new_ptth_connection (
|
async fn handle_new_ptth_connection (
|
||||||
opt: Arc <Opt>,
|
conf: Arc <Config>,
|
||||||
mut relay_send: quinn::SendStream,
|
mut relay_send: quinn::SendStream,
|
||||||
mut relay_recv: quinn::RecvStream,
|
mut relay_recv: quinn::RecvStream,
|
||||||
_client_id: String,
|
_client_id: String,
|
||||||
|
@ -79,7 +192,7 @@ async fn handle_new_ptth_connection (
|
||||||
|
|
||||||
debug! ("Started PTTH connection");
|
debug! ("Started PTTH connection");
|
||||||
|
|
||||||
if opt.debug_echo {
|
if conf.debug_echo {
|
||||||
relay_send.write (b"Connected to P4=P5 debug echo server\n").await?;
|
relay_send.write (b"Connected to P4=P5 debug echo server\n").await?;
|
||||||
debug! ("Relaying bytes using internal debug echo server (P4=P5)");
|
debug! ("Relaying bytes using internal debug echo server (P4=P5)");
|
||||||
tokio::io::copy (&mut relay_recv, &mut relay_send).await?;
|
tokio::io::copy (&mut relay_recv, &mut relay_send).await?;
|
||||||
|
|
|
@ -61,7 +61,7 @@ pub async fn p2_connect_to_p5 (
|
||||||
let cmd_type = Command::CONNECT_P2_TO_P4.0;
|
let cmd_type = Command::CONNECT_P2_TO_P4.0;
|
||||||
|
|
||||||
send.write_all (&[cmd_type, 0, 0, 0]).await?;
|
send.write_all (&[cmd_type, 0, 0, 0]).await?;
|
||||||
send_lv_string (&mut send, &server_id).await?;
|
send_lv_string (&mut send, server_id).await?;
|
||||||
|
|
||||||
expect_exact_response (&mut recv, [Command::OKAY.0, cmd_type, 0, 0]).await
|
expect_exact_response (&mut recv, [Command::OKAY.0, cmd_type, 0, 0]).await
|
||||||
.context ("P2 didn't get OK response when asking P3 to connect P2 to P4")?;
|
.context ("P2 didn't get OK response when asking P3 to connect P2 to P4")?;
|
||||||
|
|
|
@ -58,7 +58,7 @@ pub fn make_server_endpoint(bind_addr: SocketAddr) -> anyhow::Result<(Incoming,
|
||||||
fn configure_client(server_certs: &[&[u8]]) -> anyhow::Result<ClientConfig> {
|
fn configure_client(server_certs: &[&[u8]]) -> anyhow::Result<ClientConfig> {
|
||||||
let mut cfg_builder = ClientConfigBuilder::default();
|
let mut cfg_builder = ClientConfigBuilder::default();
|
||||||
for cert in server_certs {
|
for cert in server_certs {
|
||||||
cfg_builder.add_certificate_authority(Certificate::from_der(&cert)?)?;
|
cfg_builder.add_certificate_authority(Certificate::from_der(cert)?)?;
|
||||||
}
|
}
|
||||||
Ok(cfg_builder.build())
|
Ok(cfg_builder.build())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue