Merge remote-tracking branch 'origin/main'

main
_ 2021-10-14 18:35:31 -05:00
commit 134035f198
29 changed files with 1773 additions and 860 deletions

665
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -23,7 +23,7 @@ exclude = [
[dependencies]
anyhow = "1.0.38"
blake3 = "0.3.7"
blake3 = "1.0.0"
tokio = { version = "1.4.0", features = ["full"] }
tracing-subscriber = "0.2.16"
tracing = "0.1.25"

View File

@ -20,14 +20,9 @@ rm -rf "$TEMP_GIBBERISH"
mkdir "$TEMP_GIBBERISH"
mkdir "$DEST"
cargo build --release -p ptth_server
cargo build --release -p quic_demo --bin quic_demo_end_server
cargo build --release -p ptth_multi_call_server
mkdir -p "$DEST/handlebars/server"
rsync -r handlebars/server/ "$DEST/handlebars/server/"
cp target/release/ptth_server "$DEST/"
cp target/release/quic_demo_end_server "$DEST/"
cp target/release/ptth_multi_call_server "$DEST/"
(
cd "$TEMP_GIBBERISH" || exit

View File

@ -10,6 +10,7 @@ description = "Common code for the PTTH relay and server"
[dependencies]
anyhow = "1.0.38"
base64 = "0.13.0"
ctrlc = { version = "3.1.8", features = [ "termination" ] }
futures = "0.3.7"

View File

@ -1,8 +1,14 @@
pub use std::{
ffi::OsString,
io::Write,
sync::Arc,
time::{Duration, Instant},
};
pub use anyhow::{
Context,
bail,
};
pub use tracing::{
debug, error, info, trace, warn,
instrument,

View File

@ -0,0 +1,145 @@
#![warn (clippy::pedantic)]
use std::{
ffi::OsString,
net::SocketAddr,
path::{PathBuf},
sync::Arc,
};
use arc_swap::ArcSwap;
use hyper::{
Body,
Request,
Response,
Server,
service::{
make_service_fn,
service_fn,
},
StatusCode,
};
use serde::Deserialize;
use tokio_stream::wrappers::ReceiverStream;
use tracing::debug;
use ptth_core::{
http_serde::RequestParts,
prelude::*,
};
use ptth_server::{
file_server::{
self,
metrics,
FileServer,
},
load_toml,
};
async fn handle_all (req: Request <Body>, state: Arc <FileServer>)
-> anyhow::Result <Response <Body>>
{
use std::str::FromStr;
use hyper::header::HeaderName;
debug! ("req.uri () = {:?}", req.uri ());
let path_and_query = req.uri ().path_and_query ().map_or_else (|| req.uri ().path (), http::uri::PathAndQuery::as_str);
let path_and_query = path_and_query.into ();
let (parts, _) = req.into_parts ();
let ptth_req = RequestParts::from_hyper (parts.method, path_and_query, parts.headers)?;
let ptth_resp = state.serve_all (
ptth_req.method,
&ptth_req.uri,
&ptth_req.headers
).await?;
let mut resp = Response::builder ()
.status (StatusCode::from (ptth_resp.parts.status_code));
for (k, v) in ptth_resp.parts.headers {
resp = resp.header (HeaderName::from_str (&k)?, v);
}
let body = ptth_resp.body.map_or_else (Body::empty, |body| {
Body::wrap_stream (ReceiverStream::new (body))
});
Ok (resp.body (body)?)
}
#[derive (Deserialize)]
struct ConfigFile {
file_server_root: Option <PathBuf>,
name: Option <String>,
}
pub async fn main (_args: &[OsString]) -> anyhow::Result <()> {
let path = PathBuf::from ("./config/ptth_server.toml");
let file_server_root;
let name;
match load_toml::load::<ConfigFile, _> (&path) {
Ok (config_file) => {
file_server_root = config_file.file_server_root;
name = config_file.name;
},
_ => {
info! ("No ptth_server.toml file, using default configs");
file_server_root = None;
name = None;
},
};
let name = name.unwrap_or_else (|| "PTTH File Server".to_string ());
info! ("file_server_root: {:?}", file_server_root);
let addr = SocketAddr::from(([0, 0, 0, 0], 4000));
info! ("Serving at {:?}", addr);
let metrics_interval = Arc::new (ArcSwap::default ());
let interval_writer = Arc::clone (&metrics_interval);
tokio::spawn (async move {
file_server::metrics::Interval::monitor (interval_writer).await;
});
let state = Arc::new (FileServer::new (
file_server_root.unwrap_or_else (PathBuf::new),
&PathBuf::new (),
name,
metrics_interval,
Some (path),
)?);
let make_svc = make_service_fn (|_conn| {
let state = state.clone ();
async {
Ok::<_, String> (service_fn (move |req| {
let state = state.clone ();
handle_all (req, state)
}))
}
});
let (shutdown_rx, forced_shutdown) = ptth_core::graceful_shutdown::init_with_force ();
let server = Server::bind (&addr)
.serve (make_svc)
.with_graceful_shutdown (async move {
shutdown_rx.await.ok ();
});
forced_shutdown.wrap_server (server).await??;
Ok (())
}

View File

@ -1,128 +1,12 @@
#![warn (clippy::pedantic)]
use std::{
net::SocketAddr,
path::PathBuf,
sync::Arc,
iter::FromIterator,
};
use arc_swap::ArcSwap;
use hyper::{
Body,
Request,
Response,
Server,
service::{
make_service_fn,
service_fn,
},
StatusCode,
};
use serde::Deserialize;
use tokio_stream::wrappers::ReceiverStream;
use tracing::debug;
use ptth_core::{
http_serde::RequestParts,
prelude::*,
};
use ptth_server::{
file_server::{
self,
metrics,
FileServer,
},
load_toml,
};
async fn handle_all (req: Request <Body>, state: Arc <FileServer>)
-> anyhow::Result <Response <Body>>
{
use std::str::FromStr;
use hyper::header::HeaderName;
debug! ("req.uri () = {:?}", req.uri ());
let path_and_query = req.uri ().path_and_query ().map_or_else (|| req.uri ().path (), http::uri::PathAndQuery::as_str);
let path_and_query = path_and_query.into ();
let (parts, _) = req.into_parts ();
let ptth_req = RequestParts::from_hyper (parts.method, path_and_query, parts.headers)?;
let ptth_resp = state.serve_all (
ptth_req.method,
&ptth_req.uri,
&ptth_req.headers
).await?;
let mut resp = Response::builder ()
.status (StatusCode::from (ptth_resp.parts.status_code));
for (k, v) in ptth_resp.parts.headers {
resp = resp.header (HeaderName::from_str (&k)?, v);
}
let body = ptth_resp.body.map_or_else (Body::empty, |body| {
Body::wrap_stream (ReceiverStream::new (body))
});
Ok (resp.body (body)?)
}
#[derive (Deserialize)]
pub struct ConfigFile {
pub file_server_root: Option <PathBuf>,
pub name: Option <String>,
}
#[tokio::main]
async fn main () -> anyhow::Result <()> {
tracing_subscriber::fmt::init ();
let path = PathBuf::from ("./config/ptth_server.toml");
let config_file: ConfigFile = load_toml::load (&path)?;
info! ("file_server_root: {:?}", config_file.file_server_root);
let args = Vec::from_iter (std::env::args_os ());
let addr = SocketAddr::from(([0, 0, 0, 0], 4000));
let metrics_interval = Arc::new (ArcSwap::default ());
let interval_writer = Arc::clone (&metrics_interval);
tokio::spawn (async move {
file_server::metrics::Interval::monitor (interval_writer).await;
});
let state = Arc::new (FileServer::new (
config_file.file_server_root.unwrap_or_else (|| PathBuf::from (".")),
&PathBuf::new (),
config_file.name.unwrap_or_else (|| "PTTH File Server".to_string ()),
metrics_interval,
Some (path),
)?);
let make_svc = make_service_fn (|_conn| {
let state = state.clone ();
async {
Ok::<_, String> (service_fn (move |req| {
let state = state.clone ();
handle_all (req, state)
}))
}
});
let (shutdown_rx, forced_shutdown) = ptth_core::graceful_shutdown::init_with_force ();
let server = Server::bind (&addr)
.serve (make_svc)
.with_graceful_shutdown (async move {
shutdown_rx.await.ok ();
});
forced_shutdown.wrap_server (server).await??;
Ok (())
ptth_file_server::main (&args).await
}

View File

@ -0,0 +1,16 @@
[package]
name = "ptth_multi_call_server"
version = "0.1.0"
authors = ["Trish"]
edition = "2018"
license = "AGPL-3.0"
[dependencies]
anyhow = "1.0.38"
ctrlc = "3.2.1"
ptth_file_server = { path = "../ptth_file_server_bin" }
ptth_server = { path = "../ptth_server" }
quic_demo = { path = "../../prototypes/quic_demo" }
tokio = { version = "1.8.1", features = ["full"] }
tracing-subscriber = "0.2.16"
tracing = "0.1.25"

View File

@ -0,0 +1,155 @@
use std::{
ffi::OsString,
iter::FromIterator,
};
use tokio::sync::watch;
#[derive (Clone, Copy, Debug, PartialEq)]
enum Subcommand {
PtthServer,
PtthFileServer,
PtthQuicEndServer,
}
#[tokio::main]
async fn main () -> anyhow::Result <()> {
tracing_subscriber::fmt::init ();
let args = Vec::from_iter (std::env::args_os ());
let (subcommand, args) = parse_args (&args)?;
match subcommand {
Subcommand::PtthServer => ptth_server::executable::main (&args).await,
Subcommand::PtthFileServer => ptth_file_server::main (&args).await,
Subcommand::PtthQuicEndServer => {
let (shutdown_tx, shutdown_rx) = watch::channel (false);
ctrlc::set_handler (move || {
shutdown_tx.send (true).expect ("Couldn't forward Ctrl+C signal");
})?;
tracing::trace! ("Set Ctrl+C handler");
quic_demo::executable_end_server::main (&args, Some (shutdown_rx)).await?;
Ok (())
}
}
}
fn parse_subcommand (arg: &str) -> Option <Subcommand>
{
use Subcommand::*;
let map = vec! [
("ptth_server", PtthServer),
("ptth_file_server", PtthFileServer),
("ptth_quic_end_server", PtthQuicEndServer),
];
let arg = arg.strip_suffix (".exe").unwrap_or (arg);
for (suffix, subcommand) in &map {
if arg.ends_with (suffix) {
return Some (*subcommand);
}
}
None
}
fn parse_args (args: &[OsString]) -> anyhow::Result <(Subcommand, &[OsString])>
{
let arg_0 = match args.get (0) {
Some (x) => x,
None => anyhow::bail! ("arg 0 must be the exe name"),
};
let arg_0 = arg_0.to_str ().ok_or_else (|| anyhow::anyhow! ("arg 0 should be valid UTF-8"))?;
match parse_subcommand (arg_0) {
Some (x) => return Ok ((x, args)),
None => (),
}
let arg_1 = match args.get (1) {
Some (x) => x,
None => anyhow::bail! ("arg 1 must be the subcommand if arg 0 is not"),
};
let arg_1 = arg_1.to_str ().ok_or_else (|| anyhow::anyhow! ("arg 1 subcommand should be valid UTF-8"))?;
match parse_subcommand (arg_1) {
Some (x) => return Ok ((x, &args [1..])),
None => (),
}
anyhow::bail! ("Subcommand must be either arg 0 (exe name) or arg 1")
}
#[cfg (test)]
mod tests {
use super::*;
#[test]
fn multi_call () -> anyhow::Result <()> {
let negative_cases = vec! [
vec! [],
vec! ["invalid_exe_name"],
vec! ["ptth_multi_call_server"],
vec! ["ptth_server.ex"],
vec! ["ptth_multi_call_server", "invalid_subcommand"],
];
for input in &negative_cases {
let input: Vec <_> = input.iter ().map (OsString::from).collect ();
let actual = parse_args (&input);
assert! (actual.is_err ());
}
let positive_cases = vec! [
(vec! ["ptth_server.exe"], (Subcommand::PtthServer, vec! ["ptth_server.exe"])),
(vec! ["ptth_server"], (Subcommand::PtthServer, vec! ["ptth_server"])),
(vec! ["ptth_server", "--help"], (Subcommand::PtthServer, vec! ["ptth_server", "--help"])),
(vec! ["ptth_file_server"], (Subcommand::PtthFileServer, vec! ["ptth_file_server"])),
(vec! ["ptth_quic_end_server", "--help"], (Subcommand::PtthQuicEndServer, vec! ["ptth_quic_end_server", "--help"])),
(vec! ["ptth_multi_call_server", "ptth_server"], (Subcommand::PtthServer, vec! ["ptth_server"])),
(
vec! [
"ptth_multi_call_server",
"ptth_server",
"--help"
],
(
Subcommand::PtthServer,
vec! [
"ptth_server",
"--help"
]
)
),
(
vec! [
"invalid_exe_name",
"ptth_server",
"--help"
],
(
Subcommand::PtthServer,
vec! [
"ptth_server",
"--help"
]
)
),
];
for (input, (expected_subcommand, expected_args)) in &positive_cases {
let input: Vec <_> = input.iter ().map (OsString::from).collect ();
let (actual_subcommand, actual_args) = parse_args (&input)?;
assert_eq! (expected_subcommand, &actual_subcommand);
assert_eq! (expected_args, actual_args);
}
Ok (())
}
}

View File

@ -12,7 +12,7 @@ description = "The PTTH relay"
anyhow = "1.0.38"
base64 = "0.13.0"
blake3 = "0.3.7"
blake3 = "1.0.0"
chrono = { version = "0.4.19", features = ["serde"] }
clap = "2.33.3"
dashmap = "4.0.2"

View File

@ -17,7 +17,7 @@ aho-corasick = "0.7.15"
anyhow = "1.0.38"
arc-swap = "1.2.0"
base64 = "0.13.0"
blake3 = "0.3.7"
blake3 = "1.0.0"
chrono = {version = "0.4.19", features = ["serde"]}
futures = "0.3.7"
handlebars = "3.5.1"
@ -28,6 +28,7 @@ pulldown-cmark = { version = "0.8.0", optional = true }
rand = "0.8.3"
regex = "1.4.1"
rmp-serde = "0.15.5"
rust-embed = "6.2.0"
rusty_ulid = "0.10.1"
serde = {version = "1.0.117", features = ["derive"]}
serde_json = "1.0.60"

View File

@ -1,128 +1,12 @@
#![warn (clippy::pedantic)]
use std::{
fs::File,
path::{Path, PathBuf},
iter::FromIterator,
};
use structopt::StructOpt;
use ptth_server::{
load_toml,
prelude::*,
run_server,
};
#[derive (Debug, StructOpt)]
struct Opt {
#[structopt (long)]
auto_gen_key: bool,
#[structopt (long)]
throttle_upload: bool,
#[structopt (long)]
file_server_root: Option <PathBuf>,
#[structopt (long)]
asset_root: Option <PathBuf>,
#[structopt (long)]
config_path: Option <PathBuf>,
#[structopt (long)]
name: Option <String>,
#[structopt (long)]
print_tripcode: bool,
#[structopt (long)]
relay_url: Option <String>,
}
#[derive (Default, serde::Deserialize)]
pub struct ConfigFile {
pub name: Option <String>,
pub api_key: String,
pub relay_url: Option <String>,
pub file_server_root: Option <PathBuf>,
}
fn gen_and_save_key (path: &Path) -> anyhow::Result <()> {
let api_key = ptth_core::gen_key ();
let mut f = File::create (path).with_context (|| format! ("Can't create config file `{:?}`", path))?;
#[cfg (unix)]
{
use std::os::unix::fs::PermissionsExt;
let metadata = f.metadata ()?;
let mut permissions = metadata.permissions ();
permissions.set_mode (0o600);
f.set_permissions (permissions)?;
}
#[cfg (not (unix))]
{
tracing::warn! ("Error VR6VW5QT: API keys aren't protected from clients on non-Unix OSes yet");
}
f.write_all (format! ("api_key = \"{}\"\n", api_key).as_bytes ())?;
Ok (())
}
#[tokio::main]
async fn main () -> Result <(), anyhow::Error> {
async fn main () -> anyhow::Result <()> {
tracing_subscriber::fmt::init ();
let opt = Opt::from_args ();
let asset_root = opt.asset_root;
let args = Vec::from_iter (std::env::args_os ());
let path = opt.config_path.clone ().unwrap_or_else (|| PathBuf::from ("./config/ptth_server.toml"));
let config_file: ConfigFile = if opt.auto_gen_key {
// If we're in autonomous mode, try harder to fix things
match load_toml::load (&path) {
Err (_) => {
gen_and_save_key (&path)?;
load_toml::load (&path)?
},
Ok (x) => x,
}
}
else {
match load_toml::load (&path) {
Err (ptth_server::errors::LoadTomlError::Io (_)) => bail! ("API key not provided in config file and auto-gen-key not provided"),
Ok (x) => x,
Err (e) => return Err (e.into ()),
}
};
let config_file = ptth_server::ConfigFile {
name: opt.name.or (config_file.name).ok_or (anyhow::anyhow! ("`name` must be provided in command line or config file"))?,
api_key: config_file.api_key,
relay_url: opt.relay_url.or (config_file.relay_url).ok_or (anyhow::anyhow! ("`--relay-url` must be provided in command line or `relay_url` in config file"))?,
file_server_root: opt.file_server_root.or (config_file.file_server_root).unwrap_or_else (|| PathBuf::from (".")),
throttle_upload: opt.throttle_upload,
client_keys: Default::default (),
allow_any_client: true,
};
if opt.print_tripcode {
println! (r#"name = "{}""#, config_file.name);
println! (r#"tripcode = "{}""#, config_file.tripcode ());
return Ok (());
}
run_server (
config_file,
ptth_core::graceful_shutdown::init (),
Some (path),
asset_root
).await?;
Ok (())
ptth_server::executable::main (&args).await
}

View File

@ -141,6 +141,7 @@ async fn serve_dir_json (
#[instrument (level = "debug", skip (f))]
async fn serve_file (
uri: &str,
mut f: File,
client_wants_body: bool,
range: range::ValidParsed,
@ -211,6 +212,12 @@ async fn serve_file (
response.header (String::from ("content-range"), format! ("bytes {}-{}/{}", range.start, range.end - 1, range.end).into_bytes ());
}
// Guess MIME type based on the URI so that we can serve web games
if uri.ends_with (".js") {
response.header ("content-type".into (), b"application/javascript".to_vec ());
}
response.content_length = Some (content_length);
if let Some (body) = body {
@ -399,7 +406,7 @@ impl FileServer {
file,
send_body,
range,
}) => serve_file (file.into_inner (), send_body, range, headers.get ("if-none-match").map (|v| &v[..])).await?,
}) => serve_file (uri, file.into_inner (), send_body, range, headers.get ("if-none-match").map (|v| &v[..])).await?,
MarkdownErr (e) => {
#[cfg (feature = "markdown")]
{
@ -426,20 +433,26 @@ impl FileServer {
}
fn load_templates (
asset_root: &Path
_asset_root: &Path
)
-> Result <Handlebars <'static>, anyhow::Error>
-> anyhow::Result <Handlebars <'static>>
{
use rust_embed::RustEmbed;
#[derive (RustEmbed)]
#[folder = "../../handlebars/server"]
struct HandlebarsServer;
let mut handlebars = Handlebars::new ();
handlebars.set_strict_mode (true);
let asset_root = asset_root.join ("handlebars/server");
for (k, v) in &[
("file_server_dir", "file_server_dir.html"),
("file_server_root", "file_server_root.html"),
] {
handlebars.register_template_file (k, asset_root.join (v))?;
let asset_file = HandlebarsServer::get (v)
.ok_or_else (|| anyhow::anyhow! ("failed to load handlebars template file"))?;
let s = std::str::from_utf8 (asset_file.data.as_ref ())?;
handlebars.register_template_string (k, s)?;
}
Ok (handlebars)

View File

@ -39,6 +39,17 @@
// False positive on futures::select! macro
#![allow (clippy::mut_mut)]
pub mod errors;
/// In-process file server module with byte range and ETag support
pub mod file_server;
/// Load and de-serialize structures from TOML, with a size limit
/// and checking permissions (On Unix)
pub mod load_toml;
pub mod prelude;
use std::{
collections::*,
future::Future,
@ -59,22 +70,11 @@ use tokio_stream::wrappers::ReceiverStream;
use ptth_core::{
http_serde,
prelude::*,
};
// use crate::key_validity::BlakeHashWrapper;
pub mod errors;
/// In-process file server module with byte range and ETag support
pub mod file_server;
/// Load and de-serialize structures from TOML, with a size limit
/// and checking permissions (On Unix)
pub mod load_toml;
pub mod prelude;
use errors::ServerError;
use prelude::*;
pub struct State {
// file_server: file_server::FileServer,
@ -490,6 +490,130 @@ impl State {
}
}
pub mod executable {
use std::{
path::{Path, PathBuf},
};
use structopt::StructOpt;
use super::{
load_toml,
prelude::*,
};
pub async fn main (args: &[OsString]) -> anyhow::Result <()> {
let opt = Opt::from_iter (args);
let asset_root = opt.asset_root;
let path = opt.config_path.clone ().unwrap_or_else (|| PathBuf::from ("./config/ptth_server.toml"));
let config_file: ConfigFile = if opt.auto_gen_key {
// If we're in autonomous mode, try harder to fix things
match load_toml::load (&path) {
Err (_) => {
gen_and_save_key (&path)?;
load_toml::load (&path)?
},
Ok (x) => x,
}
}
else {
match load_toml::load (&path) {
Err (super::errors::LoadTomlError::Io (_)) => bail! ("API key not provided in config file and auto-gen-key not provided"),
Ok (x) => x,
Err (e) => return Err (e.into ()),
}
};
let config_file = super::ConfigFile {
name: opt.name.or (config_file.name).ok_or (anyhow::anyhow! ("`name` must be provided in command line or config file"))?,
api_key: config_file.api_key,
relay_url: opt.relay_url.or (config_file.relay_url).ok_or (anyhow::anyhow! ("`--relay-url` must be provided in command line or `relay_url` in config file"))?,
file_server_root: opt.file_server_root.or (config_file.file_server_root).unwrap_or_else (PathBuf::new),
throttle_upload: opt.throttle_upload,
allow_any_client: true,
client_keys: Default::default (),
};
if opt.print_tripcode {
println! (r#"name = "{}""#, config_file.name);
println! (r#"tripcode = "{}""#, config_file.tripcode ());
return Ok (());
}
super::run_server (
config_file,
ptth_core::graceful_shutdown::init (),
Some (path),
asset_root
).await?;
Ok (())
}
#[derive (Debug, StructOpt)]
struct Opt {
#[structopt (long)]
auto_gen_key: bool,
#[structopt (long)]
throttle_upload: bool,
#[structopt (long)]
file_server_root: Option <PathBuf>,
#[structopt (long)]
asset_root: Option <PathBuf>,
#[structopt (long)]
config_path: Option <PathBuf>,
#[structopt (long)]
name: Option <String>,
#[structopt (long)]
print_tripcode: bool,
#[structopt (long)]
relay_url: Option <String>,
}
#[derive (Default, serde::Deserialize)]
struct ConfigFile {
pub name: Option <String>,
pub api_key: String,
pub relay_url: Option <String>,
pub file_server_root: Option <PathBuf>,
}
fn gen_and_save_key (path: &Path) -> anyhow::Result <()> {
use std::fs::File;
let api_key = ptth_core::gen_key ();
let mut f = File::create (path).with_context (|| format! ("Can't create config file `{:?}`", path))?;
#[cfg (unix)]
{
use std::os::unix::fs::PermissionsExt;
let metadata = f.metadata ()?;
let mut permissions = metadata.permissions ();
permissions.set_mode (0o600);
f.set_permissions (permissions)?;
}
#[cfg (not (unix))]
{
tracing::warn! ("Error VR6VW5QT: API keys aren't protected from clients on non-Unix OSes yet");
}
f.write_all (format! ("api_key = \"{}\"\n", api_key).as_bytes ())?;
Ok (())
}
}
#[cfg (test)]
mod tests {
use super::*;

View File

@ -1,8 +1 @@
pub use std::{
io::Write,
};
pub use anyhow::{
Context,
bail,
};
pub use ptth_core::prelude::*;

View File

@ -0,0 +1,73 @@
# PTTH_DIREC - Direct P2P connections
_It could work, even!_
To keep each ridiculous new feature simple, we'll rely on bootstrapping:
1. PTTH is just HTTPS connections
2. PTTH_QUIC uses a PTTH relay to download the QUIC cert and bootstrap
3. PTTH_DIREC will use a PTTH_QUIC relay to bootstrap
# Overview
Given that:
- P2 is connected to P3
- P4 is connected to P3
Steps:
- S1. P2 starts a bi stream to P3
- S2.0. P2 says, "I want to initiate a PTTH_DIREC connection..."
- "... And I'll send you cookie X to do hole-punching..."
- "... And I want to connect to end server Y..."
- S3.0. P3 creates an ID for this connection
- S3.1. P3 replies "go ahead" to P2
- S4. P3 starts a bi stream to P4 (end server Y)
- S5. P3 says, "I want you to accept a PTTH_DIREC connection..."
- "... And you should send me cookie Z to do hole-punching..."
- "... And the client will be client W..."
- S6. P3 waits for P4 to accept the offer
- S7. P3 waits for both cookies to arrive
- S8. When the cookies arrive, P3 learns the WAN addresses of P2 and P4
- S9. P3 sends the WAN addresses of P2 and P4 to each other (on the existing bi streams)
- S10. P4 tries to connect directly to P2
- S11. P2 does the same to P4
- S12. When P4 sees round-tripped data, it attempts to upgrade to QUIC
- S13. When P2 sees round-tripped data, it attempts to upgrade to QUIC
- Cool stuff happens over QUIC
- ReactorScram implements the rest of the protocol
P2's PoV:
- S1. Start a bi stream to P3
- S2.0. Send cookie and server ID
- S2.1. Wait for go-ahead signal (this may contain the hole-punch address and a new cookie for P4)
- S2.2. Send cookie to hole-punch address repeatedly
- S2.3. While you're sending the cookie, wait to hear P4's WAN address
- S9. Learn P4's WAN address
- S10. Send the new cookie to P4's address
- S12. When you see round-tripped data, upgrade to QUIC
P4's PoV:
- S4. Accept a bi stream from P3
- S5. Receive cookie and client ID
- S6. Reply "OK"
- S7.0. Send cookie to hole-punch address repeatedly
- S7.1. While sending the cookie, wait to hear P2's WAN address
- S9. Learn P2's WAN address
- S10. Try to connect directly to P2
- S12. When you see round-tripped data, upgrade to QUIC
Commands needed:
- ???
# Decisions
I'll add a delay between giving P2's address to P4, and giving P4's address to P2.
This miiiight change things slightly if P4's firewall is suspicious of packets
coming in too early, but I doubt it.
The delay is easy to remove relay-side if it doesn't help.

View File

@ -9,10 +9,15 @@ license = "AGPL-3.0"
[dependencies]
anyhow = "1.0.38"
fltk = "1.1.1"
blake3 = "1.0.0"
fltk = "1.2.7"
quic_demo = { path = "../quic_demo" }
quinn = "0.7.2"
rand = "0.8.4"
rand_chacha = "0.3.1"
reqwest = "0.11.4"
rmp-serde = "0.15.5"
serde = "1.0.130"
structopt = "0.3.20"
tokio = { version = "1.8.1", features = ["full"] }
tracing-subscriber = "0.2.16"

View File

@ -1,15 +1,23 @@
use std::str::FromStr;
use std::{
str::FromStr,
};
use fltk::{
app,
button::Button,
enums::CallbackTrigger,
frame::Frame,
group::Flex,
input::*,
prelude::*,
window::Window
};
use rand::{
Rng,
SeedableRng,
};
use structopt::StructOpt;
use tokio::runtime::Runtime;
use quic_demo::{
client_proxy::*,
@ -33,11 +41,125 @@ struct Opt {
enum Message {
OpenPort (usize),
ClosePort (usize),
AddPort,
}
struct GuiClient <'a> {
rt: &'a Runtime,
frame_status: Frame,
ports: Vec <Port>,
but_add_port: Button,
}
struct Port {
gui: GuiPort,
forwarding_instance: Option <ForwardingInstance>,
}
impl Port {
pub fn open_port (
&mut self,
rt: &Runtime,
connection_p2_p3: quinn::Connection,
) -> anyhow::Result <()>
{
let params = self.gui.get_params ()?;
let _guard = rt.enter ();
let forwarding_instance = rt.block_on (ForwardingInstance::new (
connection_p2_p3,
params,
))?;
self.gui.input_client_port.set_value (&forwarding_instance.local_port ().to_string ());
self.forwarding_instance.replace (forwarding_instance);
self.gui.set_forwarding (true);
Ok (())
}
}
struct GuiPort {
row: fltk::group::Flex,
input_client_port: Input,
input_server_id: Input,
input_server_port: Input,
but_open: Button,
but_close: Button,
}
impl GuiClient <'_> {
pub fn open_port (
&mut self,
connection_p2_p3: quinn::Connection,
port_idx: usize,
) -> anyhow::Result <()>
{
self.ports [port_idx].open_port (&self.rt, connection_p2_p3)?;
self.sync_status ();
Ok (())
}
pub fn close_port (&mut self, port_idx: usize) -> anyhow::Result <()> {
if let Some (old_instance) = self.ports [port_idx].forwarding_instance.take () {
self.rt.block_on (async {
old_instance.close ()
.await
.context ("closing ForwardingInstance")?;
Ok::<_, anyhow::Error> (())
})?;
}
self.ports [port_idx].gui.set_forwarding (false);
self.sync_status ();
Ok (())
}
fn open_ports (&self) -> usize {
self.ports.iter ()
.map (|x| if x.forwarding_instance.is_some () { 1 } else { 0 })
.sum ()
}
pub fn sync_status (&mut self) {
let open_ports = self.open_ports ();
self.frame_status.set_label (&format! ("Forwarding {} ports", open_ports));
}
pub fn add_port (
&mut self,
ports_col: &mut Flex,
fltk_tx: fltk::app::Sender <Message>
) {
if self.ports.len () >= 5 {
return;
}
let mut gui = GuiPort::new (fltk_tx, self.ports.len ());
ports_col.add (&gui.row);
ports_col.set_size (&mut gui.row, 30);
let port = Port {
gui,
forwarding_instance: None,
};
self.ports.push (port);
if self.ports.len () >= 5 {
self.but_add_port.deactivate ();
}
}
}
fn main () -> anyhow::Result <()> {
tracing_subscriber::fmt::init ();
let rt = tokio::runtime::Runtime::new ()?;
let rt = Runtime::new ()?;
let opt = Opt::from_args ();
@ -47,59 +169,54 @@ fn main () -> anyhow::Result <()> {
let window_title = opt.window_title.clone ().unwrap_or_else (|| "PTTH client proxy".to_string ());
let mut wind = Window::new (100, 100, 800, 600, None)
.with_label (&window_title);
wind.make_resizable (true);
let margin = 10;
let h = 30;
let mut x = margin;
let mut y = margin;
let mut col = Flex::default ().column ().size_of_parent ();
let mut frame_status = Frame::new (x, y, 800 - 20, h, "Forwarding 0 ports");
y += h + margin;
x = margin;
let mut frame_status = Frame::default ();
col.set_size (&mut frame_status, 30);
{
let w = 80;
Frame::new (x, y, w, h, "Local port");
x += w + margin;
let mut row = Flex::default ().row ();
let w = 120;
Frame::new (x, y, w, h, "Server ID");
x += w + margin;
let mut l = Frame::default ().with_label ("Server ID");
row.set_size (&mut l, 120);
let mut l = Frame::default ().with_label ("Server port");
row.set_size (&mut l, 80);
let mut l = Frame::default ().with_label ("Local port");
row.set_size (&mut l, 80);
row.end ();
let w = 80;
Frame::new (x, y, w, h, "Server port");
// x += w + margin;
col.set_size (&mut row, 30);
}
y += h + margin;
x = margin;
let mut ports_col = Flex::default ().column ();
ports_col.end ();
let gui_port_0 = GuiPort::new (fltk_tx, &mut x, y, 0);
y += h + margin;
x = margin;
let mut but_add_port = Button::default ().with_label ("+");
but_add_port.set_trigger (CallbackTrigger::Release);
but_add_port.emit (fltk_tx, Message::AddPort);
col.set_size (&mut but_add_port, 30);
let gui_port_1 = GuiPort::new (fltk_tx, &mut x, y, 1);
y += h + margin;
x = margin;
col.end ();
let gui_port_2 = GuiPort::new (fltk_tx, &mut x, y, 2);
// y += h + margin;
// x = margin;
let relay_addr = opt.relay_addr.as_ref ()
.map (|s| &s[..])
.unwrap_or ("127.0.0.1:30380")
.parse ()
.context ("relay_addr should be like 127.0.0.1:30380")?;
let mut gui_ports = vec! [
gui_port_0,
gui_port_1,
gui_port_2,
];
let mut gui_client = GuiClient {
rt: &rt,
frame_status,
ports: Default::default (),
but_add_port,
};
let mut forwarding_instances = vec! [
None,
None,
None,
];
gui_client.add_port (&mut ports_col, fltk_tx);
ports_col.recalc ();
// y += h + margin;
gui_client.sync_status ();
wind.end ();
wind.show ();
@ -107,13 +224,9 @@ fn main () -> anyhow::Result <()> {
let connection_p2_p3 = rt.block_on (async move {
let server_cert = match opt.cert_url.as_ref () {
Some (url) => reqwest::get (url).await?.bytes ().await?,
None => tokio::fs::read ("quic_server.crt").await?.into (),
None => tokio::fs::read ("ptth_quic_output/quic_server.crt").await?.into (),
};
let relay_addr = opt.relay_addr
.unwrap_or_else (|| String::from ("127.0.0.1:30380"))
.parse ()
.context ("relay_addr should be like 127.0.0.1:30380")?;
let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&server_cert])?;
trace! ("Connecting to relay server");
@ -132,27 +245,18 @@ fn main () -> anyhow::Result <()> {
while app.wait () {
match fltk_rx.recv () {
Some (Message::OpenPort (port_idx)) => {
if let Ok (params) = gui_ports [port_idx].get_params () {
let connection_p2_p3 = connection_p2_p3.clone ();
let _guard = rt.enter ();
forwarding_instances [port_idx].replace (ForwardingInstance::new (
connection_p2_p3,
params,
));
gui_ports [port_idx].set_forwarding (true);
frame_status.set_label ("Forwarding 1 port");
if let Err (e) = gui_client.open_port (connection_p2_p3.clone (), port_idx)
{
error! ("{:?}", e);
}
},
Some (Message::ClosePort (port_idx)) => {
if let Some (old_instance) = forwarding_instances [port_idx].take () {
rt.block_on (old_instance.close ())?;
}
gui_ports [port_idx].set_forwarding (false);
frame_status.set_label ("Forwarding 0 ports");
gui_client.close_port (port_idx)?;
},
Some (Message::AddPort) => {
gui_client.add_port (&mut ports_col, fltk_tx);
ports_col.recalc ();
ports_col.redraw ();
},
None => (),
}
@ -170,65 +274,57 @@ fn set_active <W: WidgetExt> (w: &mut W, b: bool) {
}
}
struct GuiPort {
input_client_port: Input,
input_server_id: Input,
input_server_port: Input,
but_open: Button,
but_close: Button,
}
impl GuiPort {
fn new (fltk_tx: fltk::app::Sender <Message>, x: &mut i32, y: i32, port_idx: usize) -> Self {
let margin = 10;
let h = 30;
fn new (fltk_tx: fltk::app::Sender <Message>, port_idx: usize) -> Self {
let mut row = Flex::default ().row ();
let w = 80;
let mut input_client_port = Input::new (*x, y, w, h, "");
*x += w + margin;
let mut input_server_id = Input::default ();
let mut input_server_port = Input::default ();
let mut input_client_port = Input::default ();
let mut but_open = Button::default ().with_label ("Open");
let mut but_close = Button::default ().with_label ("Close");
let w = 120;
let mut input_server_id = Input::new (*x, y, w, h, "");
*x += w + margin;
row.set_size (&mut input_server_id, 120);
row.set_size (&mut input_server_port, 80);
row.set_size (&mut input_client_port, 80);
row.set_size (&mut but_open, 80);
row.set_size (&mut but_close, 80);
let w = 80;
let mut input_server_port = Input::new (*x, y, w, h, "");
*x += w + margin;
let w = 80;
let mut but_open = Button::new (*x, y, w, h, "Open");
*x += w + margin;
let w = 80;
let mut but_close = Button::new (*x, y, w, h, "Close");
// *x += w + margin;
input_client_port.set_value ("5901");
input_client_port.set_value ("");
input_client_port.set_readonly (true);
input_server_id.set_value ("bogus_server");
input_server_port.set_value ("5900");
but_open.set_trigger (CallbackTrigger::Changed);
but_open.set_trigger (CallbackTrigger::Release);
but_open.emit (fltk_tx, Message::OpenPort (port_idx));
but_close.set_trigger (CallbackTrigger::Changed);
but_close.set_trigger (CallbackTrigger::Release);
but_close.emit (fltk_tx, Message::ClosePort (port_idx));
set_active (&mut but_open, true);
set_active (&mut but_close, false);
row.end ();
Self {
let mut output = Self {
row,
input_client_port,
input_server_id,
input_server_port,
but_open,
but_close,
}
};
output.set_forwarding (false);
output
}
fn get_params (&self) -> anyhow::Result <ForwardingParams>
{
let client_tcp_port = u16::from_str (&self.input_client_port.value ())?;
let server_id = self.input_server_id.value ();
let server_tcp_port = u16::from_str (&self.input_server_port.value ())?;
let server_id = self.input_server_id.value ();
let client_tcp_port = PortInfo {
server_id: &server_id,
server_tcp_port,
}.random_eph_port ();
Ok (ForwardingParams {
client_tcp_port,
@ -238,7 +334,7 @@ impl GuiPort {
}
fn set_forwarding (&mut self, x: bool) {
set_active (&mut self.input_client_port, !x);
set_active (&mut self.input_client_port, x);
set_active (&mut self.input_server_id, !x);
set_active (&mut self.input_server_port, !x);
set_active (&mut self.but_open, !x);
@ -248,3 +344,65 @@ impl GuiPort {
self.but_close.set (!x);
}
}
// This can collide, but who cares
// It's not secure or anything - It's just supposed to pick a port somewhat
// deterministically based on the server and relay info.
#[derive (serde::Serialize)]
struct PortInfo <'a> {
// relay_addr: SocketAddr,
server_id: &'a str,
server_tcp_port: u16
}
impl PortInfo <'_> {
// https://en.wikipedia.org/wiki/TCP_ports#Dynamic,_private_or_ephemeral_ports
fn random_eph_port (&self) -> u16
{
let seed = blake3::hash (&rmp_serde::to_vec (self).expect ("Can't hash PortInfo - impossible error"));
let mut rng = rand_chacha::ChaCha20Rng::from_seed (*seed.as_bytes ());
let tcp_eph_range = 49152..=65535;
rng.gen_range (tcp_eph_range)
}
}
#[cfg (test)]
mod test {
use blake3::Hasher;
use super::*;
#[test]
fn prng () {
let hasher = Hasher::default ();
let seed = hasher.finalize ();
let mut rng = rand_chacha::ChaCha20Rng::from_seed (*seed.as_bytes ());
let tcp_eph_range = 49152..=65535;
let port = rng.gen_range (tcp_eph_range);
assert_eq! (port, 49408);
for (input, expected) in vec! [
(("127.0.0.1:4000", "bogus_server", 22), 51168),
// The relay address is explicitly excluded from the eph port
// computation in case I want to support connecting to a server
// across multiple relays
(("127.0.0.1:30380", "bogus_server", 22), 51168),
(("127.0.0.1:4000", "real_server", 22), 53873),
(("127.0.0.1:4000", "bogus_server", 5900), 53844),
] {
let (_relay_addr, server_id, server_tcp_port) = input;
let input = PortInfo {
server_id,
server_tcp_port,
};
let actual = input.random_eph_port ();
assert_eq! (expected, actual);
}
}
}

View File

@ -10,10 +10,12 @@ license = "AGPL-3.0"
[dependencies]
anyhow = "1.0.38"
base64 = "0.13.0"
ctrlc = "3.2.1"
# fltk = "1.1.1"
futures-util = "0.3.9"
hyper = { version = "0.14.4", features = ["http1", "server", "stream", "tcp"] }
quinn = "0.7.2"
rand = "0.8.4"
rcgen = "0.8.11"
reqwest = "0.11.4"
rmp-serde = "0.15.5"

View File

@ -1,7 +1,16 @@
use structopt::StructOpt;
use tokio::net::TcpListener;
use tokio::{
net::UdpSocket,
sync::watch,
};
use quic_demo::prelude::*;
use quic_demo::{
client_proxy::{
ForwardingParams,
forward_port,
},
prelude::*,
};
use protocol::PeerId;
#[derive (Debug, StructOpt)]
@ -23,68 +32,136 @@ async fn main () -> anyhow::Result <()> {
tracing_subscriber::fmt::init ();
let opt = Opt::from_args ();
let conf = opt.into_config ().await?;
let server_cert = tokio::fs::read ("quic_server.crt").await?;
let relay_addr = opt.relay_addr.unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?;
let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&server_cert])?;
debug! ("Connecting to relay server");
let client_id = opt.client_id.unwrap_or_else (|| "bogus_client".to_string ());
let quinn::NewConnection {
connection,
..
} = protocol::p2_connect_to_p3 (&endpoint, &relay_addr, &client_id).await?;
// End of per-client stuff
// Beginning of per-port stuff
let server_id = opt.server_id.unwrap_or_else (|| "bogus_server".to_string ());
let client_tcp_port = opt.client_tcp_port.unwrap_or (30381);
let server_tcp_port = opt.server_tcp_port.unwrap_or (30382);
let listener = TcpListener::bind (("127.0.0.1", client_tcp_port)).await?;
// End of per-port stuff
// Beginning of per-connection stuff
let task_tcp_server = tokio::spawn (async move {
loop {
let (tcp_socket, _) = listener.accept ().await?;
let connection = connection.clone ();
let server_id = server_id.clone ();
tokio::spawn (async move {
let (local_recv, local_send) = tcp_socket.into_split ();
debug! ("Starting PTTH connection");
let (relay_send, relay_recv) = protocol::p2_connect_to_p5 (&connection, &server_id, server_tcp_port).await?;
trace! ("Relaying bytes...");
let ptth_conn = quic_demo::connection::NewConnection {
local_send,
local_recv,
relay_send,
relay_recv,
}.build ();
ptth_conn.wait_for_close ().await?;
debug! ("Ended PTTH connection");
Ok::<_, anyhow::Error> (())
});
}
Ok::<_, anyhow::Error> (())
});
debug! ("Accepting local TCP connections from P1");
task_tcp_server.await??;
let client = P2Client::connect (conf)?;
client.run ().await?;
Ok (())
}
pub struct P2Client {
endpoint: quinn::Endpoint,
conf: Arc <Config>,
}
impl P2Client {
pub fn connect (conf: Config) -> anyhow::Result <Self> {
let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&conf.relay_cert])?;
let conf = Arc::new (conf);
Ok (Self {
endpoint,
conf,
})
}
pub async fn run (&self) -> anyhow::Result <()> {
debug! ("P2 client connecting to P3 relay server");
let conf = Arc::clone (&self.conf);
let quinn::NewConnection {
connection,
..
} = protocol::p2_connect_to_p3 (&self.endpoint, &conf.relay_addr, &conf.client_id).await?;
let client_tcp_port = conf.client_tcp_port;
debug! ("Accepting local TCP connections from P1 at {}", client_tcp_port);
// End of per-port stuff
// Beginning of per-connection stuff
let (_shutdown_flag_tx, shutdown_flag_rx) = watch::channel (true);
let task_tcp_server = {
let connection = connection.clone ();
let server_id = conf.server_id.clone ();
let server_tcp_port = conf.server_tcp_port;
let listener = TcpListener::bind (("127.0.0.1", client_tcp_port)).await?;
trace! ("Accepting local TCP connections from P1 on {}", client_tcp_port);
tokio::spawn (async move {
forward_port (
listener,
connection,
ForwardingParams {
client_tcp_port,
server_id,
server_tcp_port,
},
shutdown_flag_rx,
).await?;
Ok::<_, anyhow::Error> (())
})
};
if false {
let task_direc_connect = {
let connection = connection.clone ();
tokio::spawn (async move {
let cookie = protocol::p2_direc_to_p4 (
&connection,
"bogus_server",
).await?;
let sock = UdpSocket::bind ("0.0.0.0:0").await?;
let mut interval = tokio::time::interval (Duration::from_millis (1000));
interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Delay);
loop {
interval.tick ().await;
sock.send_to(&cookie [..], "127.0.0.1:30379").await?;
debug! ("P2 sent cookie to P3 over plain UDP");
}
Ok::<_, anyhow::Error> (())
})
};
}
task_tcp_server.await??;
//task_direc_connect.await??;
Ok (())
}
}
/// A filled-out config for constructing a P2 client
#[derive (Clone)]
pub struct Config {
client_tcp_port: u16,
server_tcp_port: u16,
client_id: String,
server_id: String,
relay_addr: SocketAddr,
relay_cert: Vec <u8>,
}
impl Opt {
pub async fn into_config (self) -> anyhow::Result <Config> {
let client_tcp_port = self.client_tcp_port.unwrap_or (30381);
let server_tcp_port = self.server_tcp_port.unwrap_or (30382);
let client_id = self.client_id.unwrap_or_else (|| "bogus_client".to_string ());
let server_id = self.server_id.unwrap_or_else (|| "bogus_server".to_string ());
let relay_addr = self.relay_addr.unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?;
// Begin I/O
let relay_cert = tokio::fs::read ("ptth_quic_output/quic_server.crt").await?;
Ok (Config {
client_tcp_port,
server_tcp_port,
client_id,
server_id,
relay_addr,
relay_cert,
})
}
}

View File

@ -1,107 +1,23 @@
use structopt::StructOpt;
use tokio::net::TcpStream;
use std::{
iter::FromIterator,
};
use tokio::sync::watch;
use quic_demo::prelude::*;
use protocol::PeerId;
#[derive (Debug, StructOpt)]
struct Opt {
#[structopt (long)]
relay_addr: Option <String>,
#[structopt (long)]
server_id: Option <PeerId>,
#[structopt (long)]
debug_echo: bool,
#[structopt (long)]
cert_url: Option <String>,
}
#[tokio::main]
async fn main () -> anyhow::Result <()> {
tracing_subscriber::fmt::init ();
let opt = Arc::new (Opt::from_args ());
let args = Vec::from_iter (std::env::args_os ());
let server_cert = match opt.cert_url.as_ref () {
Some (url) => reqwest::get (url).await?.bytes ().await?,
None => tokio::fs::read ("quic_server.crt").await?.into (),
};
let relay_addr = opt.relay_addr.clone ().unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?;
let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&server_cert])?;
let (shutdown_tx, shutdown_rx) = watch::channel (false);
trace! ("Connecting to relay server");
ctrlc::set_handler (move || {
shutdown_tx.send (true).expect ("Couldn't forward Ctrl+C signal");
})?;
trace! ("Set Ctrl+C handler");
let server_id = opt.server_id.clone ().unwrap_or_else (|| "bogus_server".to_string ());
let quinn::NewConnection {
mut bi_streams,
..
} = protocol::p4_connect_to_p3 (&endpoint, &relay_addr, &server_id).await?;
debug! ("Connected to relay server");
trace! ("Accepting bi streams from P3");
loop {
let (relay_send, relay_recv) = bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("Relay server didn't open a bi stream"))??;
tokio::spawn (handle_bi_stream (Arc::clone (&opt), relay_send, relay_recv));
}
}
async fn handle_bi_stream (
opt: Arc <Opt>,
relay_send: quinn::SendStream,
mut relay_recv: quinn::RecvStream,
) -> anyhow::Result <()>
{
match protocol::p4_accept_p3_stream (&mut relay_recv).await? {
protocol::P3ToP4Stream::NewPtthConnection {
client_id,
..
} => handle_new_ptth_connection (opt, relay_send, relay_recv, client_id).await?,
}
Ok (())
}
async fn handle_new_ptth_connection (
opt: Arc <Opt>,
mut relay_send: quinn::SendStream,
mut relay_recv: quinn::RecvStream,
_client_id: String,
) -> anyhow::Result <()>
{
// TODO: Check authorization for P2 --> P4
protocol::p4_authorize_p2_connection (&mut relay_send).await?;
let p4_to_p5_req = protocol::p4_expect_p5_request (&mut relay_recv).await?;
// TODO: Check authorization for P1 --> P5
protocol::p4_authorize_p1_connection (&mut relay_send).await?;
debug! ("Started PTTH connection");
if opt.debug_echo {
relay_send.write (b"Connected to P4=P5 debug echo server\n").await?;
debug! ("Relaying bytes using internal debug echo server (P4=P5)");
tokio::io::copy (&mut relay_recv, &mut relay_send).await?;
}
else {
let stream = TcpStream::connect (("127.0.0.1", p4_to_p5_req.port)).await?;
let (local_recv, local_send) = stream.into_split ();
trace! ("Relaying bytes...");
let ptth_conn = quic_demo::connection::NewConnection {
local_send,
local_recv,
relay_send,
relay_recv,
}.build ();
ptth_conn.wait_for_close ().await?;
}
Ok (())
quic_demo::executable_end_server::main (&args, Some (shutdown_rx)).await
}

View File

@ -10,6 +10,10 @@ use hyper::{
StatusCode,
};
use structopt::StructOpt;
use tokio::{
net::UdpSocket,
sync::watch,
};
use quic_demo::prelude::*;
use protocol::PeerId;
@ -56,6 +60,8 @@ async fn main () -> anyhow::Result <()> {
let tcp_port = 30382;
let tcp_listener = TcpListener::bind (("127.0.0.1", tcp_port)).await?;
let (_running_tx, running_rx) = watch::channel (true);
let task_quic_server = {
let relay_state = Arc::clone (&relay_state);
tokio::spawn (async move {
@ -81,6 +87,35 @@ async fn main () -> anyhow::Result <()> {
})
};
let task_direc_server = {
let relay_state = Arc::clone (&relay_state);
tokio::spawn (async move {
let sock = UdpSocket::bind("0.0.0.0:30379").await?;
let mut buf = [0; 2048];
loop {
let (len, addr) = sock.recv_from (&mut buf).await?;
debug! ("{:?} bytes received from {:?}", len, addr);
let packet = Vec::from_iter ((&buf [0..len]).into_iter ().map (|x| *x));
{
let mut direc_cookies = relay_state.direc_cookies.lock ().await;
if let Some (direc_state) = direc_cookies.remove (&packet) {
debug! ("Got PTTH_DIREC cookie for {}", direc_state.p2_id);
direc_state.p2_addr.send (addr).ok ();
}
else {
debug! ("UDP packet didn't match any PTTH_DIREC cookie");
}
}
}
Ok::<_, anyhow::Error> (())
})
};
let task_http_server = tokio::spawn (async move {
http_server.serve (make_svc).await?;
Ok::<_, anyhow::Error> (())
@ -89,24 +124,22 @@ async fn main () -> anyhow::Result <()> {
let task_tcp_server = {
let relay_state = Arc::clone (&relay_state);
tokio::spawn (async move {
loop {
while *running_rx.borrow () {
let (tcp_socket, _) = tcp_listener.accept ().await?;
let server_id = "bogus_server".to_string ();
let relay_state = Arc::clone (&relay_state);
tokio::spawn (async move {
let (client_recv, client_send) = tcp_socket.into_split ();
let (_client_recv, _client_send) = tcp_socket.into_split ();
debug! ("Accepted direct TCP connection P1 --> P3");
let p4_server_proxies = relay_state.p4_server_proxies.lock ().await;
let p4 = match p4_server_proxies.get ("bogus_server") {
let _p4 = match p4_server_proxies.get ("bogus_server") {
Some (x) => x,
None => bail! ("That server isn't connected"),
};
unimplemented! ();
// unimplemented! ();
/*
p4.req_channel.send (RequestP2ToP4 {
client_send,
@ -127,6 +160,7 @@ async fn main () -> anyhow::Result <()> {
task_quic_server.await??;
task_http_server.await??;
task_tcp_server.await??;
task_direc_server.await??;
Ok (())
}
@ -152,9 +186,16 @@ async fn handle_http (_req: Request <Body>, relay_state: Arc <RelayState>)
#[derive (Default)]
struct RelayState {
p4_server_proxies: Mutex <HashMap <PeerId, P4State>>,
direc_cookies: Mutex <HashMap <Vec <u8>, DirecState>>,
stats: Stats,
}
struct DirecState {
start_time: Instant,
p2_id: PeerId,
p2_addr: tokio::sync::oneshot::Sender <SocketAddr>,
}
#[derive (Default)]
struct Stats {
quic: ConnectEvents,
@ -319,7 +360,28 @@ async fn handle_p2_connection (
match protocol::p3_accept_p2_stream (&mut recv).await? {
protocol::P2ToP3Stream::ConnectP2ToP4 {
server_id,
} => handle_request_p2_to_p4 (relay_state, client_id, server_id, send, recv).await?,
} => {
handle_request_p2_to_p4 (
relay_state,
client_id,
server_id,
send,
recv
).await?
},
protocol::P2ToP3Stream::DirecP2ToP4 {
server_id,
cookie,
} => {
handle_direc_p2_to_p4 (
relay_state,
client_id,
server_id,
cookie,
send,
recv
).await?
},
}
debug! ("Request ended for P2");
@ -363,6 +425,41 @@ async fn handle_request_p2_to_p4 (
Ok (())
}
async fn handle_direc_p2_to_p4 (
relay_state: Arc <RelayState>,
client_id: String,
server_id: PeerId,
cookie: Vec <u8>,
mut client_send: quinn::SendStream,
client_recv: quinn::RecvStream,
) -> anyhow::Result <()>
{
debug! ("P2 {} wants a P2P connection to P4 {}", client_id, server_id);
// TODO: Check authorization
protocol::p3_authorize_p2_to_p4_direc (&mut client_send).await?;
let (tx, rx) = tokio::sync::oneshot::channel ();
{
let mut direc_cookies = relay_state.direc_cookies.lock ().await;
direc_cookies.insert (cookie, DirecState {
start_time: Instant::now (),
p2_id: client_id.clone (),
p2_addr: tx,
});
}
debug! ("Waiting to learn P2's WAN address...");
let wan_addr = rx.await?;
debug! ("And that WAN address is {}", wan_addr);
Ok (())
}
async fn handle_p4_connection (
relay_state: Arc <RelayState>,
conn: quinn::NewConnection,

View File

@ -9,33 +9,48 @@ use crate::prelude::*;
pub struct ForwardingInstance {
task: JoinHandle <anyhow::Result <()>>,
shutdown_flag: watch::Sender <bool>,
local_port: u16,
}
impl ForwardingInstance {
pub fn new (
pub async fn new (
connection_p2_p3: quinn::Connection,
params: ForwardingParams,
) -> Self
) -> anyhow::Result <Self>
{
let (shutdown_flag, shutdown_flag_rx) = tokio::sync::watch::channel (true);
let listener = TcpListener::bind (("127.0.0.1", params.client_tcp_port)).await?;
let local_port = listener.local_addr ()?.port ();
trace! ("Accepting local TCP connections from P1 on {}", local_port);
let task = tokio::spawn (forward_port (
listener,
connection_p2_p3,
params,
shutdown_flag_rx
));
Self {
Ok (Self {
task,
shutdown_flag,
}
local_port,
})
}
pub async fn close (self) -> anyhow::Result <()> {
self.shutdown_flag.send (false)?;
self.task.await??;
if self.shutdown_flag.send (false).is_err () {
warn! ("Trying to gracefully shutdown forwarding task but it appears to already be shut down");
}
self.task.await
.context ("awaiting ForwardingInstance task")?
.context ("inside ForwardingInstance task")?;
Ok (())
}
pub fn local_port (&self) -> u16 {
self.local_port
}
}
pub struct ForwardingParams {
@ -44,25 +59,23 @@ pub struct ForwardingParams {
pub server_tcp_port: u16,
}
async fn forward_port (
/// Starts a TCP listener that can forward any number of TCP streams to
/// the same client:server port combination
pub async fn forward_port (
listener: TcpListener,
connection_p2_p3: quinn::Connection,
params: ForwardingParams,
shutdown_flag_rx: tokio::sync::watch::Receiver <bool>,
mut shutdown_flag_rx: tokio::sync::watch::Receiver <bool>,
) -> anyhow::Result <()>
{
let ForwardingParams {
client_tcp_port,
server_id,
server_tcp_port,
..
} = params;
let listener = TcpListener::bind (("127.0.0.1", client_tcp_port)).await?;
trace! ("Accepting local TCP connections from P1 on {}", client_tcp_port);
while *shutdown_flag_rx.borrow () {
let mut shutdown_flag_rx_2 = shutdown_flag_rx.clone ();
tokio::select! {
x = listener.accept () => {
let (tcp_socket, _) = x?;
@ -72,7 +85,7 @@ async fn forward_port (
tokio::spawn (handle_p1 (connection, server_id, server_tcp_port, tcp_socket, shutdown_flag_rx));
},
_ = shutdown_flag_rx_2.changed () => (),
_ = shutdown_flag_rx.changed () => (),
};
}

View File

@ -0,0 +1,217 @@
use structopt::StructOpt;
use tokio::{
net::TcpStream,
sync::watch,
};
use crate::prelude::*;
use protocol::PeerId;
/// A partially-filled-out config that structopt can deal with
/// Try to turn this into a Config as soon as possible.
#[derive (Debug, StructOpt)]
struct Opt {
#[structopt (long)]
relay_addr: Option <String>,
#[structopt (long)]
server_id: Option <PeerId>,
#[structopt (long)]
debug_echo: bool,
#[structopt (long)]
cert_url: Option <String>,
}
pub async fn main (args: &[OsString], shutdown_rx: Option <watch::Receiver <bool>>) -> anyhow::Result <()> {
let opt = Opt::from_iter (args);
let conf = opt.into_config ().await?;
let end_server = Arc::new (P4EndServer::connect (conf)?);
let run_task = {
let end_server = Arc::clone (&end_server);
tokio::spawn (async move {
end_server.run ().await?;
Ok::<_, anyhow::Error> (())
})
};
if let Some (mut shutdown_rx) = shutdown_rx {
while ! *shutdown_rx.borrow () {
shutdown_rx.changed ().await?;
}
end_server.shut_down ()?;
}
run_task.await??;
trace! ("P4 end server shut down gracefully.");
Ok (())
}
/// A filled-out config for constructing an end server
#[derive (Clone)]
pub struct Config {
pub debug_echo: bool,
pub id: String,
pub relay_addr: SocketAddr,
pub relay_cert: Vec <u8>,
}
impl Opt {
/// Converts self into a Config that the server can use.
/// Performs I/O to load the relay cert from disk or from HTTP.
/// Fails if arguments can't be parsed or if I/O fails.
pub async fn into_config (self) -> anyhow::Result <Config> {
let id = self.server_id.clone ().unwrap_or_else (|| "bogus_server".to_string ());
let relay_addr: SocketAddr = self.relay_addr.clone ().unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?;
// Do I/O after all parsing is done.
// We don't want to waste a network request only to come back and error
// out on like "127.oooo.1" not parsing into a relay address.
let relay_cert: Vec <u8> = match self.cert_url.as_ref () {
Some (url) => reqwest::get (url).await?.bytes ().await?.into_iter ().collect (),
None => tokio::fs::read ("ptth_quic_output/quic_server.crt").await?,
};
Ok (Config {
debug_echo: self.debug_echo,
id,
relay_addr,
relay_cert,
})
}
}
pub struct P4EndServer {
endpoint: quinn::Endpoint,
conf: Arc <Config>,
shutdown_tx: watch::Sender <bool>,
shutdown_rx: watch::Receiver <bool>,
}
impl P4EndServer {
pub fn connect (conf: Config) -> anyhow::Result <Self> {
trace! ("P4 end server making its QUIC endpoint");
let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&conf.relay_cert])?;
let (shutdown_tx, shutdown_rx) = watch::channel (false);
Ok (P4EndServer {
conf: Arc::new (conf),
endpoint,
shutdown_tx,
shutdown_rx,
})
}
pub fn config (&self) -> &Config {
&*self.conf
}
pub async fn run (&self) -> anyhow::Result <()> {
trace! ("P4 end server connecting to P3 relay server");
let quinn::NewConnection {
mut bi_streams,
..
} = protocol::p4_connect_to_p3 (
&self.endpoint,
&self.conf.relay_addr,
&self.conf.id
).await?;
debug! ("Connected to relay server");
trace! ("Accepting bi streams from P3");
let mut shutdown_rx = self.shutdown_rx.clone ();
loop {
tokio::select! {
_ = shutdown_rx.changed () => {
if *shutdown_rx.borrow () {
trace! ("P4 incoming bi streams task caught graceful shutdown");
break;
}
}
stream_opt = bi_streams.next () => {
let (relay_send, relay_recv) = stream_opt.ok_or_else (|| anyhow::anyhow! ("P4 ran out of incoming streams. Maybe P3 shut down or disconnected?"))??;
tokio::spawn (handle_bi_stream (Arc::clone (&self.conf), relay_send, relay_recv));
}
};
}
Ok (())
}
pub fn shut_down (&self) -> anyhow::Result <()> {
trace! ("P4 end server shutting down...");
Ok (self.shutdown_tx.send (true)?)
}
pub fn shutting_down (&self) -> bool {
*self.shutdown_rx.borrow ()
}
}
async fn handle_bi_stream (
conf: Arc <Config>,
relay_send: quinn::SendStream,
mut relay_recv: quinn::RecvStream,
) -> anyhow::Result <()>
{
match protocol::p4_accept_p3_stream (&mut relay_recv).await? {
protocol::P3ToP4Stream::NewPtthConnection {
client_id,
..
} => handle_new_ptth_connection (conf, relay_send, relay_recv, client_id).await?,
}
Ok (())
}
async fn handle_new_ptth_connection (
conf: Arc <Config>,
mut relay_send: quinn::SendStream,
mut relay_recv: quinn::RecvStream,
_client_id: String,
) -> anyhow::Result <()>
{
// TODO: Check authorization for P2 --> P4
protocol::p4_authorize_p2_connection (&mut relay_send).await?;
let p4_to_p5_req = protocol::p4_expect_p5_request (&mut relay_recv).await?;
// TODO: Check authorization for P1 --> P5
protocol::p4_authorize_p1_connection (&mut relay_send).await?;
debug! ("Started PTTH connection");
if conf.debug_echo {
relay_send.write (b"Connected to P4=P5 debug echo server\n").await?;
debug! ("Relaying bytes using internal debug echo server (P4=P5)");
tokio::io::copy (&mut relay_recv, &mut relay_send).await?;
}
else {
let stream = TcpStream::connect (("127.0.0.1", p4_to_p5_req.port)).await?;
let (local_recv, local_send) = stream.into_split ();
trace! ("Relaying bytes...");
let ptth_conn = crate::connection::NewConnection {
local_send,
local_recv,
relay_send,
relay_recv,
}.build ();
ptth_conn.wait_for_close ().await?;
}
Ok (())
}

View File

@ -1,5 +1,6 @@
pub mod client_proxy;
pub mod connection;
pub mod executable_end_server;
pub mod prelude;
pub mod protocol;
pub mod quinn_utils;

View File

@ -1,5 +1,7 @@
pub use std::{
collections::*,
ffi::OsString,
iter::FromIterator,
net::SocketAddr,
sync::{
Arc,
@ -8,7 +10,10 @@ pub use std::{
Ordering,
},
},
time::Duration,
time::{
Duration,
Instant,
},
};
pub use anyhow::{
@ -28,6 +33,10 @@ pub use tokio::{
},
task::JoinHandle,
};
pub use rand::{
Rng,
RngCore,
};
pub use tracing::{
debug,
error,

View File

@ -16,6 +16,9 @@ const MAX_ID_LENGTH: usize = 128;
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Command (pub u8);
// I can't remember how I picked the numbers. Just increment I guess,
// and then switch to a variable-length format around 200.
impl Command {
pub const CONNECT_P2_TO_P3: Command = Command (2);
pub const CONNECT_P4_TO_P3: Command = Command (4);
@ -23,6 +26,7 @@ impl Command {
pub const CONNECT_P2_TO_P4_STEP_2: Command = Command (11);
pub const CONNECT_P2_TO_P5: Command = Command (12);
pub const OKAY: Command = Command (20);
pub const DIREC_P2_TO_P3: Command = Command (21);
}
pub async fn p2_connect_to_p3 (
@ -61,7 +65,7 @@ pub async fn p2_connect_to_p5 (
let cmd_type = Command::CONNECT_P2_TO_P4.0;
send.write_all (&[cmd_type, 0, 0, 0]).await?;
send_lv_string (&mut send, &server_id).await?;
send_lv_string (&mut send, server_id).await?;
expect_exact_response (&mut recv, [Command::OKAY.0, cmd_type, 0, 0]).await
.context ("P2 didn't get OK response when asking P3 to connect P2 to P4")?;
@ -79,6 +83,30 @@ pub async fn p2_connect_to_p5 (
Ok ((send, recv))
}
pub async fn p2_direc_to_p4 (
connection: &quinn::Connection,
server_id: &str,
) -> Result <Vec <u8>>
{
let (mut send, mut recv) = connection.open_bi ().await?;
let cmd_type = Command::DIREC_P2_TO_P3.0;
let mut cookie = vec! [0u8; 32];
rand::thread_rng ().fill_bytes (&mut cookie [..]);
let cookie = cookie;
send.write_all (&[cmd_type, 0, 0, 0]).await?;
send_lv_string (&mut send, server_id).await?;
send_lv_u16 (&mut send, &cookie).await?;
debug! ("Waiting for OK response for DIREC");
expect_exact_response (&mut recv, [Command::OKAY.0, cmd_type, 0, 0]).await?;
Ok (cookie)
}
pub enum P3Peer {
P2ClientProxy (P2ClientProxy),
P4ServerProxy (P4ServerProxy),
@ -163,6 +191,14 @@ pub enum P2ToP3Stream {
ConnectP2ToP4 {
server_id: PeerId,
},
DirecP2ToP4 {
/// P2 wants a P2P connection to this P4
server_id: PeerId,
/// P2 will send this cookie over plain UDP to P3
/// P3 will learn P2's WAN address from that.
cookie: Vec <u8>,
},
}
pub async fn p3_accept_p2_stream (
@ -182,6 +218,15 @@ pub async fn p3_accept_p2_stream (
server_id,
}
},
Command::DIREC_P2_TO_P3 => {
let server_id = recv_lv_string (recv, MAX_ID_LENGTH).await?;
let cookie = recv_lv_u16 (recv, 64).await?;
P2ToP3Stream::DirecP2ToP4 {
server_id,
cookie,
}
},
_ => bail! ("Invalid command type while P3 was accepting a new bi stream from P2"),
})
}
@ -194,6 +239,14 @@ pub async fn p3_authorize_p2_to_p4_connection (
Ok (())
}
pub async fn p3_authorize_p2_to_p4_direc (
send: &mut SendStream,
) -> Result <()>
{
send.write_all (&[Command::OKAY.0, Command::DIREC_P2_TO_P3.0, 0, 0]).await?;
Ok (())
}
pub async fn p4_connect_to_p3 (
endpoint: &quinn::Endpoint,
relay_addr: &std::net::SocketAddr,

View File

@ -58,7 +58,7 @@ pub fn make_server_endpoint(bind_addr: SocketAddr) -> anyhow::Result<(Incoming,
fn configure_client(server_certs: &[&[u8]]) -> anyhow::Result<ClientConfig> {
let mut cfg_builder = ClientConfigBuilder::default();
for cert in server_certs {
cfg_builder.add_certificate_authority(Certificate::from_der(&cert)?)?;
cfg_builder.add_certificate_authority(Certificate::from_der(cert)?)?;
}
Ok(cfg_builder.build())
}

View File

@ -1 +1 @@
1.50.0
1.55.0