Compare commits
42 Commits
2a90930723
...
dfc6885b8c
Author | SHA1 | Date |
---|---|---|
_ | dfc6885b8c | |
(on company time) | 43fe373847 | |
(on company time) | 0d6290ed60 | |
(on company time) | ffeeda1e0a | |
(on company time) | 1905d85a59 | |
(on company time) | 5e7f691a60 | |
(on company time) | 200c07da2f | |
(on company time) | ce7a539413 | |
(on company time) | 96af820ffb | |
(on company time) | 74fb0cbe5a | |
(on company time) | 3f4bce85a4 | |
(on company time) | e3ff600b51 | |
(on company time) | 0992d76fdc | |
(on company time) | 27ed72b196 | |
(on company time) | 3f0272ed09 | |
(on company time) | 605c15468a | |
(on company time) | 415c8954c1 | |
(on company time) | 93be903b86 | |
(on company time) | 33fe10ee27 | |
(on company time) | 6ba988c2c8 | |
(on company time) | 0fc99e7c26 | |
(on company time) | 86e5305630 | |
(on company time) | 5eda2c4288 | |
(on company time) | 50332bab69 | |
(on company time) | 24dac2cc39 | |
(on company time) | 996543cecc | |
(on company time) | c13d1f37bf | |
(on company time) | 5a9c301747 | |
(on company time) | 91a29abb39 | |
(on company time) | 9ab3b42e32 | |
(on company time) | b53748b2c4 | |
(on company time) | e05c4fa8bf | |
(on company time) | d93c1404b7 | |
(on company time) | 7f2dc47aec | |
(on company time) | 80c2ef17be | |
(on company time) | f9e10e0f64 | |
(on company time) | 8a302f3049 | |
(on company time) | fd3c85fccd | |
(on company time) | 963631ff96 | |
(on company time) | 036193a19e | |
(on company time) | b5be7709a3 | |
(on company time) | edd7e8de54 |
|
@ -9,6 +9,10 @@
|
||||||
/scope/untracked
|
/scope/untracked
|
||||||
/scraper-secret.txt
|
/scraper-secret.txt
|
||||||
/target
|
/target
|
||||||
|
/untracked
|
||||||
|
|
||||||
# TLS certs used for QUIC experiments
|
# TLS certs used for QUIC experiments
|
||||||
*.crt
|
*.crt
|
||||||
|
|
||||||
|
# Kate editor temp file
|
||||||
|
*.kate-swp
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1,8 +1,8 @@
|
||||||
# https://whitfin.io/speeding-up-rust-docker-builds/
|
# https://whitfin.io/speeding-up-rust-docker-builds/
|
||||||
# TODO: https://stackoverflow.com/questions/57389547/how-to-define-the-context-for-a-docker-build-as-a-specific-commit-on-one-of-the
|
# TODO: https://stackoverflow.com/questions/57389547/how-to-define-the-context-for-a-docker-build-as-a-specific-commit-on-one-of-the
|
||||||
|
|
||||||
# rust:1.60-slim-buster
|
# docker pull rust:1.66-slim-buster
|
||||||
FROM rust@sha256:c0f26a0b299a8a74cd87be0b4bd291d55aa292198bab1bafd906edd8665edb82 as build
|
FROM rust@sha256:98c9b1fca0c9a6183369daf9efadb57c634340ae877bb027aeadf72afdd086a3 as build
|
||||||
|
|
||||||
WORKDIR /
|
WORKDIR /
|
||||||
ENV USER root
|
ENV USER root
|
||||||
|
@ -20,7 +20,8 @@ cargo new --bin crates/ptth_server && \
|
||||||
cargo new --bin crates/ptth_file_server_bin && \
|
cargo new --bin crates/ptth_file_server_bin && \
|
||||||
cargo new --bin tools/ptth_tail && \
|
cargo new --bin tools/ptth_tail && \
|
||||||
cargo new --bin crates/debug_proxy && \
|
cargo new --bin crates/debug_proxy && \
|
||||||
cargo new --bin crates/ptth_quic
|
cargo new --bin crates/ptth_quic && \
|
||||||
|
cargo new --lib crates/udp_over_tcp
|
||||||
|
|
||||||
# copy over your manifests
|
# copy over your manifests
|
||||||
COPY ./Cargo.lock ./
|
COPY ./Cargo.lock ./
|
||||||
|
@ -29,6 +30,7 @@ COPY ./crates/always_equal/Cargo.toml ./crates/always_equal/
|
||||||
COPY ./crates/ptth_core/Cargo.toml ./crates/ptth_core/
|
COPY ./crates/ptth_core/Cargo.toml ./crates/ptth_core/
|
||||||
COPY ./crates/ptth_relay/Cargo.toml ./crates/ptth_relay/
|
COPY ./crates/ptth_relay/Cargo.toml ./crates/ptth_relay/
|
||||||
COPY ./crates/ptth_quic/Cargo.toml ./crates/ptth_quic/
|
COPY ./crates/ptth_quic/Cargo.toml ./crates/ptth_quic/
|
||||||
|
COPY ./crates/udp_over_tcp/Cargo.toml ./crates/udp_over_tcp/
|
||||||
|
|
||||||
# this build step will cache your dependencies
|
# this build step will cache your dependencies
|
||||||
RUN cargo build --release -p ptth_relay
|
RUN cargo build --release -p ptth_relay
|
||||||
|
|
|
@ -10,23 +10,28 @@ repository = "https://six-five-six-four.com/git/reactor/ptth"
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
anyhow = "1.0.38"
|
anyhow = "1.0.66"
|
||||||
base64 = "0.13.0"
|
arc-swap = "1.5.1"
|
||||||
ctrlc = "3.2.1"
|
base64 = "0.20.0"
|
||||||
# fltk = "1.1.1"
|
ctrlc = "3.2.4"
|
||||||
futures-util = "0.3.9"
|
futures-util = "0.3.25"
|
||||||
hyper = { version = "0.14.4", features = ["http1", "server", "stream", "tcp"] }
|
hyper = { version = "0.14.23", features = ["http1", "server", "stream", "tcp"] }
|
||||||
quinn = "0.8.5"
|
quinn = "0.9.3"
|
||||||
rand = "0.8.4"
|
rand = "0.8.5"
|
||||||
rcgen = "0.8.11"
|
rcgen = "0.10.0"
|
||||||
rmp-serde = "0.15.5"
|
ring = "0.16.20"
|
||||||
rustls = "0.20.4"
|
rmp-serde = "1.1.1"
|
||||||
structopt = "0.3.20"
|
rustls = "0.20.7"
|
||||||
tokio = { version = "1.8.1", features = ["full"] }
|
rusty_ulid = "1.0.0"
|
||||||
tracing-subscriber = "0.2.16"
|
serde = "1.0.151"
|
||||||
tracing = "0.1.25"
|
serde_json = "1.0.89"
|
||||||
|
structopt = "0.3.26"
|
||||||
|
tokio = { version = "1.23.0", features = ["full"] }
|
||||||
|
tracing-subscriber = "0.3.16"
|
||||||
|
tracing = "0.1.37"
|
||||||
|
udp_over_tcp = { path = "../udp_over_tcp" }
|
||||||
|
|
||||||
[dependencies.reqwest]
|
[dependencies.reqwest]
|
||||||
version = "0.11.10"
|
version = "0.11.13"
|
||||||
default-features = false
|
default-features = false
|
||||||
features = ["stream", "rustls-tls", "hyper-rustls"]
|
features = ["stream", "rustls-tls", "hyper-rustls"]
|
||||||
|
|
|
@ -1,8 +1,8 @@
|
||||||
# https://whitfin.io/speeding-up-rust-docker-builds/
|
# https://whitfin.io/speeding-up-rust-docker-builds/
|
||||||
# TODO: https://stackoverflow.com/questions/57389547/how-to-define-the-context-for-a-docker-build-as-a-specific-commit-on-one-of-the
|
# TODO: https://stackoverflow.com/questions/57389547/how-to-define-the-context-for-a-docker-build-as-a-specific-commit-on-one-of-the
|
||||||
|
|
||||||
# rust:1.64-slim-buster
|
# docker pull rust:1.66-slim-buster
|
||||||
FROM rust@sha256:7da4fbd2dc7176746e8e5c371aeb0bbe742598c4906fa48cb2d87a4b89d50357 as build
|
FROM rust@sha256:98c9b1fca0c9a6183369daf9efadb57c634340ae877bb027aeadf72afdd086a3 as build
|
||||||
|
|
||||||
WORKDIR /
|
WORKDIR /
|
||||||
ENV USER root
|
ENV USER root
|
||||||
|
@ -20,7 +20,8 @@ cargo new --bin crates/ptth_server && \
|
||||||
cargo new --bin crates/ptth_file_server_bin && \
|
cargo new --bin crates/ptth_file_server_bin && \
|
||||||
cargo new --bin tools/ptth_tail && \
|
cargo new --bin tools/ptth_tail && \
|
||||||
cargo new --bin crates/debug_proxy && \
|
cargo new --bin crates/debug_proxy && \
|
||||||
cargo new --bin crates/ptth_quic
|
cargo new --bin crates/ptth_quic && \
|
||||||
|
cargo new --lib crates/udp_over_tcp
|
||||||
|
|
||||||
# copy over your manifests
|
# copy over your manifests
|
||||||
COPY ./Cargo.lock ./
|
COPY ./Cargo.lock ./
|
||||||
|
@ -29,6 +30,7 @@ COPY ./crates/always_equal/Cargo.toml ./crates/always_equal/
|
||||||
COPY ./crates/ptth_core/Cargo.toml ./crates/ptth_core/
|
COPY ./crates/ptth_core/Cargo.toml ./crates/ptth_core/
|
||||||
COPY ./crates/ptth_relay/Cargo.toml ./crates/ptth_relay/
|
COPY ./crates/ptth_relay/Cargo.toml ./crates/ptth_relay/
|
||||||
COPY ./crates/ptth_quic/Cargo.toml ./crates/ptth_quic/
|
COPY ./crates/ptth_quic/Cargo.toml ./crates/ptth_quic/
|
||||||
|
COPY ./crates/udp_over_tcp/Cargo.toml ./crates/udp_over_tcp/
|
||||||
|
|
||||||
# this build step will cache your dependencies
|
# this build step will cache your dependencies
|
||||||
RUN cargo build --release -p ptth_quic
|
RUN cargo build --release -p ptth_quic
|
||||||
|
@ -39,7 +41,8 @@ src/*.rs \
|
||||||
crates/always_equal/src/*.rs \
|
crates/always_equal/src/*.rs \
|
||||||
crates/ptth_core/src/*.rs \
|
crates/ptth_core/src/*.rs \
|
||||||
crates/ptth_relay/src/*.rs \
|
crates/ptth_relay/src/*.rs \
|
||||||
crates/ptth_quic/src/*.rs
|
crates/ptth_quic/src/*.rs \
|
||||||
|
crates/udp_over_tcp/src/*.rs
|
||||||
|
|
||||||
# Copy source tree
|
# Copy source tree
|
||||||
# Yes, I tried a few variations on the syntax. Dockerfiles are just rough.
|
# Yes, I tried a few variations on the syntax. Dockerfiles are just rough.
|
||||||
|
@ -50,6 +53,7 @@ COPY ./crates/ptth_core ./crates/ptth_core
|
||||||
COPY ./crates/ptth_relay ./crates/ptth_relay
|
COPY ./crates/ptth_relay ./crates/ptth_relay
|
||||||
COPY ./handlebars/ ./handlebars
|
COPY ./handlebars/ ./handlebars
|
||||||
COPY ./crates/ptth_quic ./crates/ptth_quic
|
COPY ./crates/ptth_quic ./crates/ptth_quic
|
||||||
|
COPY ./crates/udp_over_tcp ./crates/udp_over_tcp
|
||||||
|
|
||||||
# Bug in cargo's incremental build logic, triggered by
|
# Bug in cargo's incremental build logic, triggered by
|
||||||
# Docker doing something funny with mtimes? Maybe?
|
# Docker doing something funny with mtimes? Maybe?
|
||||||
|
@ -58,6 +62,7 @@ RUN touch crates/ptth_core/src/lib.rs
|
||||||
# build for release
|
# build for release
|
||||||
# gate only on ptth_relay tests for now
|
# gate only on ptth_relay tests for now
|
||||||
RUN \
|
RUN \
|
||||||
|
find . && \
|
||||||
cargo build --release -p ptth_quic --bin ptth_quic_relay_server && \
|
cargo build --release -p ptth_quic --bin ptth_quic_relay_server && \
|
||||||
cargo test --release -p ptth_quic --bin ptth_quic_relay_server
|
cargo test --release -p ptth_quic --bin ptth_quic_relay_server
|
||||||
|
|
||||||
|
|
|
@ -61,10 +61,7 @@ impl P2Client {
|
||||||
|
|
||||||
let conf = Arc::clone (&self.conf);
|
let conf = Arc::clone (&self.conf);
|
||||||
|
|
||||||
let quinn::NewConnection {
|
let connection = protocol::p2_connect_to_p3 (&self.endpoint, conf.relay_addr, &conf.client_id).await?;
|
||||||
connection,
|
|
||||||
..
|
|
||||||
} = protocol::p2_connect_to_p3 (&self.endpoint, conf.relay_addr, &conf.client_id).await?;
|
|
||||||
|
|
||||||
let client_tcp_port = conf.client_tcp_port;
|
let client_tcp_port = conf.client_tcp_port;
|
||||||
|
|
||||||
|
|
|
@ -1,16 +1,13 @@
|
||||||
use std::{
|
|
||||||
iter::FromIterator,
|
|
||||||
};
|
|
||||||
|
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
|
|
||||||
use ptth_quic::prelude::*;
|
use ptth_quic::prelude::*;
|
||||||
|
use ptth_quic::executable_end_server as server;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main () -> anyhow::Result <()> {
|
async fn main () -> anyhow::Result <()> {
|
||||||
tracing_subscriber::fmt::init ();
|
tracing_subscriber::fmt::init ();
|
||||||
|
|
||||||
let args = Vec::from_iter (std::env::args_os ());
|
let args: Vec <_> = std::env::args_os ().collect ();
|
||||||
|
|
||||||
let (shutdown_tx, shutdown_rx) = watch::channel (false);
|
let (shutdown_tx, shutdown_rx) = watch::channel (false);
|
||||||
|
|
||||||
|
@ -19,5 +16,5 @@ async fn main () -> anyhow::Result <()> {
|
||||||
})?;
|
})?;
|
||||||
trace! ("Set Ctrl+C handler");
|
trace! ("Set Ctrl+C handler");
|
||||||
|
|
||||||
ptth_quic::executable_end_server::main (&args, Some (shutdown_rx)).await
|
server::main (&args, Some (shutdown_rx)).await
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
|
|
||||||
use ptth_quic::prelude::*;
|
use ptth_quic::prelude::*;
|
||||||
|
use ptth_quic::executable_relay_server as relay;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main () -> anyhow::Result <()> {
|
async fn main () -> anyhow::Result <()> {
|
||||||
|
@ -8,7 +9,7 @@ async fn main () -> anyhow::Result <()> {
|
||||||
|
|
||||||
tracing_subscriber::fmt::init ();
|
tracing_subscriber::fmt::init ();
|
||||||
|
|
||||||
let opt = ptth_quic::executable_relay_server::Opt::from_args ();
|
let opt = relay::Opt::from_args ();
|
||||||
|
|
||||||
let (running_tx, mut running_rx) = watch::channel (true);
|
let (running_tx, mut running_rx) = watch::channel (true);
|
||||||
|
|
||||||
|
@ -17,8 +18,15 @@ async fn main () -> anyhow::Result <()> {
|
||||||
})?;
|
})?;
|
||||||
trace! ("Set Ctrl+C handler");
|
trace! ("Set Ctrl+C handler");
|
||||||
|
|
||||||
|
let app = relay::App::new (opt).await?;
|
||||||
|
println! ("Base64 cert: {}", base64::encode (app.server_cert ()));
|
||||||
|
println! ("Listening on {}", app.listen_addr ());
|
||||||
|
|
||||||
|
tokio::fs::create_dir_all ("ptth_quic_output").await?;
|
||||||
|
tokio::fs::write ("ptth_quic_output/quic_server.crt", app.server_cert ()).await?;
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
val = ptth_quic::executable_relay_server::main (opt) => {
|
val = app.run () => {
|
||||||
|
|
||||||
},
|
},
|
||||||
val = running_rx.changed () => {
|
val = running_rx.changed () => {
|
||||||
|
|
|
@ -0,0 +1,32 @@
|
||||||
|
|
||||||
|
#[cfg (test)]
|
||||||
|
mod test {
|
||||||
|
#[test]
|
||||||
|
fn signing () -> anyhow::Result <()> {
|
||||||
|
use std::fs;
|
||||||
|
use ring::{
|
||||||
|
signature::{
|
||||||
|
self,
|
||||||
|
Ed25519KeyPair,
|
||||||
|
KeyPair,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
fs::create_dir_all ("untracked")?;
|
||||||
|
|
||||||
|
let rng = ring::rand::SystemRandom::new ();
|
||||||
|
let pkcs8_bytes = Ed25519KeyPair::generate_pkcs8 (&rng).map_err (|_| anyhow::anyhow! ("generate_pkcs8"))?;
|
||||||
|
|
||||||
|
let key_pair = Ed25519KeyPair::from_pkcs8 (pkcs8_bytes.as_ref ()).map_err (|_| anyhow::anyhow! ("from_pkcs8"))?;
|
||||||
|
|
||||||
|
const MESSAGE: &[u8] = b":V";
|
||||||
|
let sig = key_pair.sign (MESSAGE);
|
||||||
|
|
||||||
|
let peer_public_key_bytes = key_pair.public_key ().as_ref ();
|
||||||
|
let peer_public_key = signature::UnparsedPublicKey::new (&signature::ED25519, peer_public_key_bytes);
|
||||||
|
|
||||||
|
peer_public_key.verify (MESSAGE, sig.as_ref ()).map_err (|_| anyhow::anyhow! ("verify"))?;
|
||||||
|
|
||||||
|
Ok (())
|
||||||
|
}
|
||||||
|
}
|
|
@ -10,7 +10,7 @@ use protocol::PeerId;
|
||||||
/// A partially-filled-out config that structopt can deal with
|
/// A partially-filled-out config that structopt can deal with
|
||||||
/// Try to turn this into a Config as soon as possible.
|
/// Try to turn this into a Config as soon as possible.
|
||||||
#[derive (Debug, StructOpt)]
|
#[derive (Debug, StructOpt)]
|
||||||
struct Opt {
|
pub struct Opt {
|
||||||
#[structopt (long)]
|
#[structopt (long)]
|
||||||
relay_addr: Option <String>,
|
relay_addr: Option <String>,
|
||||||
#[structopt (long)]
|
#[structopt (long)]
|
||||||
|
@ -19,6 +19,18 @@ struct Opt {
|
||||||
debug_echo: bool,
|
debug_echo: bool,
|
||||||
#[structopt (long)]
|
#[structopt (long)]
|
||||||
cert_url: Option <String>,
|
cert_url: Option <String>,
|
||||||
|
#[structopt (long)]
|
||||||
|
use_udp_over_tcp: Option <bool>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A filled-out config for constructing an end server
|
||||||
|
#[derive (Clone)]
|
||||||
|
pub (crate) struct Config {
|
||||||
|
pub debug_echo: bool,
|
||||||
|
pub id: String,
|
||||||
|
pub relay_addr: SocketAddr,
|
||||||
|
pub relay_cert: Vec <u8>,
|
||||||
|
pub use_udp_over_tcp: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn main (args: &[OsString], shutdown_rx: Option <watch::Receiver <bool>>) -> anyhow::Result <()> {
|
pub async fn main (args: &[OsString], shutdown_rx: Option <watch::Receiver <bool>>) -> anyhow::Result <()> {
|
||||||
|
@ -26,10 +38,9 @@ pub async fn main (args: &[OsString], shutdown_rx: Option <watch::Receiver <bool
|
||||||
let opt = Opt::from_iter (args);
|
let opt = Opt::from_iter (args);
|
||||||
let conf = opt.into_config ().await?;
|
let conf = opt.into_config ().await?;
|
||||||
|
|
||||||
let end_server = Arc::new (P4EndServer::connect (conf)?);
|
let (end_server, shutdown_tx) = P4EndServer::connect (conf).await?;
|
||||||
|
|
||||||
let run_task = {
|
let run_task = {
|
||||||
let end_server = Arc::clone (&end_server);
|
|
||||||
tokio::spawn (async move {
|
tokio::spawn (async move {
|
||||||
end_server.run ().await?;
|
end_server.run ().await?;
|
||||||
Ok::<_, anyhow::Error> (())
|
Ok::<_, anyhow::Error> (())
|
||||||
|
@ -40,7 +51,8 @@ pub async fn main (args: &[OsString], shutdown_rx: Option <watch::Receiver <bool
|
||||||
while ! *shutdown_rx.borrow () {
|
while ! *shutdown_rx.borrow () {
|
||||||
shutdown_rx.changed ().await?;
|
shutdown_rx.changed ().await?;
|
||||||
}
|
}
|
||||||
end_server.shut_down ()?;
|
trace! ("P4 end server shutting down...");
|
||||||
|
shutdown_tx.send (true)?
|
||||||
}
|
}
|
||||||
|
|
||||||
run_task.await??;
|
run_task.await??;
|
||||||
|
@ -50,21 +62,12 @@ pub async fn main (args: &[OsString], shutdown_rx: Option <watch::Receiver <bool
|
||||||
Ok (())
|
Ok (())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A filled-out config for constructing an end server
|
|
||||||
#[derive (Clone)]
|
|
||||||
pub struct Config {
|
|
||||||
pub debug_echo: bool,
|
|
||||||
pub id: String,
|
|
||||||
pub relay_addr: SocketAddr,
|
|
||||||
pub relay_cert: Vec <u8>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Opt {
|
impl Opt {
|
||||||
/// Converts self into a Config that the server can use.
|
/// Converts self into a Config that the server can use.
|
||||||
/// Performs I/O to load the relay cert from disk or from HTTP.
|
/// Performs I/O to load the relay cert from disk or from HTTP.
|
||||||
/// Fails if arguments can't be parsed or if I/O fails.
|
/// Fails if arguments can't be parsed or if I/O fails.
|
||||||
|
|
||||||
pub async fn into_config (self) -> anyhow::Result <Config> {
|
pub (crate) async fn into_config (self) -> anyhow::Result <Config> {
|
||||||
let id = self.server_id.clone ().unwrap_or_else (|| "bogus_server".to_string ());
|
let id = self.server_id.clone ().unwrap_or_else (|| "bogus_server".to_string ());
|
||||||
|
|
||||||
let relay_addr: SocketAddr = self.relay_addr.clone ().unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?;
|
let relay_addr: SocketAddr = self.relay_addr.clone ().unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?;
|
||||||
|
@ -83,53 +86,74 @@ impl Opt {
|
||||||
id,
|
id,
|
||||||
relay_addr,
|
relay_addr,
|
||||||
relay_cert,
|
relay_cert,
|
||||||
|
use_udp_over_tcp: self.use_udp_over_tcp.unwrap_or (false),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct P4EndServer {
|
pub struct P4EndServer {
|
||||||
|
conf: Config,
|
||||||
|
conn: quinn::Connection,
|
||||||
endpoint: quinn::Endpoint,
|
endpoint: quinn::Endpoint,
|
||||||
conf: Arc <Config>,
|
|
||||||
shutdown_tx: watch::Sender <bool>,
|
|
||||||
shutdown_rx: watch::Receiver <bool>,
|
shutdown_rx: watch::Receiver <bool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl P4EndServer {
|
impl P4EndServer {
|
||||||
pub fn connect (conf: Config) -> anyhow::Result <Self> {
|
pub (crate) async fn connect (conf: Config) -> anyhow::Result <(Self, watch::Sender <bool>)> {
|
||||||
trace! ("P4 end server making its QUIC endpoint");
|
debug! ("P4 end server making its QUIC endpoint");
|
||||||
let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&conf.relay_cert])?;
|
let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&conf.relay_cert])?;
|
||||||
|
|
||||||
let (shutdown_tx, shutdown_rx) = watch::channel (false);
|
let conf = if conf.use_udp_over_tcp {
|
||||||
|
let udp_sock = UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, 0)).await?;
|
||||||
|
udp_sock.connect ((Ipv4Addr::LOCALHOST, endpoint.local_addr ()?.port ())).await?;
|
||||||
|
|
||||||
Ok (P4EndServer {
|
let udp_local_server_port = udp_sock.local_addr ()?.port ();
|
||||||
conf: Arc::new (conf),
|
|
||||||
endpoint,
|
let tcp_sock = TcpSocket::new_v4 ()?;
|
||||||
shutdown_tx,
|
let tcp_conn = tcp_sock.connect (conf.relay_addr).await?;
|
||||||
shutdown_rx,
|
|
||||||
})
|
tokio::spawn (async move {
|
||||||
|
udp_over_tcp::client::main_with_sockets (udp_sock, tcp_conn).await
|
||||||
|
});
|
||||||
|
|
||||||
|
Config {
|
||||||
|
debug_echo: conf.debug_echo,
|
||||||
|
id: conf.id,
|
||||||
|
relay_addr: SocketAddr::V4 (SocketAddrV4::new (Ipv4Addr::LOCALHOST, udp_local_server_port)),
|
||||||
|
relay_cert: conf.relay_cert,
|
||||||
|
use_udp_over_tcp: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn config (&self) -> &Config {
|
|
||||||
&*self.conf
|
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
|
conf
|
||||||
|
};
|
||||||
|
|
||||||
pub async fn run (&self) -> anyhow::Result <()> {
|
debug! ("P4 end server connecting to P3 relay server");
|
||||||
trace! ("P4 end server connecting to P3 relay server");
|
let conn = protocol::p4_connect_to_p3 (
|
||||||
let quinn::NewConnection {
|
&endpoint,
|
||||||
mut bi_streams,
|
conf.relay_addr,
|
||||||
..
|
&conf.id
|
||||||
} = protocol::p4_connect_to_p3 (
|
|
||||||
&self.endpoint,
|
|
||||||
self.conf.relay_addr,
|
|
||||||
&self.conf.id
|
|
||||||
).await?;
|
).await?;
|
||||||
|
|
||||||
debug! ("Connected to relay server");
|
debug! ("Connected to relay server");
|
||||||
|
|
||||||
|
let (shutdown_tx, shutdown_rx) = watch::channel (false);
|
||||||
|
|
||||||
|
Ok ((P4EndServer {
|
||||||
|
conf,
|
||||||
|
conn,
|
||||||
|
endpoint,
|
||||||
|
shutdown_rx,
|
||||||
|
}, shutdown_tx))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub (crate) async fn run (self) -> anyhow::Result <()> {
|
||||||
trace! ("Accepting bi streams from P3");
|
trace! ("Accepting bi streams from P3");
|
||||||
|
|
||||||
let mut shutdown_rx = self.shutdown_rx.clone ();
|
let mut shutdown_rx = self.shutdown_rx.clone ();
|
||||||
|
|
||||||
|
let conf = Arc::new (self.conf);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = shutdown_rx.changed () => {
|
_ = shutdown_rx.changed () => {
|
||||||
|
@ -138,10 +162,10 @@ impl P4EndServer {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
stream_opt = bi_streams.next () => {
|
stream_opt = self.conn.accept_bi () => {
|
||||||
let (relay_send, relay_recv) = stream_opt.ok_or_else (|| anyhow::anyhow! ("P4 ran out of incoming streams. Maybe P3 shut down or disconnected?"))??;
|
let (relay_send, relay_recv) = stream_opt?;
|
||||||
|
|
||||||
tokio::spawn (handle_bi_stream (Arc::clone (&self.conf), relay_send, relay_recv));
|
tokio::spawn (handle_bi_stream (Arc::clone (&conf), relay_send, relay_recv));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -149,11 +173,6 @@ impl P4EndServer {
|
||||||
Ok (())
|
Ok (())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn shut_down (&self) -> anyhow::Result <()> {
|
|
||||||
trace! ("P4 end server shutting down...");
|
|
||||||
Ok (self.shutdown_tx.send (true)?)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn shutting_down (&self) -> bool {
|
pub fn shutting_down (&self) -> bool {
|
||||||
*self.shutdown_rx.borrow ()
|
*self.shutdown_rx.borrow ()
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,19 +20,85 @@ use protocol::PeerId;
|
||||||
#[derive (Debug, StructOpt)]
|
#[derive (Debug, StructOpt)]
|
||||||
pub struct Opt {
|
pub struct Opt {
|
||||||
#[structopt (long)]
|
#[structopt (long)]
|
||||||
listen_addr: Option <String>,
|
pub (crate) listen_addr: Option <String>,
|
||||||
|
#[structopt (long)]
|
||||||
|
pub (crate) tcp_listen_port: Option <u16>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn main (opt: Opt) -> anyhow::Result <()>
|
pub struct App {
|
||||||
{
|
endpoint: quinn::Endpoint,
|
||||||
|
listen_addr: SocketAddr,
|
||||||
|
pub (crate) metrics: Arc <RwLock <Metrics>>,
|
||||||
|
server_cert: Vec <u8>,
|
||||||
|
tcp_listener: Option <udp_over_tcp::server::Listener>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive (Default)]
|
||||||
|
pub (crate) struct Metrics {
|
||||||
|
pub (crate) connected_end_servers: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl App {
|
||||||
|
pub async fn new (opt: Opt) -> anyhow::Result <Self> {
|
||||||
|
let config = load_config ().await.ok ();
|
||||||
|
|
||||||
let listen_addr = opt.listen_addr.unwrap_or_else (|| String::from ("0.0.0.0:30380")).parse ()?;
|
let listen_addr = opt.listen_addr.unwrap_or_else (|| String::from ("0.0.0.0:30380")).parse ()?;
|
||||||
let (mut incoming, server_cert) = make_server_endpoint (listen_addr)?;
|
let (endpoint, server_cert) = make_server_endpoint (listen_addr)?;
|
||||||
println! ("Base64 cert: {}", base64::encode (&server_cert));
|
|
||||||
|
|
||||||
tokio::fs::create_dir_all ("ptth_quic_output").await?;
|
let listen_addr = endpoint.local_addr ()?;
|
||||||
tokio::fs::write ("ptth_quic_output/quic_server.crt", &server_cert).await?;
|
let tcp_port = opt.tcp_listen_port.or (config.map (|cfg| cfg.tcp_listen_port).flatten ());
|
||||||
|
|
||||||
let relay_state = Arc::new (RelayState::default ());
|
let tcp_listener = if let Some (tcp_port) = tcp_port {
|
||||||
|
let cfg = udp_over_tcp::server::Config {
|
||||||
|
tcp_port,
|
||||||
|
udp_port: listen_addr.port (),
|
||||||
|
};
|
||||||
|
|
||||||
|
Some (udp_over_tcp::server::Listener::new (cfg).await?)
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok (Self {
|
||||||
|
endpoint,
|
||||||
|
listen_addr,
|
||||||
|
metrics: Default::default (),
|
||||||
|
server_cert,
|
||||||
|
tcp_listener,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn listen_addr (&self) -> SocketAddr {
|
||||||
|
self.listen_addr
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn server_cert (&self) -> &[u8] {
|
||||||
|
&self.server_cert
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn tcp_listen_port (&self) -> anyhow::Result <Option <u16>> {
|
||||||
|
match self.tcp_listener.as_ref () {
|
||||||
|
None => Ok (None),
|
||||||
|
Some (tcp_listener) => Ok (tcp_listener.tcp_port ()?.into ()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn run (self) -> anyhow::Result <()> {
|
||||||
|
let Self {
|
||||||
|
endpoint,
|
||||||
|
listen_addr,
|
||||||
|
metrics,
|
||||||
|
server_cert,
|
||||||
|
tcp_listener,
|
||||||
|
} = self;
|
||||||
|
|
||||||
|
let mut relay_state = RelayState::default ();
|
||||||
|
relay_state.metrics = metrics;
|
||||||
|
if let Err (e) = relay_state.reload_config ().await {
|
||||||
|
error! ("{:?}", e);
|
||||||
|
}
|
||||||
|
let relay_state = Arc::new (relay_state);
|
||||||
|
|
||||||
let make_svc = {
|
let make_svc = {
|
||||||
let relay_state = Arc::clone (&relay_state);
|
let relay_state = Arc::clone (&relay_state);
|
||||||
|
@ -52,18 +118,30 @@ pub async fn main (opt: Opt) -> anyhow::Result <()>
|
||||||
let http_addr = SocketAddr::from (([0, 0, 0, 0], 4004));
|
let http_addr = SocketAddr::from (([0, 0, 0, 0], 4004));
|
||||||
let http_server = Server::bind (&http_addr);
|
let http_server = Server::bind (&http_addr);
|
||||||
|
|
||||||
let tcp_port = 30382;
|
let _task_reload_config = {
|
||||||
let tcp_listener = TcpListener::bind (("127.0.0.1", tcp_port)).await?;
|
let relay_state = Arc::clone (&relay_state);
|
||||||
|
tokio::spawn (async move {
|
||||||
|
let mut interval = tokio::time::interval (std::time::Duration::from_secs (60));
|
||||||
|
interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
interval.tick ().await;
|
||||||
|
|
||||||
|
relay_state.reload_config ().await.ok ();
|
||||||
|
}
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
let task_quic_server = {
|
let task_quic_server = {
|
||||||
let relay_state = Arc::clone (&relay_state);
|
let relay_state = Arc::clone (&relay_state);
|
||||||
tokio::spawn (async move {
|
tokio::spawn (async move {
|
||||||
while let Some (conn) = incoming.next ().await {
|
while let Some (conn) = endpoint.accept ().await {
|
||||||
let relay_state = Arc::clone (&relay_state);
|
let relay_state = Arc::clone (&relay_state);
|
||||||
|
|
||||||
// Each new peer QUIC connection gets its own task
|
// Each new peer QUIC connection gets its own task
|
||||||
tokio::spawn (async move {
|
tokio::spawn (async move {
|
||||||
let active = relay_state.stats.quic.connect ();
|
let active = relay_state.stats.quic.connect ();
|
||||||
|
|
||||||
debug! ("QUIC connections: {}", active);
|
debug! ("QUIC connections: {}", active);
|
||||||
|
|
||||||
match handle_quic_connection (Arc::clone (&relay_state), conn).await {
|
match handle_quic_connection (Arc::clone (&relay_state), conn).await {
|
||||||
|
@ -114,49 +192,47 @@ pub async fn main (opt: Opt) -> anyhow::Result <()>
|
||||||
Ok::<_, anyhow::Error> (())
|
Ok::<_, anyhow::Error> (())
|
||||||
});
|
});
|
||||||
|
|
||||||
let task_tcp_server = {
|
debug! ("Serving HTTP on {:?}", http_addr);
|
||||||
let relay_state = Arc::clone (&relay_state);
|
|
||||||
|
if let Some (tcp_listener) = tcp_listener {
|
||||||
tokio::spawn (async move {
|
tokio::spawn (async move {
|
||||||
loop {
|
if let Err (e) = tcp_listener.run ().await {
|
||||||
let (tcp_socket, _) = tcp_listener.accept ().await?;
|
eprintln! ("udp_over_tcp::server::main exited with err {:?}", e);
|
||||||
|
|
||||||
let relay_state = Arc::clone (&relay_state);
|
|
||||||
tokio::spawn (async move {
|
|
||||||
let (_client_recv, _client_send) = tcp_socket.into_split ();
|
|
||||||
|
|
||||||
debug! ("Accepted direct TCP connection P1 --> P3");
|
|
||||||
|
|
||||||
let p4_server_proxies = relay_state.p4_server_proxies.lock ().await;
|
|
||||||
let _p4 = match p4_server_proxies.get ("bogus_server") {
|
|
||||||
Some (x) => x,
|
|
||||||
None => bail! ("That server isn't connected"),
|
|
||||||
};
|
|
||||||
|
|
||||||
// unimplemented! ();
|
|
||||||
/*
|
|
||||||
p4.req_channel.send (RequestP2ToP4 {
|
|
||||||
client_send,
|
|
||||||
client_recv,
|
|
||||||
client_id: "bogus_client".to_string (),
|
|
||||||
}).await.map_err (|_| anyhow::anyhow! ("Can't send request to P4 server"))?;
|
|
||||||
*/
|
|
||||||
Ok (())
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok::<_, anyhow::Error> (())
|
Ok::<_, anyhow::Error> (())
|
||||||
})
|
});
|
||||||
};
|
}
|
||||||
|
|
||||||
debug! ("Serving HTTP on {:?}", http_addr);
|
{
|
||||||
|
let config = relay_state.config.load ();
|
||||||
|
dbg! (&config.webhook_url);
|
||||||
|
if let Some (webhook_url) = config.webhook_url.clone () {
|
||||||
|
let j = json! ({
|
||||||
|
"text": "Booting up",
|
||||||
|
}).to_string ();
|
||||||
|
let http_client = relay_state.http_client.clone ();
|
||||||
|
tokio::spawn (async move {
|
||||||
|
http_client.post (webhook_url).body (j).send ().await
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
task_quic_server.await??;
|
tokio::select! {
|
||||||
task_http_server.await??;
|
_val = task_quic_server => {
|
||||||
task_tcp_server.await??;
|
eprintln! ("QUIC relay server exited, exiting");
|
||||||
task_direc_server.await??;
|
},
|
||||||
|
_val = task_http_server => {
|
||||||
|
eprintln! ("HTTP server exited, exiting");
|
||||||
|
},
|
||||||
|
_val = task_direc_server => {
|
||||||
|
eprintln! ("PTTH_DIREC server exited, exiting");
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
Ok (())
|
Ok (())
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn handle_http (_req: Request <Body>, relay_state: Arc <RelayState>)
|
async fn handle_http (_req: Request <Body>, relay_state: Arc <RelayState>)
|
||||||
-> anyhow::Result <Response <Body>>
|
-> anyhow::Result <Response <Body>>
|
||||||
|
@ -178,9 +254,36 @@ async fn handle_http (_req: Request <Body>, relay_state: Arc <RelayState>)
|
||||||
|
|
||||||
#[derive (Default)]
|
#[derive (Default)]
|
||||||
struct RelayState {
|
struct RelayState {
|
||||||
|
config: arc_swap::ArcSwap <Config>,
|
||||||
p4_server_proxies: Mutex <HashMap <PeerId, P4State>>,
|
p4_server_proxies: Mutex <HashMap <PeerId, P4State>>,
|
||||||
direc_cookies: Mutex <HashMap <Vec <u8>, DirecState>>,
|
direc_cookies: Mutex <HashMap <Vec <u8>, DirecState>>,
|
||||||
|
metrics: Arc <RwLock <Metrics>>,
|
||||||
stats: Stats,
|
stats: Stats,
|
||||||
|
http_client: reqwest::Client,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive (Default)]
|
||||||
|
struct Config {
|
||||||
|
ip_nicknames: BTreeMap <[u8; 4], String>,
|
||||||
|
tcp_listen_port: Option <u16>,
|
||||||
|
webhook_url: Option <String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From <ConfigFile> for Config {
|
||||||
|
fn from (x: ConfigFile) -> Self {
|
||||||
|
Self {
|
||||||
|
ip_nicknames: x.ip_nicknames.into_iter ().collect (),
|
||||||
|
tcp_listen_port: x.tcp_listen_port,
|
||||||
|
webhook_url: x.webhook_url,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive (Deserialize)]
|
||||||
|
struct ConfigFile {
|
||||||
|
ip_nicknames: Vec <([u8; 4], String)>,
|
||||||
|
tcp_listen_port: Option <u16>,
|
||||||
|
webhook_url: Option <String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct DirecState {
|
struct DirecState {
|
||||||
|
@ -222,11 +325,24 @@ impl ConnectEvents {
|
||||||
|
|
||||||
struct P4State {
|
struct P4State {
|
||||||
req_channel: mpsc::Sender <RequestP2ToP4>,
|
req_channel: mpsc::Sender <RequestP2ToP4>,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn load_config () -> anyhow::Result <ConfigFile>
|
||||||
|
{
|
||||||
|
let s = tokio::fs::read_to_string ("config/ptth_quic_relay_server.json").await?;
|
||||||
|
let cfg: ConfigFile = serde_json::from_str (&s)?;
|
||||||
|
Ok (cfg)
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RelayState {
|
impl RelayState {
|
||||||
|
async fn reload_config (&self) -> anyhow::Result <()> {
|
||||||
|
let config = load_config ().await?;
|
||||||
|
let config = Arc::new (Config::from (config));
|
||||||
|
|
||||||
|
self.config.store (config);
|
||||||
|
|
||||||
|
Ok (())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct RequestP2ToP4 {
|
struct RequestP2ToP4 {
|
||||||
|
@ -301,30 +417,69 @@ async fn handle_quic_connection (
|
||||||
conn: quinn::Connecting,
|
conn: quinn::Connecting,
|
||||||
) -> anyhow::Result <()>
|
) -> anyhow::Result <()>
|
||||||
{
|
{
|
||||||
let mut conn = conn.await?;
|
let id = Ulid::generate ();
|
||||||
|
|
||||||
|
let config = relay_state.config.load ();
|
||||||
|
|
||||||
|
let remote_addr = conn.remote_address ();
|
||||||
|
let ip_nickname = match remote_addr {
|
||||||
|
SocketAddr::V4 (x) => {
|
||||||
|
let ip = x.ip ().octets ();
|
||||||
|
|
||||||
|
match config.ip_nicknames.get (&ip) {
|
||||||
|
Some (nick) => nick.as_str (),
|
||||||
|
_ => "Unknown",
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => "Unknown, not IPv4",
|
||||||
|
};
|
||||||
|
|
||||||
|
debug! ("EHG7NVUD Incoming QUIC connection {} from {:?} ({})", id, remote_addr, ip_nickname);
|
||||||
|
|
||||||
|
if let Some (webhook_url) = config.webhook_url.clone () {
|
||||||
|
let j = json! ({
|
||||||
|
"text": format! ("Incoming QUIC connection from {:?} ({})", remote_addr, ip_nickname),
|
||||||
|
}).to_string ();
|
||||||
|
let http_client = relay_state.http_client.clone ();
|
||||||
|
tokio::spawn (async move {
|
||||||
|
http_client.post (webhook_url).body (j).send ().await
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
let conn = conn.await?;
|
||||||
|
|
||||||
// Everyone who connects must identify themselves with the first
|
// Everyone who connects must identify themselves with the first
|
||||||
// bi stream
|
// bi stream
|
||||||
// TODO: Timeout
|
// TODO: Timeout
|
||||||
|
|
||||||
let (mut send, mut recv) = conn.bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("QUIC client didn't identify itself"))??;
|
let (mut send, mut recv) = conn.accept_bi ().await?;
|
||||||
|
|
||||||
let peer = protocol::p3_accept_peer (&mut recv).await?;
|
let peer = protocol::p3_accept_peer (&mut recv).await?;
|
||||||
|
|
||||||
match peer {
|
match peer {
|
||||||
protocol::P3Peer::P2ClientProxy (peer) => {
|
protocol::P3Peer::P2ClientProxy (peer) => {
|
||||||
trace! ("Accepting connection from P2 client");
|
trace! ("H36JTVE5 Handling connection {} as P2 client", id);
|
||||||
// TODO: Check authorization for P2 peers
|
// TODO: Check authorization for P2 peers
|
||||||
|
|
||||||
protocol::p3_authorize_p2_peer (&mut send).await?;
|
protocol::p3_authorize_p2_peer (&mut send).await?;
|
||||||
handle_p2_connection (relay_state, conn, peer).await?;
|
handle_p2_connection (relay_state, conn, peer).await?;
|
||||||
},
|
},
|
||||||
protocol::P3Peer::P4ServerProxy (peer) => {
|
protocol::P3Peer::P4ServerProxy (peer) => {
|
||||||
trace! ("Accepting connection from P4 end server");
|
trace! ("LRHUKB7K Handling connection {} as P4 end server", id);
|
||||||
// TODO: Check authorization for P4 peers
|
// TODO: Check authorization for P4 peers
|
||||||
|
|
||||||
protocol::p3_authorize_p4_peer (&mut send).await?;
|
protocol::p3_authorize_p4_peer (&mut send).await?;
|
||||||
|
let metrics = Arc::clone (&relay_state.metrics);
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut m = metrics.write ().await;
|
||||||
|
m.connected_end_servers += 1;
|
||||||
|
}
|
||||||
handle_p4_connection (relay_state, conn, peer).await?;
|
handle_p4_connection (relay_state, conn, peer).await?;
|
||||||
|
{
|
||||||
|
let mut m = metrics.write ().await;
|
||||||
|
m.connected_end_servers -= 1;
|
||||||
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -333,19 +488,13 @@ async fn handle_quic_connection (
|
||||||
|
|
||||||
async fn handle_p2_connection (
|
async fn handle_p2_connection (
|
||||||
relay_state: Arc <RelayState>,
|
relay_state: Arc <RelayState>,
|
||||||
conn: quinn::NewConnection,
|
conn: quinn::Connection,
|
||||||
peer: protocol::P2ClientProxy,
|
peer: protocol::P2ClientProxy,
|
||||||
) -> anyhow::Result <()>
|
) -> anyhow::Result <()>
|
||||||
{
|
{
|
||||||
let client_id = peer.id;
|
let client_id = peer.id;
|
||||||
|
|
||||||
let quinn::NewConnection {
|
while let Ok ((send, mut recv)) = conn.accept_bi ().await {
|
||||||
mut bi_streams,
|
|
||||||
..
|
|
||||||
} = conn;
|
|
||||||
|
|
||||||
while let Some (bi_stream) = bi_streams.next ().await {
|
|
||||||
let (send, mut recv) = bi_stream?;
|
|
||||||
let relay_state = Arc::clone (&relay_state);
|
let relay_state = Arc::clone (&relay_state);
|
||||||
let client_id = client_id.clone ();
|
let client_id = client_id.clone ();
|
||||||
|
|
||||||
|
@ -457,15 +606,11 @@ async fn handle_direc_p2_to_p4 (
|
||||||
|
|
||||||
async fn handle_p4_connection (
|
async fn handle_p4_connection (
|
||||||
relay_state: Arc <RelayState>,
|
relay_state: Arc <RelayState>,
|
||||||
conn: quinn::NewConnection,
|
connection: quinn::Connection,
|
||||||
peer: protocol::P4ServerProxy,
|
peer: protocol::P4ServerProxy,
|
||||||
) -> anyhow::Result <()>
|
) -> anyhow::Result <()>
|
||||||
{
|
{
|
||||||
let server_id = peer.id;
|
let server_id = peer.id;
|
||||||
let quinn::NewConnection {
|
|
||||||
connection,
|
|
||||||
..
|
|
||||||
} = conn;
|
|
||||||
let (tx, mut rx) = mpsc::channel (2);
|
let (tx, mut rx) = mpsc::channel (2);
|
||||||
|
|
||||||
let p4_state = P4State {
|
let p4_state = P4State {
|
||||||
|
|
|
@ -1,7 +1,11 @@
|
||||||
pub mod client_proxy;
|
pub mod client_proxy;
|
||||||
pub mod connection;
|
pub mod connection;
|
||||||
|
pub mod crypto;
|
||||||
pub mod executable_end_server;
|
pub mod executable_end_server;
|
||||||
pub mod executable_relay_server;
|
pub mod executable_relay_server;
|
||||||
pub mod prelude;
|
pub mod prelude;
|
||||||
pub mod protocol;
|
pub mod protocol;
|
||||||
pub mod quinn_utils;
|
pub mod quinn_utils;
|
||||||
|
|
||||||
|
#[cfg (test)]
|
||||||
|
mod tests;
|
||||||
|
|
|
@ -2,7 +2,11 @@ pub use std::{
|
||||||
collections::*,
|
collections::*,
|
||||||
ffi::OsString,
|
ffi::OsString,
|
||||||
iter::FromIterator,
|
iter::FromIterator,
|
||||||
net::SocketAddr,
|
net::{
|
||||||
|
Ipv4Addr,
|
||||||
|
SocketAddr,
|
||||||
|
SocketAddrV4,
|
||||||
|
},
|
||||||
sync::{
|
sync::{
|
||||||
Arc,
|
Arc,
|
||||||
atomic::{
|
atomic::{
|
||||||
|
@ -26,9 +30,14 @@ pub use tokio::{
|
||||||
AsyncReadExt,
|
AsyncReadExt,
|
||||||
AsyncWriteExt,
|
AsyncWriteExt,
|
||||||
},
|
},
|
||||||
net::TcpListener,
|
net::{
|
||||||
|
TcpListener,
|
||||||
|
TcpSocket,
|
||||||
|
UdpSocket,
|
||||||
|
},
|
||||||
sync::{
|
sync::{
|
||||||
Mutex,
|
Mutex,
|
||||||
|
RwLock,
|
||||||
mpsc,
|
mpsc,
|
||||||
},
|
},
|
||||||
task::JoinHandle,
|
task::JoinHandle,
|
||||||
|
@ -37,6 +46,9 @@ pub use rand::{
|
||||||
Rng,
|
Rng,
|
||||||
RngCore,
|
RngCore,
|
||||||
};
|
};
|
||||||
|
pub use rusty_ulid::Ulid;
|
||||||
|
pub use serde::Deserialize;
|
||||||
|
pub use serde_json::json;
|
||||||
pub use tracing::{
|
pub use tracing::{
|
||||||
debug,
|
debug,
|
||||||
error,
|
error,
|
||||||
|
|
|
@ -33,14 +33,14 @@ pub async fn p2_connect_to_p3 (
|
||||||
endpoint: &quinn::Endpoint,
|
endpoint: &quinn::Endpoint,
|
||||||
relay_addr: std::net::SocketAddr,
|
relay_addr: std::net::SocketAddr,
|
||||||
client_id: &str,
|
client_id: &str,
|
||||||
) -> Result <quinn::NewConnection>
|
) -> Result <quinn::Connection>
|
||||||
{
|
{
|
||||||
if client_id.as_bytes ().len () > MAX_ID_LENGTH {
|
if client_id.as_bytes ().len () > MAX_ID_LENGTH {
|
||||||
bail! ("Client ID is longer than MAX_ID_LENGTH");
|
bail! ("Client ID is longer than MAX_ID_LENGTH");
|
||||||
}
|
}
|
||||||
|
|
||||||
let new_conn = endpoint.connect (relay_addr, "localhost")?.await?;
|
let new_conn = endpoint.connect (relay_addr, "localhost")?.await?;
|
||||||
let (mut send, mut recv) = new_conn.connection.open_bi ().await?;
|
let (mut send, mut recv) = new_conn.open_bi ().await?;
|
||||||
let cmd_type = Command::CONNECT_P2_TO_P3.0;
|
let cmd_type = Command::CONNECT_P2_TO_P3.0;
|
||||||
|
|
||||||
send.write_all (&[cmd_type, 0, 0, 0]).await?;
|
send.write_all (&[cmd_type, 0, 0, 0]).await?;
|
||||||
|
@ -251,21 +251,21 @@ pub async fn p4_connect_to_p3 (
|
||||||
endpoint: &quinn::Endpoint,
|
endpoint: &quinn::Endpoint,
|
||||||
relay_addr: std::net::SocketAddr,
|
relay_addr: std::net::SocketAddr,
|
||||||
server_id: &str,
|
server_id: &str,
|
||||||
) -> Result <quinn::NewConnection>
|
) -> Result <quinn::Connection>
|
||||||
{
|
{
|
||||||
if server_id.as_bytes ().len () > MAX_ID_LENGTH {
|
if server_id.as_bytes ().len () > MAX_ID_LENGTH {
|
||||||
bail! ("Server ID is longer than MAX_ID_LENGTH");
|
bail! ("Server ID is longer than MAX_ID_LENGTH");
|
||||||
}
|
}
|
||||||
|
|
||||||
let new_conn = endpoint.connect (relay_addr, "localhost")?.await?;
|
let new_conn = endpoint.connect (relay_addr, "localhost")?.await.context ("UXTDVL2V quinn::Endpoint::connect")?;
|
||||||
let (mut send, mut recv) = new_conn.connection.open_bi ().await?;
|
let (mut send, mut recv) = new_conn.open_bi ().await?;
|
||||||
let cmd_type = Command::CONNECT_P4_TO_P3.0;
|
let cmd_type = Command::CONNECT_P4_TO_P3.0;
|
||||||
|
|
||||||
send.write_all (&[cmd_type, 0, 0, 0]).await?;
|
send.write_all (&[cmd_type, 0, 0, 0]).await?;
|
||||||
send_lv_string (&mut send, server_id).await?;
|
send_lv_string (&mut send, server_id).await?;
|
||||||
|
|
||||||
expect_exact_response (&mut recv, [Command::OKAY.0, cmd_type, 0, 0]).await
|
expect_exact_response (&mut recv, [Command::OKAY.0, cmd_type, 0, 0]).await
|
||||||
.context ("P4 didn't get OK response when connecting to P3")?;
|
.context ("WMGW2RXU P4 didn't get OK response when connecting to P3")?;
|
||||||
|
|
||||||
Ok (new_conn)
|
Ok (new_conn)
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,7 +8,7 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use quinn::{
|
use quinn::{
|
||||||
ClientConfig, Endpoint, Incoming,
|
ClientConfig, Endpoint,
|
||||||
ServerConfig, TransportConfig,
|
ServerConfig, TransportConfig,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -26,7 +26,7 @@ pub fn make_client_endpoint(
|
||||||
let mut transport = quinn::TransportConfig::default ();
|
let mut transport = quinn::TransportConfig::default ();
|
||||||
transport.keep_alive_interval (Some (Duration::from_millis (5_000)));
|
transport.keep_alive_interval (Some (Duration::from_millis (5_000)));
|
||||||
|
|
||||||
client_cfg.transport = Arc::new (transport);
|
client_cfg.transport_config (Arc::new (transport));
|
||||||
|
|
||||||
let mut endpoint = Endpoint::client (bind_addr)?;
|
let mut endpoint = Endpoint::client (bind_addr)?;
|
||||||
endpoint.set_default_client_config (client_cfg);
|
endpoint.set_default_client_config (client_cfg);
|
||||||
|
@ -41,10 +41,10 @@ pub fn make_client_endpoint(
|
||||||
/// - a stream of incoming QUIC connections
|
/// - a stream of incoming QUIC connections
|
||||||
/// - server certificate serialized into DER format
|
/// - server certificate serialized into DER format
|
||||||
#[allow(unused)]
|
#[allow(unused)]
|
||||||
pub fn make_server_endpoint(bind_addr: SocketAddr) -> anyhow::Result<(Incoming, Vec<u8>)> {
|
pub fn make_server_endpoint(bind_addr: SocketAddr) -> anyhow::Result<(Endpoint, Vec<u8>)> {
|
||||||
let (server_config, server_cert) = configure_server()?;
|
let (server_config, server_cert) = configure_server()?;
|
||||||
let (_endpoint, incoming) = Endpoint::server (server_config, bind_addr)?;
|
let endpoint = Endpoint::server (server_config, bind_addr)?;
|
||||||
Ok((incoming, server_cert))
|
Ok((endpoint, server_cert))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Builds default quinn client config and trusts given certificates.
|
/// Builds default quinn client config and trusts given certificates.
|
||||||
|
|
|
@ -0,0 +1,110 @@
|
||||||
|
use crate::prelude::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn end_to_end () -> anyhow::Result <()> {
|
||||||
|
let rt = tokio::runtime::Runtime::new ()?;
|
||||||
|
rt.block_on (end_to_end_async ())?;
|
||||||
|
Ok (())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn end_to_end_async () -> anyhow::Result <()> {
|
||||||
|
use crate::executable_end_server as server;
|
||||||
|
use crate::executable_relay_server as relay;
|
||||||
|
|
||||||
|
let relay_opt = relay::Opt {
|
||||||
|
listen_addr: "127.0.0.1:0".to_string ().into (),
|
||||||
|
tcp_listen_port: Some (0),
|
||||||
|
};
|
||||||
|
let relay_app = relay::App::new (relay_opt).await?;
|
||||||
|
|
||||||
|
let relay_quic_port = relay_app.listen_addr ().port ();
|
||||||
|
let relay_cert = Vec::from (relay_app.server_cert ());
|
||||||
|
let relay_metrics = Arc::clone (&relay_app.metrics);
|
||||||
|
let tcp_listen_port = relay_app.tcp_listen_port ()?.unwrap ();
|
||||||
|
|
||||||
|
assert_ne! (tcp_listen_port, 0);
|
||||||
|
|
||||||
|
let task_relay = tokio::spawn (async move {
|
||||||
|
relay_app.run ().await
|
||||||
|
});
|
||||||
|
|
||||||
|
{
|
||||||
|
let m = relay_metrics.read ().await;
|
||||||
|
assert_eq! (m.connected_end_servers, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connect with wrong port, should fail
|
||||||
|
|
||||||
|
let server_conf = server::Config {
|
||||||
|
debug_echo: false,
|
||||||
|
id: "bogus".into (),
|
||||||
|
relay_addr: "127.0.0.1:80".parse ()?,
|
||||||
|
relay_cert: relay_cert.clone (),
|
||||||
|
use_udp_over_tcp: false,
|
||||||
|
};
|
||||||
|
|
||||||
|
let server_err = server::P4EndServer::connect (server_conf).await;
|
||||||
|
|
||||||
|
assert! (server_err.is_err ());
|
||||||
|
|
||||||
|
// Connect with wrong cert, should fail
|
||||||
|
|
||||||
|
let server_conf = server::Config {
|
||||||
|
debug_echo: false,
|
||||||
|
id: "bogus".into (),
|
||||||
|
relay_addr: ([127, 0, 0, 1], relay_quic_port).into (),
|
||||||
|
relay_cert: vec! [],
|
||||||
|
use_udp_over_tcp: false,
|
||||||
|
};
|
||||||
|
|
||||||
|
let server_err = server::P4EndServer::connect (server_conf).await;
|
||||||
|
|
||||||
|
assert! (server_err.is_err ());
|
||||||
|
|
||||||
|
{
|
||||||
|
let m = relay_metrics.read ().await;
|
||||||
|
assert_eq! (m.connected_end_servers, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connect over UDP
|
||||||
|
|
||||||
|
let server_conf = server::Config {
|
||||||
|
debug_echo: false,
|
||||||
|
id: "bogus_VZBNRUA5".into (),
|
||||||
|
relay_addr: ([127, 0, 0, 1], relay_quic_port).into (),
|
||||||
|
relay_cert: relay_cert.clone (),
|
||||||
|
use_udp_over_tcp: false,
|
||||||
|
};
|
||||||
|
|
||||||
|
let t = Instant::now ();
|
||||||
|
let (server, _) = server::P4EndServer::connect (server_conf).await?;
|
||||||
|
let dur = t.elapsed ();
|
||||||
|
assert! (dur < Duration::from_millis (1_000), "{:?}", dur);
|
||||||
|
|
||||||
|
{
|
||||||
|
let m = relay_metrics.read ().await;
|
||||||
|
assert_eq! (m.connected_end_servers, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connect over TCP
|
||||||
|
|
||||||
|
let server_conf = server::Config {
|
||||||
|
debug_echo: false,
|
||||||
|
id: "bogus_6E5CZIAI".into (),
|
||||||
|
relay_addr: ([127, 0, 0, 1], tcp_listen_port).into (),
|
||||||
|
relay_cert: relay_cert.clone (),
|
||||||
|
use_udp_over_tcp: true,
|
||||||
|
};
|
||||||
|
|
||||||
|
let t = Instant::now ();
|
||||||
|
let (server, _) = server::P4EndServer::connect (server_conf).await?;
|
||||||
|
let dur = t.elapsed ();
|
||||||
|
assert! (dur < Duration::from_millis (1_000), "{:?}", dur);
|
||||||
|
|
||||||
|
{
|
||||||
|
let m = relay_metrics.read ().await;
|
||||||
|
assert_eq! (m.connected_end_servers, 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok (())
|
||||||
|
}
|
|
@ -12,15 +12,19 @@ repository = "https://six-five-six-four.com/git/reactor/ptth"
|
||||||
[dependencies]
|
[dependencies]
|
||||||
anyhow = "1.0.38"
|
anyhow = "1.0.38"
|
||||||
blake3 = "1.0.0"
|
blake3 = "1.0.0"
|
||||||
fltk = "1.2.8"
|
fltk = "1.3.24"
|
||||||
ptth_quic = { path = "../ptth_quic" }
|
ptth_quic = { path = "../ptth_quic" }
|
||||||
quinn = "0.8.5"
|
quinn = "0.9.3"
|
||||||
rand = "0.8.4"
|
rand = "0.8.4"
|
||||||
rand_chacha = "0.3.1"
|
rand_chacha = "0.3.1"
|
||||||
reqwest = "0.11.4"
|
|
||||||
rmp-serde = "0.15.5"
|
rmp-serde = "0.15.5"
|
||||||
serde = "1.0.130"
|
serde = "1.0.130"
|
||||||
structopt = "0.3.20"
|
structopt = "0.3.20"
|
||||||
tokio = { version = "1.8.1", features = ["full"] }
|
tokio = { version = "1.8.1", features = ["full"] }
|
||||||
tracing-subscriber = "0.2.16"
|
tracing-subscriber = "0.2.16"
|
||||||
tracing = "0.1.25"
|
tracing = "0.1.25"
|
||||||
|
|
||||||
|
[dependencies.reqwest]
|
||||||
|
version = "0.11.4"
|
||||||
|
default-features = false
|
||||||
|
features = ["stream", "rustls-tls", "hyper-rustls"]
|
||||||
|
|
|
@ -235,10 +235,7 @@ fn main () -> anyhow::Result <()> {
|
||||||
|
|
||||||
let client_id = opt.client_id.unwrap_or_else (|| "bogus_client".to_string ());
|
let client_id = opt.client_id.unwrap_or_else (|| "bogus_client".to_string ());
|
||||||
|
|
||||||
let quinn::NewConnection {
|
let connection = protocol::p2_connect_to_p3 (&endpoint, relay_addr, &client_id).await
|
||||||
connection,
|
|
||||||
..
|
|
||||||
} = protocol::p2_connect_to_p3 (&endpoint, relay_addr, &client_id).await
|
|
||||||
.context ("P2 can't connect to P3")?;
|
.context ("P2 can't connect to P3")?;
|
||||||
|
|
||||||
Ok::<_, anyhow::Error> (connection)
|
Ok::<_, anyhow::Error> (connection)
|
||||||
|
|
|
@ -11,30 +11,35 @@ repository = "https://six-five-six-four.com/git/reactor/ptth"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
|
||||||
anyhow = "1.0.38"
|
anyhow = "1.0.66"
|
||||||
base64 = "0.13.0"
|
base64 = "0.13.0"
|
||||||
blake3 = "1.0.0"
|
blake3 = "1.0.0"
|
||||||
chrono = { version = "0.4.19", features = ["serde"] }
|
chrono = { version = "0.4.23", features = ["serde"] }
|
||||||
clap = "2.33.3"
|
clap = "2.33.3"
|
||||||
dashmap = "4.0.2"
|
dashmap = "4.0.2"
|
||||||
futures = "0.3.7"
|
futures = "0.3.7"
|
||||||
futures-util = "0.3.8"
|
futures-util = "0.3.8"
|
||||||
handlebars = "3.5.3"
|
handlebars = "3.5.3"
|
||||||
http = "0.2.3"
|
http = "0.2.3"
|
||||||
hyper = { version = "0.14.20", features = ["http1", "http2", "server", "stream", "tcp"] }
|
hyper = { version = "0.14.23", features = ["http1", "http2", "server", "stream", "tcp"] }
|
||||||
itertools = "0.9.0"
|
itertools = "0.9.0"
|
||||||
rand = "0.8.3"
|
rand = "0.8.5"
|
||||||
rmp-serde = "0.15.5"
|
rmp-serde = "0.15.5"
|
||||||
rusty_ulid = "0.10.1"
|
rusty_ulid = "1.0.0"
|
||||||
serde = { version = "1.0.117", features = ["derive"] }
|
serde = { version = "1.0.150", features = ["derive"] }
|
||||||
serde_json = "1.0.60"
|
serde_json = "1.0.89"
|
||||||
serde_urlencoded = "0.7.0"
|
serde_urlencoded = "0.7.1"
|
||||||
thiserror = "1.0.22"
|
thiserror = "1.0.37"
|
||||||
tokio = { version = "1.8.1", features = [] }
|
tokio = { version = "1.23.0", features = [] }
|
||||||
tokio-stream = "0.1.3"
|
tokio-stream = "0.1.11"
|
||||||
toml = "0.5.7"
|
toml = "0.5.10"
|
||||||
tracing = "0.1.25"
|
tracing = "0.1.37"
|
||||||
tracing-futures = "0.2.4"
|
tracing-futures = "0.2.4"
|
||||||
tracing-subscriber = "0.2.15"
|
tracing-subscriber = "0.2.15"
|
||||||
|
|
||||||
ptth_core = { path = "../ptth_core", version = "2.0.0" }
|
ptth_core = { path = "../ptth_core", version = "2.0.0" }
|
||||||
|
|
||||||
|
[dependencies.reqwest]
|
||||||
|
version = "0.11.13"
|
||||||
|
default-features = false
|
||||||
|
features = ["stream", "rustls-tls", "hyper-rustls"]
|
||||||
|
|
|
@ -144,6 +144,8 @@ pub mod file {
|
||||||
|
|
||||||
pub news_url: Option <String>,
|
pub news_url: Option <String>,
|
||||||
pub hide_audit_log: Option <bool>,
|
pub hide_audit_log: Option <bool>,
|
||||||
|
pub webhook_url: Option <String>,
|
||||||
|
pub webhook_interval_s: Option <u32>,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -158,6 +160,8 @@ pub struct Config {
|
||||||
pub scraper_keys: HashMap <String, ScraperKey>,
|
pub scraper_keys: HashMap <String, ScraperKey>,
|
||||||
pub news_url: Option <String>,
|
pub news_url: Option <String>,
|
||||||
pub hide_audit_log: bool,
|
pub hide_audit_log: bool,
|
||||||
|
pub webhook_url: Option <String>,
|
||||||
|
pub webhook_interval_s: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Config {
|
impl Default for Config {
|
||||||
|
@ -170,6 +174,8 @@ impl Default for Config {
|
||||||
scraper_keys: Default::default (),
|
scraper_keys: Default::default (),
|
||||||
news_url: None,
|
news_url: None,
|
||||||
hide_audit_log: false,
|
hide_audit_log: false,
|
||||||
|
webhook_url: None,
|
||||||
|
webhook_interval_s: 7200,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -199,6 +205,8 @@ impl TryFrom <file::Config> for Config {
|
||||||
scraper_keys,
|
scraper_keys,
|
||||||
news_url: f.news_url,
|
news_url: f.news_url,
|
||||||
hide_audit_log: f.hide_audit_log.unwrap_or (false),
|
hide_audit_log: f.hide_audit_log.unwrap_or (false),
|
||||||
|
webhook_url: f.webhook_url,
|
||||||
|
webhook_interval_s: f.webhook_interval_s.unwrap_or (7200),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -589,6 +589,10 @@ async fn handle_all (
|
||||||
use routing::Route::*;
|
use routing::Route::*;
|
||||||
|
|
||||||
let state = &*state;
|
let state = &*state;
|
||||||
|
{
|
||||||
|
let mut counters = state.webhook_counters.write ().await;
|
||||||
|
counters.requests_total += 1;
|
||||||
|
}
|
||||||
|
|
||||||
// The path is cloned here, so it's okay to consume the request
|
// The path is cloned here, so it's okay to consume the request
|
||||||
// later.
|
// later.
|
||||||
|
@ -818,7 +822,7 @@ pub async fn run_relay (
|
||||||
|
|
||||||
let mut request_rendezvous = state_2.request_rendezvous.lock ().await;
|
let mut request_rendezvous = state_2.request_rendezvous.lock ().await;
|
||||||
request_rendezvous.iter_mut ()
|
request_rendezvous.iter_mut ()
|
||||||
.for_each (|(k, v)| {
|
.for_each (|(_k, v)| {
|
||||||
match v {
|
match v {
|
||||||
RequestRendezvous::ParkedServer (_) => (),
|
RequestRendezvous::ParkedServer (_) => (),
|
||||||
RequestRendezvous::ParkedClients (requests) => requests.retain (|req| req.id.as_str () >= timeout_ulid.as_str ()),
|
RequestRendezvous::ParkedClients (requests) => requests.retain (|req| req.id.as_str () >= timeout_ulid.as_str ()),
|
||||||
|
@ -878,6 +882,11 @@ pub async fn run_relay (
|
||||||
|
|
||||||
state.audit_log.push (AuditEvent::new (AuditData::RelayStart)).await;
|
state.audit_log.push (AuditEvent::new (AuditData::RelayStart)).await;
|
||||||
|
|
||||||
|
{
|
||||||
|
let state = state.clone ();
|
||||||
|
tokio::spawn (webhook_task (state));
|
||||||
|
}
|
||||||
|
|
||||||
trace! ("Serving relay on {:?}", addr);
|
trace! ("Serving relay on {:?}", addr);
|
||||||
|
|
||||||
server.with_graceful_shutdown (async {
|
server.with_graceful_shutdown (async {
|
||||||
|
@ -913,5 +922,64 @@ pub async fn run_relay (
|
||||||
Ok (())
|
Ok (())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn webhook_task (state: Arc <Relay>) {
|
||||||
|
use crate::relay_state::MonitoringCounters;
|
||||||
|
|
||||||
|
let client = reqwest::Client::default ();
|
||||||
|
|
||||||
|
let webhook_interval_s = {
|
||||||
|
let config = state.config.read ().await;
|
||||||
|
config.webhook_interval_s
|
||||||
|
};
|
||||||
|
dbg! (webhook_interval_s);
|
||||||
|
|
||||||
|
let mut interval = tokio::time::interval (std::time::Duration::from_secs (webhook_interval_s.into ()));
|
||||||
|
interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip);
|
||||||
|
|
||||||
|
let mut tick_seq = 1;
|
||||||
|
let mut last_counters_reported = (MonitoringCounters::default (), Utc::now (), 0);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
interval.tick ().await;
|
||||||
|
|
||||||
|
let webhook_url = {
|
||||||
|
let config = state.config.read ().await;
|
||||||
|
config.webhook_url.clone ()
|
||||||
|
};
|
||||||
|
|
||||||
|
let webhook_url = match webhook_url {
|
||||||
|
Some (x) => x,
|
||||||
|
None => {
|
||||||
|
continue;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
let now = Utc::now ();
|
||||||
|
|
||||||
|
let counters = {
|
||||||
|
state.webhook_counters.read ().await.clone ()
|
||||||
|
};
|
||||||
|
|
||||||
|
let requests_total_diff = counters.requests_total - last_counters_reported.0.requests_total;
|
||||||
|
|
||||||
|
let j = serde_json::json! ({
|
||||||
|
"text": format! ("From tick {} to {}: Handled {} requests", last_counters_reported.2, tick_seq, requests_total_diff),
|
||||||
|
}).to_string ();
|
||||||
|
|
||||||
|
match client.post (webhook_url).body (j).send ().await {
|
||||||
|
Ok (resp) => {
|
||||||
|
if resp.status () == StatusCode::OK {
|
||||||
|
last_counters_reported = (counters, now, tick_seq);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
dbg! (resp.status ());
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err (e) => { dbg! (e); },
|
||||||
|
}
|
||||||
|
tick_seq += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg (test)]
|
#[cfg (test)]
|
||||||
mod tests;
|
mod tests;
|
||||||
|
|
|
@ -101,6 +101,16 @@ pub struct Relay {
|
||||||
/// Memory backend for audit logging
|
/// Memory backend for audit logging
|
||||||
// TODO: Add file / database / network server logging backend
|
// TODO: Add file / database / network server logging backend
|
||||||
pub (crate) audit_log: BoundedVec <AuditEvent>,
|
pub (crate) audit_log: BoundedVec <AuditEvent>,
|
||||||
|
|
||||||
|
/// Counters for webhook reporting
|
||||||
|
pub (crate) webhook_counters: RwLock <MonitoringCounters>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive (Clone, Default)]
|
||||||
|
pub (crate) struct MonitoringCounters {
|
||||||
|
pub (crate) requests_total: u64,
|
||||||
|
pub (crate) requests_by_scraper_api: HashMap <String, u64>,
|
||||||
|
pub (crate) requests_by_email: HashMap <String, u64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive (Clone)]
|
#[derive (Clone)]
|
||||||
|
@ -204,6 +214,7 @@ impl TryFrom <Config> for Relay {
|
||||||
shutdown_watch_rx,
|
shutdown_watch_rx,
|
||||||
unregistered_servers: BoundedVec::new (20),
|
unregistered_servers: BoundedVec::new (20),
|
||||||
audit_log: BoundedVec::new (256),
|
audit_log: BoundedVec::new (256),
|
||||||
|
webhook_counters: Default::default (),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -558,7 +558,7 @@ pub mod executable {
|
||||||
name: opt.name.or (config_file.name).ok_or (anyhow::anyhow! ("`name` must be provided in command line or config file"))?,
|
name: opt.name.or (config_file.name).ok_or (anyhow::anyhow! ("`name` must be provided in command line or config file"))?,
|
||||||
api_key: config_file.api_key,
|
api_key: config_file.api_key,
|
||||||
relay_url: opt.relay_url.or (config_file.relay_url).ok_or (anyhow::anyhow! ("`--relay-url` must be provided in command line or `relay_url` in config file"))?,
|
relay_url: opt.relay_url.or (config_file.relay_url).ok_or (anyhow::anyhow! ("`--relay-url` must be provided in command line or `relay_url` in config file"))?,
|
||||||
file_server_root: opt.file_server_root.or (config_file.file_server_root).unwrap_or_else (PathBuf::new),
|
file_server_root: opt.file_server_root.or (config_file.file_server_root).unwrap_or_else (|| PathBuf::from (".")),
|
||||||
file_server_roots,
|
file_server_roots,
|
||||||
throttle_upload: opt.throttle_upload,
|
throttle_upload: opt.throttle_upload,
|
||||||
allow_any_client: true,
|
allow_any_client: true,
|
||||||
|
|
|
@ -0,0 +1,11 @@
|
||||||
|
[package]
|
||||||
|
name = "udp_over_tcp"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
anyhow = "1.0.66"
|
||||||
|
tokio = { version = "1.23.0", features = ["full"] }
|
||||||
|
tracing = "0.1.37"
|
|
@ -0,0 +1,59 @@
|
||||||
|
use std::{
|
||||||
|
net::{
|
||||||
|
Ipv4Addr,
|
||||||
|
SocketAddr,
|
||||||
|
SocketAddrV4,
|
||||||
|
},
|
||||||
|
sync::Arc,
|
||||||
|
};
|
||||||
|
|
||||||
|
use tokio::{
|
||||||
|
net::{
|
||||||
|
TcpSocket,
|
||||||
|
TcpStream,
|
||||||
|
UdpSocket,
|
||||||
|
},
|
||||||
|
spawn,
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::loops;
|
||||||
|
|
||||||
|
pub struct Config {
|
||||||
|
pub udp_eph_port: u16,
|
||||||
|
pub udp_local_server_port: u16,
|
||||||
|
pub tcp_server_addr: SocketAddr,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn main (cfg: Config) -> anyhow::Result <()> {
|
||||||
|
let udp_sock = UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, cfg.udp_local_server_port)).await?;
|
||||||
|
udp_sock.connect ((Ipv4Addr::LOCALHOST, cfg.udp_eph_port)).await?;
|
||||||
|
|
||||||
|
let tcp_sock = TcpSocket::new_v4 ()?;
|
||||||
|
let tcp_conn = tcp_sock.connect (cfg.tcp_server_addr).await?;
|
||||||
|
|
||||||
|
main_with_sockets (udp_sock, tcp_conn).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn main_with_sockets (udp_sock: UdpSocket, tcp_conn: TcpStream) -> anyhow::Result <()> {
|
||||||
|
let (tcp_read, tcp_write) = tcp_conn.into_split ();
|
||||||
|
|
||||||
|
let tx_task;
|
||||||
|
let rx_task;
|
||||||
|
|
||||||
|
{
|
||||||
|
let udp_sock = Arc::new (udp_sock);
|
||||||
|
rx_task = spawn (loops::rx (Arc::clone (&udp_sock), tcp_read));
|
||||||
|
tx_task = spawn (loops::tx (Arc::clone (&udp_sock), tcp_write));
|
||||||
|
}
|
||||||
|
|
||||||
|
tokio::select! {
|
||||||
|
_val = tx_task => {
|
||||||
|
println! ("client_main: tx_task exited, exiting");
|
||||||
|
}
|
||||||
|
_val = rx_task => {
|
||||||
|
println! ("client_main: rx_task exited, exiting");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok (())
|
||||||
|
}
|
|
@ -0,0 +1,4 @@
|
||||||
|
pub mod client;
|
||||||
|
pub mod server;
|
||||||
|
|
||||||
|
mod loops;
|
|
@ -0,0 +1,84 @@
|
||||||
|
use std::{
|
||||||
|
sync::Arc,
|
||||||
|
};
|
||||||
|
|
||||||
|
use anyhow::bail;
|
||||||
|
use tokio::{
|
||||||
|
io::{
|
||||||
|
AsyncReadExt,
|
||||||
|
AsyncWriteExt,
|
||||||
|
},
|
||||||
|
net::{
|
||||||
|
UdpSocket,
|
||||||
|
tcp,
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
pub async fn rx (
|
||||||
|
udp_sock: Arc <UdpSocket>,
|
||||||
|
mut tcp_read: tcp::OwnedReadHalf,
|
||||||
|
) -> anyhow::Result <()> {
|
||||||
|
for i in 0u64.. {
|
||||||
|
// Optimizes down to a bitwise AND
|
||||||
|
if i % 8_192 == 0 {
|
||||||
|
tracing::trace! ("rx loop");
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut tag = [0u8, 0, 0, 0];
|
||||||
|
let bytes_read = tcp_read.read (&mut tag).await?;
|
||||||
|
if bytes_read != 4 {
|
||||||
|
bail! ("loops::rx: Couldn't read 4 bytes for tag");
|
||||||
|
}
|
||||||
|
if tag != [1, 0, 0, 0] {
|
||||||
|
bail! ("loops::rx: unexpected tag in framing");
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut length = [0u8, 0, 0, 0];
|
||||||
|
let bytes_read = tcp_read.read (&mut length).await?;
|
||||||
|
if bytes_read != 4 {
|
||||||
|
bail! ("loops::rx: Couldn't read 4 bytes for tag");
|
||||||
|
}
|
||||||
|
|
||||||
|
let length = usize::try_from (u32::from_le_bytes (length))?;
|
||||||
|
if length >= 8_192 {
|
||||||
|
bail! ("loops::rx: Length too big for UDP packets");
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut buf = vec! [0u8; length];
|
||||||
|
let bytes_read = tcp_read.read_exact (&mut buf).await?;
|
||||||
|
if length != bytes_read {
|
||||||
|
bail! ("loops::rx: read_exact failed");
|
||||||
|
}
|
||||||
|
buf.truncate (bytes_read);
|
||||||
|
|
||||||
|
udp_sock.send (&buf).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok (())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn tx (
|
||||||
|
udp_sock: Arc <UdpSocket>,
|
||||||
|
mut tcp_write: tcp::OwnedWriteHalf,
|
||||||
|
) -> anyhow::Result <()>
|
||||||
|
{
|
||||||
|
for i in 0u64.. {
|
||||||
|
// Optimizes down to a bitwise AND
|
||||||
|
if i % 8_192 == 0 {
|
||||||
|
tracing::trace! ("tx loop");
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut buf = vec! [0u8; 8_192];
|
||||||
|
let bytes_read = udp_sock.recv (&mut buf).await?;
|
||||||
|
buf.truncate (bytes_read);
|
||||||
|
|
||||||
|
let tag = [1u8, 0, 0, 0];
|
||||||
|
let length = u32::try_from (bytes_read)?.to_le_bytes ();
|
||||||
|
|
||||||
|
tcp_write.write_all (&tag).await?;
|
||||||
|
tcp_write.write_all (&length).await?;
|
||||||
|
tcp_write.write_all (&buf).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok (())
|
||||||
|
}
|
|
@ -0,0 +1,76 @@
|
||||||
|
/*
|
||||||
|
|
||||||
|
To test manually, run this 3 commands:
|
||||||
|
|
||||||
|
- Terminal A: `nc -l -u -p 9502`
|
||||||
|
- Terminal B: `cargo run -p udp_over_tcp`
|
||||||
|
- Terminal C: `nc -p 9500 -u 127.0.0.1 9501`
|
||||||
|
|
||||||
|
Terminals A and C should be connected through the UDP-over-TCP connection
|
||||||
|
|
||||||
|
*/
|
||||||
|
|
||||||
|
use std::{
|
||||||
|
net::{
|
||||||
|
Ipv4Addr,
|
||||||
|
SocketAddr,
|
||||||
|
SocketAddrV4,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
use tokio::{
|
||||||
|
runtime,
|
||||||
|
spawn,
|
||||||
|
};
|
||||||
|
|
||||||
|
mod client;
|
||||||
|
mod loops;
|
||||||
|
mod server;
|
||||||
|
|
||||||
|
// The ephemeral UDP port that the PTTH_QUIC client will bind
|
||||||
|
const PORT_0: u16 = 9500;
|
||||||
|
|
||||||
|
// The well-known UDP port that the UDP-over-TCP client will bind
|
||||||
|
// The PTTH_QUIC client must connect to this instead of the real relay address
|
||||||
|
const PORT_1: u16 = 9501;
|
||||||
|
|
||||||
|
// The well-known TCP port that the UDP-over-TCP server will bind
|
||||||
|
const PORT_2: u16 = 9502;
|
||||||
|
|
||||||
|
// The well-known UDP port that the PTTH_QUIC relay will bind
|
||||||
|
const PORT_3: u16 = 9502;
|
||||||
|
|
||||||
|
fn main () -> anyhow::Result <()> {
|
||||||
|
let rt = runtime::Runtime::new ()?;
|
||||||
|
|
||||||
|
rt.block_on (async_main ())?;
|
||||||
|
|
||||||
|
Ok (())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn async_main () -> anyhow::Result <()> {
|
||||||
|
let server_cfg = server::Config {
|
||||||
|
tcp_port: PORT_2,
|
||||||
|
udp_port: PORT_3,
|
||||||
|
};
|
||||||
|
let server_app = server::Listener::new (server_cfg).await?;
|
||||||
|
let server_task = spawn (server_app.run ());
|
||||||
|
|
||||||
|
let client_cfg = client::Config {
|
||||||
|
udp_eph_port: PORT_0,
|
||||||
|
udp_local_server_port: PORT_1,
|
||||||
|
tcp_server_addr: SocketAddr::V4 (SocketAddrV4::new (Ipv4Addr::LOCALHOST, PORT_2)),
|
||||||
|
};
|
||||||
|
let client_task = spawn (client::main (client_cfg));
|
||||||
|
|
||||||
|
tokio::select! {
|
||||||
|
_val = client_task => {
|
||||||
|
println! ("Client exited, exiting");
|
||||||
|
},
|
||||||
|
_val = server_task => {
|
||||||
|
println! ("Server exited, exiting");
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok (())
|
||||||
|
}
|
|
@ -0,0 +1,88 @@
|
||||||
|
use std::{
|
||||||
|
net::{
|
||||||
|
Ipv4Addr,
|
||||||
|
SocketAddrV4,
|
||||||
|
},
|
||||||
|
sync::Arc,
|
||||||
|
};
|
||||||
|
|
||||||
|
use tokio::{
|
||||||
|
net::{
|
||||||
|
TcpListener,
|
||||||
|
TcpStream,
|
||||||
|
UdpSocket,
|
||||||
|
},
|
||||||
|
spawn,
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::loops;
|
||||||
|
|
||||||
|
#[derive (Clone)]
|
||||||
|
pub struct Config {
|
||||||
|
/// The well-known TCP port that the UDP-over-TCP server will bind
|
||||||
|
pub tcp_port: u16,
|
||||||
|
|
||||||
|
/// The well-known UDP port that the PTTH_QUIC relay will bind
|
||||||
|
pub udp_port: u16,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Listener {
|
||||||
|
cfg: Config,
|
||||||
|
tcp_listener: TcpListener,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Listener {
|
||||||
|
pub async fn new (cfg: Config) -> anyhow::Result <Self> {
|
||||||
|
let tcp_listener = TcpListener::bind ((Ipv4Addr::UNSPECIFIED, cfg.tcp_port)).await?;
|
||||||
|
|
||||||
|
Ok (Self {
|
||||||
|
cfg,
|
||||||
|
tcp_listener,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn tcp_port (&self) -> anyhow::Result <u16> {
|
||||||
|
Ok (self.tcp_listener.local_addr ()?.port ())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn run (self) -> anyhow::Result <()> {
|
||||||
|
let Self {
|
||||||
|
cfg,
|
||||||
|
tcp_listener,
|
||||||
|
} = self;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let (conn, _peer_addr) = tcp_listener.accept ().await?;
|
||||||
|
|
||||||
|
let cfg = cfg.clone ();
|
||||||
|
spawn (handle_connection (cfg, conn));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_connection (cfg: Config, conn: TcpStream) -> anyhow::Result <()> {
|
||||||
|
let udp_sock = UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, 0)).await?;
|
||||||
|
udp_sock.connect ((Ipv4Addr::LOCALHOST, cfg.udp_port)).await?;
|
||||||
|
|
||||||
|
let (tcp_read, tcp_write) = conn.into_split ();
|
||||||
|
|
||||||
|
let rx_task;
|
||||||
|
let tx_task;
|
||||||
|
|
||||||
|
{
|
||||||
|
let udp_sock = Arc::new (udp_sock);
|
||||||
|
rx_task = spawn (loops::rx (Arc::clone (&udp_sock), tcp_read));
|
||||||
|
tx_task = spawn (loops::tx (Arc::clone (&udp_sock), tcp_write));
|
||||||
|
}
|
||||||
|
|
||||||
|
tokio::select! {
|
||||||
|
_val = tx_task => {
|
||||||
|
println! ("server_handle_connection: tx_task exited, exiting");
|
||||||
|
}
|
||||||
|
_val = rx_task => {
|
||||||
|
println! ("server_handle_connection: rx_task exited, exiting");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok (())
|
||||||
|
}
|
Loading…
Reference in New Issue