Compare commits

...

42 Commits

Author SHA1 Message Date
_ dfc6885b8c ⬆️ deps: make reqwest use rustls instead of libcrypto
Fixes a bug that a user had.
2023-08-31 18:47:41 -05:00
(on company time) 43fe373847 Merge branch 'main' of https://six-five-six-four.com/git/reactor/ptth into main 2023-08-29 08:53:37 -05:00
(on company time) 0d6290ed60 🐛 bug: fix a config bug
In ptth_multi_call_server, the file server root was "" instead of ".".
This happened in 134035f due to a bad merge.

I'm not sure how to add a test for it.
2023-01-26 15:52:44 -06:00
(on company time) ffeeda1e0a add untracked dir 2023-01-12 16:21:41 -06:00
(on company time) 1905d85a59 🚧 wip 2022-12-20 14:54:51 -06:00
(on company time) 5e7f691a60 🐛 bug: fix constructing errors and forgetting to actually throw the errors 2022-12-20 14:46:08 -06:00
(on company time) 200c07da2f 🚧 wip: finding a bug. It's in the udp_over_tcp code
When a TCP client disconnects, the rx loop gets confused and goes into an
infinite loop without ever yielding.
2022-12-20 14:40:51 -06:00
(on company time) ce7a539413 🔧 config: add tcp_listen_port to config file 2022-12-19 17:05:39 -06:00
(on company time) 96af820ffb 🚨 remove unused vars 2022-12-19 16:37:54 -06:00
(on company time) 74fb0cbe5a 🐛 bug: patch UDP-over-TCP example 2022-12-19 16:36:38 -06:00
(on company time) 3f4bce85a4 UDP-over-TCP passes okay
Next step is updating the relay and making sure it integrates into (our thing that uses PTTH_QUIC)
2022-12-19 16:25:50 -06:00
(on company time) e3ff600b51 expose count of connected end servers to testing code 2022-12-19 14:58:12 -06:00
(on company time) 0992d76fdc 🚨 remove unused `use` 2022-12-19 14:33:35 -06:00
(on company time) 27ed72b196 test(ptth_quic): add end server --> relay server connection tests 2022-12-19 14:26:50 -06:00
(on company time) 3f0272ed09 👕 refactor: break apart the relay's main fn so we can see which ports it bound 2022-12-19 13:51:37 -06:00
(on company time) 605c15468a 👕 refactor: making PTTH_QUIC more testable 2022-12-19 13:21:46 -06:00
(on company time) 415c8954c1 🚧 wip: setting up testing for PTTH_QUIC 2022-12-19 13:17:22 -06:00
(on company time) 93be903b86 ;loud_sound: report all incoming connections over WebHook, for now 2022-12-16 16:46:30 -06:00
(on company time) 33fe10ee27 🔇 remove this debugging thing 2022-12-16 16:19:50 -06:00
(on company time) 6ba988c2c8 📦 build: fix bug in Dockerfile 2022-12-16 16:13:24 -06:00
(on company time) 0fc99e7c26 set up IP addr nicknames for eventual IP allow-listing 2022-12-16 16:05:29 -06:00
(on company time) 86e5305630 🚧 wip: start making a place for PTTH_QUIC relay server config 2022-12-16 15:43:47 -06:00
(on company time) 5eda2c4288 📦 build: Fix PTTH_QUIC Dockerfile 2022-12-16 14:05:35 -06:00
(on company time) 50332bab69 ⬆️ (ptth_quic_client_gui) FLTK 1.3.24 2022-12-16 13:56:46 -06:00
(on company time) 24dac2cc39 🔊 (ptth_quic relay server) log listen address 2022-12-16 13:56:31 -06:00
(on company time) 996543cecc 🚧 wip: PTTH_QUIC to quinn 0.9.x 2022-12-16 13:49:34 -06:00
(on company time) c13d1f37bf ⬆️ update ptth_quic deps 2022-12-16 13:29:57 -06:00
(on company time) 5a9c301747 🔊 add random IDs for QUIC connection events 2022-12-16 13:25:21 -06:00
(on company time) 91a29abb39 🐛 bug: give up on env var, it wasn't working 2022-12-16 11:23:54 -06:00
(on company time) 9ab3b42e32 👕 refactor: move the webhook task into run_relay 2022-12-16 09:57:38 -06:00
(on company time) b53748b2c4 try sending webhooks from the relay 2022-12-16 09:33:56 -06:00
(on company time) e05c4fa8bf ⬆️ update deps for ptth_relay 2022-12-15 17:24:45 -06:00
(on company time) d93c1404b7 ⬆️ cargo update 2022-12-15 17:17:30 -06:00
(on company time) 7f2dc47aec 🐛 bug: fix manifest that wasn't set up in the Docker build correctly 2022-12-15 17:07:01 -06:00
(on company time) 80c2ef17be ⬆️ Rust 1.66 2022-12-15 17:04:24 -06:00
(on company time) f9e10e0f64 use udp_over_tcp for both PTTH_QUIC end server and relay server
- `cargo run -p ptth_quic --bin ptth_quic_relay_server -- --tcp-listen-port 4440`
- `cargo run -p ptth_quic --bin ptth_quic_end_server -- --use-udp-over-tcp true --relay-addr 127.0.0.1:4440 --server-id my_server`
- `cargo run -p ptth_quic_client_gui -- --client-id me`
- `nc -l -p 5900`
- (Open my_server, 5900 in the client GUI)
- `nc 127.0.0.1 50369` (or whatever port the GUI picked)
2022-10-31 13:50:42 -05:00
(on company time) 8a302f3049 👕 refactor: use full address for udp_over_tcp client instead of just port 2022-10-31 13:15:23 -05:00
(on company time) fd3c85fccd add udp_over_tcp server to PTTH_QUIC relay server
maybe it'll work
2022-10-31 13:11:31 -05:00
(on company time) 963631ff96 remove old unused TCP server from PTTH_QUIC relay 2022-10-31 12:55:29 -05:00
(on company time) 036193a19e add udp_over_tcp as dep to ptth_quic 2022-10-31 12:54:46 -05:00
(on company time) b5be7709a3 👕 refactor: extract configs for client and server 2022-10-31 11:36:03 -05:00
(on company time) edd7e8de54 proof of concept for UDP-over-TCP 2022-10-31 11:24:15 -05:00
29 changed files with 1739 additions and 797 deletions

4
.gitignore vendored
View File

@ -9,6 +9,10 @@
/scope/untracked
/scraper-secret.txt
/target
/untracked
# TLS certs used for QUIC experiments
*.crt
# Kate editor temp file
*.kate-swp

1263
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,8 +1,8 @@
# https://whitfin.io/speeding-up-rust-docker-builds/
# TODO: https://stackoverflow.com/questions/57389547/how-to-define-the-context-for-a-docker-build-as-a-specific-commit-on-one-of-the
# rust:1.60-slim-buster
FROM rust@sha256:c0f26a0b299a8a74cd87be0b4bd291d55aa292198bab1bafd906edd8665edb82 as build
# docker pull rust:1.66-slim-buster
FROM rust@sha256:98c9b1fca0c9a6183369daf9efadb57c634340ae877bb027aeadf72afdd086a3 as build
WORKDIR /
ENV USER root
@ -20,7 +20,8 @@ cargo new --bin crates/ptth_server && \
cargo new --bin crates/ptth_file_server_bin && \
cargo new --bin tools/ptth_tail && \
cargo new --bin crates/debug_proxy && \
cargo new --bin crates/ptth_quic
cargo new --bin crates/ptth_quic && \
cargo new --lib crates/udp_over_tcp
# copy over your manifests
COPY ./Cargo.lock ./
@ -29,6 +30,7 @@ COPY ./crates/always_equal/Cargo.toml ./crates/always_equal/
COPY ./crates/ptth_core/Cargo.toml ./crates/ptth_core/
COPY ./crates/ptth_relay/Cargo.toml ./crates/ptth_relay/
COPY ./crates/ptth_quic/Cargo.toml ./crates/ptth_quic/
COPY ./crates/udp_over_tcp/Cargo.toml ./crates/udp_over_tcp/
# this build step will cache your dependencies
RUN cargo build --release -p ptth_relay

View File

@ -10,23 +10,28 @@ repository = "https://six-five-six-four.com/git/reactor/ptth"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.38"
base64 = "0.13.0"
ctrlc = "3.2.1"
# fltk = "1.1.1"
futures-util = "0.3.9"
hyper = { version = "0.14.4", features = ["http1", "server", "stream", "tcp"] }
quinn = "0.8.5"
rand = "0.8.4"
rcgen = "0.8.11"
rmp-serde = "0.15.5"
rustls = "0.20.4"
structopt = "0.3.20"
tokio = { version = "1.8.1", features = ["full"] }
tracing-subscriber = "0.2.16"
tracing = "0.1.25"
anyhow = "1.0.66"
arc-swap = "1.5.1"
base64 = "0.20.0"
ctrlc = "3.2.4"
futures-util = "0.3.25"
hyper = { version = "0.14.23", features = ["http1", "server", "stream", "tcp"] }
quinn = "0.9.3"
rand = "0.8.5"
rcgen = "0.10.0"
ring = "0.16.20"
rmp-serde = "1.1.1"
rustls = "0.20.7"
rusty_ulid = "1.0.0"
serde = "1.0.151"
serde_json = "1.0.89"
structopt = "0.3.26"
tokio = { version = "1.23.0", features = ["full"] }
tracing-subscriber = "0.3.16"
tracing = "0.1.37"
udp_over_tcp = { path = "../udp_over_tcp" }
[dependencies.reqwest]
version = "0.11.10"
version = "0.11.13"
default-features = false
features = ["stream", "rustls-tls", "hyper-rustls"]

View File

@ -1,8 +1,8 @@
# https://whitfin.io/speeding-up-rust-docker-builds/
# TODO: https://stackoverflow.com/questions/57389547/how-to-define-the-context-for-a-docker-build-as-a-specific-commit-on-one-of-the
# rust:1.64-slim-buster
FROM rust@sha256:7da4fbd2dc7176746e8e5c371aeb0bbe742598c4906fa48cb2d87a4b89d50357 as build
# docker pull rust:1.66-slim-buster
FROM rust@sha256:98c9b1fca0c9a6183369daf9efadb57c634340ae877bb027aeadf72afdd086a3 as build
WORKDIR /
ENV USER root
@ -20,7 +20,8 @@ cargo new --bin crates/ptth_server && \
cargo new --bin crates/ptth_file_server_bin && \
cargo new --bin tools/ptth_tail && \
cargo new --bin crates/debug_proxy && \
cargo new --bin crates/ptth_quic
cargo new --bin crates/ptth_quic && \
cargo new --lib crates/udp_over_tcp
# copy over your manifests
COPY ./Cargo.lock ./
@ -29,6 +30,7 @@ COPY ./crates/always_equal/Cargo.toml ./crates/always_equal/
COPY ./crates/ptth_core/Cargo.toml ./crates/ptth_core/
COPY ./crates/ptth_relay/Cargo.toml ./crates/ptth_relay/
COPY ./crates/ptth_quic/Cargo.toml ./crates/ptth_quic/
COPY ./crates/udp_over_tcp/Cargo.toml ./crates/udp_over_tcp/
# this build step will cache your dependencies
RUN cargo build --release -p ptth_quic
@ -39,7 +41,8 @@ src/*.rs \
crates/always_equal/src/*.rs \
crates/ptth_core/src/*.rs \
crates/ptth_relay/src/*.rs \
crates/ptth_quic/src/*.rs
crates/ptth_quic/src/*.rs \
crates/udp_over_tcp/src/*.rs
# Copy source tree
# Yes, I tried a few variations on the syntax. Dockerfiles are just rough.
@ -50,6 +53,7 @@ COPY ./crates/ptth_core ./crates/ptth_core
COPY ./crates/ptth_relay ./crates/ptth_relay
COPY ./handlebars/ ./handlebars
COPY ./crates/ptth_quic ./crates/ptth_quic
COPY ./crates/udp_over_tcp ./crates/udp_over_tcp
# Bug in cargo's incremental build logic, triggered by
# Docker doing something funny with mtimes? Maybe?
@ -58,6 +62,7 @@ RUN touch crates/ptth_core/src/lib.rs
# build for release
# gate only on ptth_relay tests for now
RUN \
find . && \
cargo build --release -p ptth_quic --bin ptth_quic_relay_server && \
cargo test --release -p ptth_quic --bin ptth_quic_relay_server

View File

@ -61,10 +61,7 @@ impl P2Client {
let conf = Arc::clone (&self.conf);
let quinn::NewConnection {
connection,
..
} = protocol::p2_connect_to_p3 (&self.endpoint, conf.relay_addr, &conf.client_id).await?;
let connection = protocol::p2_connect_to_p3 (&self.endpoint, conf.relay_addr, &conf.client_id).await?;
let client_tcp_port = conf.client_tcp_port;

View File

@ -1,16 +1,13 @@
use std::{
iter::FromIterator,
};
use tokio::sync::watch;
use ptth_quic::prelude::*;
use ptth_quic::executable_end_server as server;
#[tokio::main]
async fn main () -> anyhow::Result <()> {
tracing_subscriber::fmt::init ();
let args = Vec::from_iter (std::env::args_os ());
let args: Vec <_> = std::env::args_os ().collect ();
let (shutdown_tx, shutdown_rx) = watch::channel (false);
@ -19,5 +16,5 @@ async fn main () -> anyhow::Result <()> {
})?;
trace! ("Set Ctrl+C handler");
ptth_quic::executable_end_server::main (&args, Some (shutdown_rx)).await
server::main (&args, Some (shutdown_rx)).await
}

View File

@ -1,6 +1,7 @@
use tokio::sync::watch;
use ptth_quic::prelude::*;
use ptth_quic::executable_relay_server as relay;
#[tokio::main]
async fn main () -> anyhow::Result <()> {
@ -8,7 +9,7 @@ async fn main () -> anyhow::Result <()> {
tracing_subscriber::fmt::init ();
let opt = ptth_quic::executable_relay_server::Opt::from_args ();
let opt = relay::Opt::from_args ();
let (running_tx, mut running_rx) = watch::channel (true);
@ -17,8 +18,15 @@ async fn main () -> anyhow::Result <()> {
})?;
trace! ("Set Ctrl+C handler");
let app = relay::App::new (opt).await?;
println! ("Base64 cert: {}", base64::encode (app.server_cert ()));
println! ("Listening on {}", app.listen_addr ());
tokio::fs::create_dir_all ("ptth_quic_output").await?;
tokio::fs::write ("ptth_quic_output/quic_server.crt", app.server_cert ()).await?;
tokio::select! {
val = ptth_quic::executable_relay_server::main (opt) => {
val = app.run () => {
},
val = running_rx.changed () => {

View File

@ -0,0 +1,32 @@
#[cfg (test)]
mod test {
#[test]
fn signing () -> anyhow::Result <()> {
use std::fs;
use ring::{
signature::{
self,
Ed25519KeyPair,
KeyPair,
},
};
fs::create_dir_all ("untracked")?;
let rng = ring::rand::SystemRandom::new ();
let pkcs8_bytes = Ed25519KeyPair::generate_pkcs8 (&rng).map_err (|_| anyhow::anyhow! ("generate_pkcs8"))?;
let key_pair = Ed25519KeyPair::from_pkcs8 (pkcs8_bytes.as_ref ()).map_err (|_| anyhow::anyhow! ("from_pkcs8"))?;
const MESSAGE: &[u8] = b":V";
let sig = key_pair.sign (MESSAGE);
let peer_public_key_bytes = key_pair.public_key ().as_ref ();
let peer_public_key = signature::UnparsedPublicKey::new (&signature::ED25519, peer_public_key_bytes);
peer_public_key.verify (MESSAGE, sig.as_ref ()).map_err (|_| anyhow::anyhow! ("verify"))?;
Ok (())
}
}

View File

@ -10,7 +10,7 @@ use protocol::PeerId;
/// A partially-filled-out config that structopt can deal with
/// Try to turn this into a Config as soon as possible.
#[derive (Debug, StructOpt)]
struct Opt {
pub struct Opt {
#[structopt (long)]
relay_addr: Option <String>,
#[structopt (long)]
@ -19,6 +19,18 @@ struct Opt {
debug_echo: bool,
#[structopt (long)]
cert_url: Option <String>,
#[structopt (long)]
use_udp_over_tcp: Option <bool>,
}
/// A filled-out config for constructing an end server
#[derive (Clone)]
pub (crate) struct Config {
pub debug_echo: bool,
pub id: String,
pub relay_addr: SocketAddr,
pub relay_cert: Vec <u8>,
pub use_udp_over_tcp: bool,
}
pub async fn main (args: &[OsString], shutdown_rx: Option <watch::Receiver <bool>>) -> anyhow::Result <()> {
@ -26,10 +38,9 @@ pub async fn main (args: &[OsString], shutdown_rx: Option <watch::Receiver <bool
let opt = Opt::from_iter (args);
let conf = opt.into_config ().await?;
let end_server = Arc::new (P4EndServer::connect (conf)?);
let (end_server, shutdown_tx) = P4EndServer::connect (conf).await?;
let run_task = {
let end_server = Arc::clone (&end_server);
tokio::spawn (async move {
end_server.run ().await?;
Ok::<_, anyhow::Error> (())
@ -40,7 +51,8 @@ pub async fn main (args: &[OsString], shutdown_rx: Option <watch::Receiver <bool
while ! *shutdown_rx.borrow () {
shutdown_rx.changed ().await?;
}
end_server.shut_down ()?;
trace! ("P4 end server shutting down...");
shutdown_tx.send (true)?
}
run_task.await??;
@ -50,21 +62,12 @@ pub async fn main (args: &[OsString], shutdown_rx: Option <watch::Receiver <bool
Ok (())
}
/// A filled-out config for constructing an end server
#[derive (Clone)]
pub struct Config {
pub debug_echo: bool,
pub id: String,
pub relay_addr: SocketAddr,
pub relay_cert: Vec <u8>,
}
impl Opt {
/// Converts self into a Config that the server can use.
/// Performs I/O to load the relay cert from disk or from HTTP.
/// Fails if arguments can't be parsed or if I/O fails.
pub async fn into_config (self) -> anyhow::Result <Config> {
pub (crate) async fn into_config (self) -> anyhow::Result <Config> {
let id = self.server_id.clone ().unwrap_or_else (|| "bogus_server".to_string ());
let relay_addr: SocketAddr = self.relay_addr.clone ().unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?;
@ -83,53 +86,74 @@ impl Opt {
id,
relay_addr,
relay_cert,
use_udp_over_tcp: self.use_udp_over_tcp.unwrap_or (false),
})
}
}
pub struct P4EndServer {
conf: Config,
conn: quinn::Connection,
endpoint: quinn::Endpoint,
conf: Arc <Config>,
shutdown_tx: watch::Sender <bool>,
shutdown_rx: watch::Receiver <bool>,
}
impl P4EndServer {
pub fn connect (conf: Config) -> anyhow::Result <Self> {
trace! ("P4 end server making its QUIC endpoint");
pub (crate) async fn connect (conf: Config) -> anyhow::Result <(Self, watch::Sender <bool>)> {
debug! ("P4 end server making its QUIC endpoint");
let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&conf.relay_cert])?;
let (shutdown_tx, shutdown_rx) = watch::channel (false);
let conf = if conf.use_udp_over_tcp {
let udp_sock = UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, 0)).await?;
udp_sock.connect ((Ipv4Addr::LOCALHOST, endpoint.local_addr ()?.port ())).await?;
let udp_local_server_port = udp_sock.local_addr ()?.port ();
let tcp_sock = TcpSocket::new_v4 ()?;
let tcp_conn = tcp_sock.connect (conf.relay_addr).await?;
tokio::spawn (async move {
udp_over_tcp::client::main_with_sockets (udp_sock, tcp_conn).await
});
Config {
debug_echo: conf.debug_echo,
id: conf.id,
relay_addr: SocketAddr::V4 (SocketAddrV4::new (Ipv4Addr::LOCALHOST, udp_local_server_port)),
relay_cert: conf.relay_cert,
use_udp_over_tcp: true,
}
}
else {
conf
};
Ok (P4EndServer {
conf: Arc::new (conf),
endpoint,
shutdown_tx,
shutdown_rx,
})
}
pub fn config (&self) -> &Config {
&*self.conf
}
pub async fn run (&self) -> anyhow::Result <()> {
trace! ("P4 end server connecting to P3 relay server");
let quinn::NewConnection {
mut bi_streams,
..
} = protocol::p4_connect_to_p3 (
&self.endpoint,
self.conf.relay_addr,
&self.conf.id
debug! ("P4 end server connecting to P3 relay server");
let conn = protocol::p4_connect_to_p3 (
&endpoint,
conf.relay_addr,
&conf.id
).await?;
debug! ("Connected to relay server");
let (shutdown_tx, shutdown_rx) = watch::channel (false);
Ok ((P4EndServer {
conf,
conn,
endpoint,
shutdown_rx,
}, shutdown_tx))
}
pub (crate) async fn run (self) -> anyhow::Result <()> {
trace! ("Accepting bi streams from P3");
let mut shutdown_rx = self.shutdown_rx.clone ();
let conf = Arc::new (self.conf);
loop {
tokio::select! {
_ = shutdown_rx.changed () => {
@ -138,10 +162,10 @@ impl P4EndServer {
break;
}
}
stream_opt = bi_streams.next () => {
let (relay_send, relay_recv) = stream_opt.ok_or_else (|| anyhow::anyhow! ("P4 ran out of incoming streams. Maybe P3 shut down or disconnected?"))??;
stream_opt = self.conn.accept_bi () => {
let (relay_send, relay_recv) = stream_opt?;
tokio::spawn (handle_bi_stream (Arc::clone (&self.conf), relay_send, relay_recv));
tokio::spawn (handle_bi_stream (Arc::clone (&conf), relay_send, relay_recv));
}
};
}
@ -149,11 +173,6 @@ impl P4EndServer {
Ok (())
}
pub fn shut_down (&self) -> anyhow::Result <()> {
trace! ("P4 end server shutting down...");
Ok (self.shutdown_tx.send (true)?)
}
pub fn shutting_down (&self) -> bool {
*self.shutdown_rx.borrow ()
}

View File

@ -20,142 +20,218 @@ use protocol::PeerId;
#[derive (Debug, StructOpt)]
pub struct Opt {
#[structopt (long)]
listen_addr: Option <String>,
pub (crate) listen_addr: Option <String>,
#[structopt (long)]
pub (crate) tcp_listen_port: Option <u16>,
}
pub async fn main (opt: Opt) -> anyhow::Result <()>
{
let listen_addr = opt.listen_addr.unwrap_or_else (|| String::from ("0.0.0.0:30380")).parse ()?;
let (mut incoming, server_cert) = make_server_endpoint (listen_addr)?;
println! ("Base64 cert: {}", base64::encode (&server_cert));
tokio::fs::create_dir_all ("ptth_quic_output").await?;
tokio::fs::write ("ptth_quic_output/quic_server.crt", &server_cert).await?;
let relay_state = Arc::new (RelayState::default ());
let make_svc = {
let relay_state = Arc::clone (&relay_state);
make_service_fn (move |_conn| {
let relay_state = Arc::clone (&relay_state);
pub struct App {
endpoint: quinn::Endpoint,
listen_addr: SocketAddr,
pub (crate) metrics: Arc <RwLock <Metrics>>,
server_cert: Vec <u8>,
tcp_listener: Option <udp_over_tcp::server::Listener>,
}
#[derive (Default)]
pub (crate) struct Metrics {
pub (crate) connected_end_servers: usize,
}
impl App {
pub async fn new (opt: Opt) -> anyhow::Result <Self> {
let config = load_config ().await.ok ();
let listen_addr = opt.listen_addr.unwrap_or_else (|| String::from ("0.0.0.0:30380")).parse ()?;
let (endpoint, server_cert) = make_server_endpoint (listen_addr)?;
let listen_addr = endpoint.local_addr ()?;
let tcp_port = opt.tcp_listen_port.or (config.map (|cfg| cfg.tcp_listen_port).flatten ());
let tcp_listener = if let Some (tcp_port) = tcp_port {
let cfg = udp_over_tcp::server::Config {
tcp_port,
udp_port: listen_addr.port (),
};
async move {
Ok::<_, String> (service_fn (move |req| {
Some (udp_over_tcp::server::Listener::new (cfg).await?)
}
else {
None
};
Ok (Self {
endpoint,
listen_addr,
metrics: Default::default (),
server_cert,
tcp_listener,
})
}
pub fn listen_addr (&self) -> SocketAddr {
self.listen_addr
}
pub fn server_cert (&self) -> &[u8] {
&self.server_cert
}
pub fn tcp_listen_port (&self) -> anyhow::Result <Option <u16>> {
match self.tcp_listener.as_ref () {
None => Ok (None),
Some (tcp_listener) => Ok (tcp_listener.tcp_port ()?.into ()),
}
}
pub async fn run (self) -> anyhow::Result <()> {
let Self {
endpoint,
listen_addr,
metrics,
server_cert,
tcp_listener,
} = self;
let mut relay_state = RelayState::default ();
relay_state.metrics = metrics;
if let Err (e) = relay_state.reload_config ().await {
error! ("{:?}", e);
}
let relay_state = Arc::new (relay_state);
let make_svc = {
let relay_state = Arc::clone (&relay_state);
make_service_fn (move |_conn| {
let relay_state = Arc::clone (&relay_state);
async move {
Ok::<_, String> (service_fn (move |req| {
let relay_state = Arc::clone (&relay_state);
handle_http (req, relay_state)
}))
}
})
};
let http_addr = SocketAddr::from (([0, 0, 0, 0], 4004));
let http_server = Server::bind (&http_addr);
let _task_reload_config = {
let relay_state = Arc::clone (&relay_state);
tokio::spawn (async move {
let mut interval = tokio::time::interval (std::time::Duration::from_secs (60));
interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick ().await;
relay_state.reload_config ().await.ok ();
}
})
};
let task_quic_server = {
let relay_state = Arc::clone (&relay_state);
tokio::spawn (async move {
while let Some (conn) = endpoint.accept ().await {
let relay_state = Arc::clone (&relay_state);
handle_http (req, relay_state)
}))
}
})
};
let http_addr = SocketAddr::from (([0, 0, 0, 0], 4004));
let http_server = Server::bind (&http_addr);
let tcp_port = 30382;
let tcp_listener = TcpListener::bind (("127.0.0.1", tcp_port)).await?;
let task_quic_server = {
let relay_state = Arc::clone (&relay_state);
tokio::spawn (async move {
while let Some (conn) = incoming.next ().await {
let relay_state = Arc::clone (&relay_state);
// Each new peer QUIC connection gets its own task
tokio::spawn (async move {
let active = relay_state.stats.quic.connect ();
debug! ("QUIC connections: {}", active);
match handle_quic_connection (Arc::clone (&relay_state), conn).await {
Ok (_) => (),
Err (e) => warn! ("handle_quic_connection `{:?}`", e),
}
let active = relay_state.stats.quic.disconnect ();
debug! ("QUIC connections: {}", active);
});
}
// Each new peer QUIC connection gets its own task
tokio::spawn (async move {
let active = relay_state.stats.quic.connect ();
debug! ("QUIC connections: {}", active);
match handle_quic_connection (Arc::clone (&relay_state), conn).await {
Ok (_) => (),
Err (e) => warn! ("handle_quic_connection `{:?}`", e),
}
let active = relay_state.stats.quic.disconnect ();
debug! ("QUIC connections: {}", active);
});
}
Ok::<_, anyhow::Error> (())
})
};
let task_direc_server = {
let relay_state = Arc::clone (&relay_state);
Ok::<_, anyhow::Error> (())
})
};
tokio::spawn (async move {
let sock = UdpSocket::bind("0.0.0.0:30379").await?;
let mut buf = [0; 2048];
loop {
let (len, addr) = sock.recv_from (&mut buf).await?;
debug! ("{:?} bytes received from {:?}", len, addr);
let packet = Vec::from_iter ((&buf [0..len]).into_iter ().map (|x| *x));
{
let mut direc_cookies = relay_state.direc_cookies.lock ().await;
let task_direc_server = {
let relay_state = Arc::clone (&relay_state);
tokio::spawn (async move {
let sock = UdpSocket::bind("0.0.0.0:30379").await?;
let mut buf = [0; 2048];
loop {
let (len, addr) = sock.recv_from (&mut buf).await?;
debug! ("{:?} bytes received from {:?}", len, addr);
if let Some (direc_state) = direc_cookies.remove (&packet) {
debug! ("Got PTTH_DIREC cookie for {}", direc_state.p2_id);
direc_state.p2_addr.send (addr).ok ();
}
else {
debug! ("UDP packet didn't match any PTTH_DIREC cookie");
let packet = Vec::from_iter ((&buf [0..len]).into_iter ().map (|x| *x));
{
let mut direc_cookies = relay_state.direc_cookies.lock ().await;
if let Some (direc_state) = direc_cookies.remove (&packet) {
debug! ("Got PTTH_DIREC cookie for {}", direc_state.p2_id);
direc_state.p2_addr.send (addr).ok ();
}
else {
debug! ("UDP packet didn't match any PTTH_DIREC cookie");
}
}
}
}
Ok::<_, anyhow::Error> (())
})
};
let task_http_server = tokio::spawn (async move {
http_server.serve (make_svc).await?;
Ok::<_, anyhow::Error> (())
});
let task_tcp_server = {
let relay_state = Arc::clone (&relay_state);
tokio::spawn (async move {
loop {
let (tcp_socket, _) = tcp_listener.accept ().await?;
let relay_state = Arc::clone (&relay_state);
Ok::<_, anyhow::Error> (())
})
};
let task_http_server = tokio::spawn (async move {
http_server.serve (make_svc).await?;
Ok::<_, anyhow::Error> (())
});
debug! ("Serving HTTP on {:?}", http_addr);
if let Some (tcp_listener) = tcp_listener {
tokio::spawn (async move {
if let Err (e) = tcp_listener.run ().await {
eprintln! ("udp_over_tcp::server::main exited with err {:?}", e);
}
Ok::<_, anyhow::Error> (())
});
}
{
let config = relay_state.config.load ();
dbg! (&config.webhook_url);
if let Some (webhook_url) = config.webhook_url.clone () {
let j = json! ({
"text": "Booting up",
}).to_string ();
let http_client = relay_state.http_client.clone ();
tokio::spawn (async move {
let (_client_recv, _client_send) = tcp_socket.into_split ();
debug! ("Accepted direct TCP connection P1 --> P3");
let p4_server_proxies = relay_state.p4_server_proxies.lock ().await;
let _p4 = match p4_server_proxies.get ("bogus_server") {
Some (x) => x,
None => bail! ("That server isn't connected"),
};
// unimplemented! ();
/*
p4.req_channel.send (RequestP2ToP4 {
client_send,
client_recv,
client_id: "bogus_client".to_string (),
}).await.map_err (|_| anyhow::anyhow! ("Can't send request to P4 server"))?;
*/
Ok (())
http_client.post (webhook_url).body (j).send ().await
});
}
Ok::<_, anyhow::Error> (())
})
};
debug! ("Serving HTTP on {:?}", http_addr);
task_quic_server.await??;
task_http_server.await??;
task_tcp_server.await??;
task_direc_server.await??;
Ok (())
}
tokio::select! {
_val = task_quic_server => {
eprintln! ("QUIC relay server exited, exiting");
},
_val = task_http_server => {
eprintln! ("HTTP server exited, exiting");
},
_val = task_direc_server => {
eprintln! ("PTTH_DIREC server exited, exiting");
},
}
Ok (())
}
}
async fn handle_http (_req: Request <Body>, relay_state: Arc <RelayState>)
@ -178,9 +254,36 @@ async fn handle_http (_req: Request <Body>, relay_state: Arc <RelayState>)
#[derive (Default)]
struct RelayState {
config: arc_swap::ArcSwap <Config>,
p4_server_proxies: Mutex <HashMap <PeerId, P4State>>,
direc_cookies: Mutex <HashMap <Vec <u8>, DirecState>>,
metrics: Arc <RwLock <Metrics>>,
stats: Stats,
http_client: reqwest::Client,
}
#[derive (Default)]
struct Config {
ip_nicknames: BTreeMap <[u8; 4], String>,
tcp_listen_port: Option <u16>,
webhook_url: Option <String>,
}
impl From <ConfigFile> for Config {
fn from (x: ConfigFile) -> Self {
Self {
ip_nicknames: x.ip_nicknames.into_iter ().collect (),
tcp_listen_port: x.tcp_listen_port,
webhook_url: x.webhook_url,
}
}
}
#[derive (Deserialize)]
struct ConfigFile {
ip_nicknames: Vec <([u8; 4], String)>,
tcp_listen_port: Option <u16>,
webhook_url: Option <String>,
}
struct DirecState {
@ -222,11 +325,24 @@ impl ConnectEvents {
struct P4State {
req_channel: mpsc::Sender <RequestP2ToP4>,
}
async fn load_config () -> anyhow::Result <ConfigFile>
{
let s = tokio::fs::read_to_string ("config/ptth_quic_relay_server.json").await?;
let cfg: ConfigFile = serde_json::from_str (&s)?;
Ok (cfg)
}
impl RelayState {
async fn reload_config (&self) -> anyhow::Result <()> {
let config = load_config ().await?;
let config = Arc::new (Config::from (config));
self.config.store (config);
Ok (())
}
}
struct RequestP2ToP4 {
@ -301,30 +417,69 @@ async fn handle_quic_connection (
conn: quinn::Connecting,
) -> anyhow::Result <()>
{
let mut conn = conn.await?;
let id = Ulid::generate ();
let config = relay_state.config.load ();
let remote_addr = conn.remote_address ();
let ip_nickname = match remote_addr {
SocketAddr::V4 (x) => {
let ip = x.ip ().octets ();
match config.ip_nicknames.get (&ip) {
Some (nick) => nick.as_str (),
_ => "Unknown",
}
},
_ => "Unknown, not IPv4",
};
debug! ("EHG7NVUD Incoming QUIC connection {} from {:?} ({})", id, remote_addr, ip_nickname);
if let Some (webhook_url) = config.webhook_url.clone () {
let j = json! ({
"text": format! ("Incoming QUIC connection from {:?} ({})", remote_addr, ip_nickname),
}).to_string ();
let http_client = relay_state.http_client.clone ();
tokio::spawn (async move {
http_client.post (webhook_url).body (j).send ().await
});
}
let conn = conn.await?;
// Everyone who connects must identify themselves with the first
// bi stream
// TODO: Timeout
let (mut send, mut recv) = conn.bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("QUIC client didn't identify itself"))??;
let (mut send, mut recv) = conn.accept_bi ().await?;
let peer = protocol::p3_accept_peer (&mut recv).await?;
match peer {
protocol::P3Peer::P2ClientProxy (peer) => {
trace! ("Accepting connection from P2 client");
trace! ("H36JTVE5 Handling connection {} as P2 client", id);
// TODO: Check authorization for P2 peers
protocol::p3_authorize_p2_peer (&mut send).await?;
handle_p2_connection (relay_state, conn, peer).await?;
},
protocol::P3Peer::P4ServerProxy (peer) => {
trace! ("Accepting connection from P4 end server");
trace! ("LRHUKB7K Handling connection {} as P4 end server", id);
// TODO: Check authorization for P4 peers
protocol::p3_authorize_p4_peer (&mut send).await?;
let metrics = Arc::clone (&relay_state.metrics);
{
let mut m = metrics.write ().await;
m.connected_end_servers += 1;
}
handle_p4_connection (relay_state, conn, peer).await?;
{
let mut m = metrics.write ().await;
m.connected_end_servers -= 1;
}
},
}
@ -333,19 +488,13 @@ async fn handle_quic_connection (
async fn handle_p2_connection (
relay_state: Arc <RelayState>,
conn: quinn::NewConnection,
conn: quinn::Connection,
peer: protocol::P2ClientProxy,
) -> anyhow::Result <()>
{
let client_id = peer.id;
let quinn::NewConnection {
mut bi_streams,
..
} = conn;
while let Some (bi_stream) = bi_streams.next ().await {
let (send, mut recv) = bi_stream?;
while let Ok ((send, mut recv)) = conn.accept_bi ().await {
let relay_state = Arc::clone (&relay_state);
let client_id = client_id.clone ();
@ -457,15 +606,11 @@ async fn handle_direc_p2_to_p4 (
async fn handle_p4_connection (
relay_state: Arc <RelayState>,
conn: quinn::NewConnection,
connection: quinn::Connection,
peer: protocol::P4ServerProxy,
) -> anyhow::Result <()>
{
let server_id = peer.id;
let quinn::NewConnection {
connection,
..
} = conn;
let (tx, mut rx) = mpsc::channel (2);
let p4_state = P4State {

View File

@ -1,7 +1,11 @@
pub mod client_proxy;
pub mod connection;
pub mod crypto;
pub mod executable_end_server;
pub mod executable_relay_server;
pub mod prelude;
pub mod protocol;
pub mod quinn_utils;
#[cfg (test)]
mod tests;

View File

@ -2,7 +2,11 @@ pub use std::{
collections::*,
ffi::OsString,
iter::FromIterator,
net::SocketAddr,
net::{
Ipv4Addr,
SocketAddr,
SocketAddrV4,
},
sync::{
Arc,
atomic::{
@ -26,9 +30,14 @@ pub use tokio::{
AsyncReadExt,
AsyncWriteExt,
},
net::TcpListener,
net::{
TcpListener,
TcpSocket,
UdpSocket,
},
sync::{
Mutex,
RwLock,
mpsc,
},
task::JoinHandle,
@ -37,6 +46,9 @@ pub use rand::{
Rng,
RngCore,
};
pub use rusty_ulid::Ulid;
pub use serde::Deserialize;
pub use serde_json::json;
pub use tracing::{
debug,
error,

View File

@ -33,14 +33,14 @@ pub async fn p2_connect_to_p3 (
endpoint: &quinn::Endpoint,
relay_addr: std::net::SocketAddr,
client_id: &str,
) -> Result <quinn::NewConnection>
) -> Result <quinn::Connection>
{
if client_id.as_bytes ().len () > MAX_ID_LENGTH {
bail! ("Client ID is longer than MAX_ID_LENGTH");
}
let new_conn = endpoint.connect (relay_addr, "localhost")?.await?;
let (mut send, mut recv) = new_conn.connection.open_bi ().await?;
let (mut send, mut recv) = new_conn.open_bi ().await?;
let cmd_type = Command::CONNECT_P2_TO_P3.0;
send.write_all (&[cmd_type, 0, 0, 0]).await?;
@ -251,21 +251,21 @@ pub async fn p4_connect_to_p3 (
endpoint: &quinn::Endpoint,
relay_addr: std::net::SocketAddr,
server_id: &str,
) -> Result <quinn::NewConnection>
) -> Result <quinn::Connection>
{
if server_id.as_bytes ().len () > MAX_ID_LENGTH {
bail! ("Server ID is longer than MAX_ID_LENGTH");
}
let new_conn = endpoint.connect (relay_addr, "localhost")?.await?;
let (mut send, mut recv) = new_conn.connection.open_bi ().await?;
let new_conn = endpoint.connect (relay_addr, "localhost")?.await.context ("UXTDVL2V quinn::Endpoint::connect")?;
let (mut send, mut recv) = new_conn.open_bi ().await?;
let cmd_type = Command::CONNECT_P4_TO_P3.0;
send.write_all (&[cmd_type, 0, 0, 0]).await?;
send_lv_string (&mut send, server_id).await?;
expect_exact_response (&mut recv, [Command::OKAY.0, cmd_type, 0, 0]).await
.context ("P4 didn't get OK response when connecting to P3")?;
.context ("WMGW2RXU P4 didn't get OK response when connecting to P3")?;
Ok (new_conn)
}

View File

@ -8,7 +8,7 @@ use std::{
};
use quinn::{
ClientConfig, Endpoint, Incoming,
ClientConfig, Endpoint,
ServerConfig, TransportConfig,
};
@ -26,7 +26,7 @@ pub fn make_client_endpoint(
let mut transport = quinn::TransportConfig::default ();
transport.keep_alive_interval (Some (Duration::from_millis (5_000)));
client_cfg.transport = Arc::new (transport);
client_cfg.transport_config (Arc::new (transport));
let mut endpoint = Endpoint::client (bind_addr)?;
endpoint.set_default_client_config (client_cfg);
@ -41,10 +41,10 @@ pub fn make_client_endpoint(
/// - a stream of incoming QUIC connections
/// - server certificate serialized into DER format
#[allow(unused)]
pub fn make_server_endpoint(bind_addr: SocketAddr) -> anyhow::Result<(Incoming, Vec<u8>)> {
pub fn make_server_endpoint(bind_addr: SocketAddr) -> anyhow::Result<(Endpoint, Vec<u8>)> {
let (server_config, server_cert) = configure_server()?;
let (_endpoint, incoming) = Endpoint::server (server_config, bind_addr)?;
Ok((incoming, server_cert))
let endpoint = Endpoint::server (server_config, bind_addr)?;
Ok((endpoint, server_cert))
}
/// Builds default quinn client config and trusts given certificates.

View File

@ -0,0 +1,110 @@
use crate::prelude::*;
#[test]
fn end_to_end () -> anyhow::Result <()> {
let rt = tokio::runtime::Runtime::new ()?;
rt.block_on (end_to_end_async ())?;
Ok (())
}
async fn end_to_end_async () -> anyhow::Result <()> {
use crate::executable_end_server as server;
use crate::executable_relay_server as relay;
let relay_opt = relay::Opt {
listen_addr: "127.0.0.1:0".to_string ().into (),
tcp_listen_port: Some (0),
};
let relay_app = relay::App::new (relay_opt).await?;
let relay_quic_port = relay_app.listen_addr ().port ();
let relay_cert = Vec::from (relay_app.server_cert ());
let relay_metrics = Arc::clone (&relay_app.metrics);
let tcp_listen_port = relay_app.tcp_listen_port ()?.unwrap ();
assert_ne! (tcp_listen_port, 0);
let task_relay = tokio::spawn (async move {
relay_app.run ().await
});
{
let m = relay_metrics.read ().await;
assert_eq! (m.connected_end_servers, 0);
}
// Connect with wrong port, should fail
let server_conf = server::Config {
debug_echo: false,
id: "bogus".into (),
relay_addr: "127.0.0.1:80".parse ()?,
relay_cert: relay_cert.clone (),
use_udp_over_tcp: false,
};
let server_err = server::P4EndServer::connect (server_conf).await;
assert! (server_err.is_err ());
// Connect with wrong cert, should fail
let server_conf = server::Config {
debug_echo: false,
id: "bogus".into (),
relay_addr: ([127, 0, 0, 1], relay_quic_port).into (),
relay_cert: vec! [],
use_udp_over_tcp: false,
};
let server_err = server::P4EndServer::connect (server_conf).await;
assert! (server_err.is_err ());
{
let m = relay_metrics.read ().await;
assert_eq! (m.connected_end_servers, 0);
}
// Connect over UDP
let server_conf = server::Config {
debug_echo: false,
id: "bogus_VZBNRUA5".into (),
relay_addr: ([127, 0, 0, 1], relay_quic_port).into (),
relay_cert: relay_cert.clone (),
use_udp_over_tcp: false,
};
let t = Instant::now ();
let (server, _) = server::P4EndServer::connect (server_conf).await?;
let dur = t.elapsed ();
assert! (dur < Duration::from_millis (1_000), "{:?}", dur);
{
let m = relay_metrics.read ().await;
assert_eq! (m.connected_end_servers, 1);
}
// Connect over TCP
let server_conf = server::Config {
debug_echo: false,
id: "bogus_6E5CZIAI".into (),
relay_addr: ([127, 0, 0, 1], tcp_listen_port).into (),
relay_cert: relay_cert.clone (),
use_udp_over_tcp: true,
};
let t = Instant::now ();
let (server, _) = server::P4EndServer::connect (server_conf).await?;
let dur = t.elapsed ();
assert! (dur < Duration::from_millis (1_000), "{:?}", dur);
{
let m = relay_metrics.read ().await;
assert_eq! (m.connected_end_servers, 2);
}
Ok (())
}

View File

@ -12,15 +12,19 @@ repository = "https://six-five-six-four.com/git/reactor/ptth"
[dependencies]
anyhow = "1.0.38"
blake3 = "1.0.0"
fltk = "1.2.8"
fltk = "1.3.24"
ptth_quic = { path = "../ptth_quic" }
quinn = "0.8.5"
quinn = "0.9.3"
rand = "0.8.4"
rand_chacha = "0.3.1"
reqwest = "0.11.4"
rmp-serde = "0.15.5"
serde = "1.0.130"
structopt = "0.3.20"
tokio = { version = "1.8.1", features = ["full"] }
tracing-subscriber = "0.2.16"
tracing = "0.1.25"
[dependencies.reqwest]
version = "0.11.4"
default-features = false
features = ["stream", "rustls-tls", "hyper-rustls"]

View File

@ -235,10 +235,7 @@ fn main () -> anyhow::Result <()> {
let client_id = opt.client_id.unwrap_or_else (|| "bogus_client".to_string ());
let quinn::NewConnection {
connection,
..
} = protocol::p2_connect_to_p3 (&endpoint, relay_addr, &client_id).await
let connection = protocol::p2_connect_to_p3 (&endpoint, relay_addr, &client_id).await
.context ("P2 can't connect to P3")?;
Ok::<_, anyhow::Error> (connection)

View File

@ -11,30 +11,35 @@ repository = "https://six-five-six-four.com/git/reactor/ptth"
[dependencies]
anyhow = "1.0.38"
anyhow = "1.0.66"
base64 = "0.13.0"
blake3 = "1.0.0"
chrono = { version = "0.4.19", features = ["serde"] }
chrono = { version = "0.4.23", features = ["serde"] }
clap = "2.33.3"
dashmap = "4.0.2"
futures = "0.3.7"
futures-util = "0.3.8"
handlebars = "3.5.3"
http = "0.2.3"
hyper = { version = "0.14.20", features = ["http1", "http2", "server", "stream", "tcp"] }
hyper = { version = "0.14.23", features = ["http1", "http2", "server", "stream", "tcp"] }
itertools = "0.9.0"
rand = "0.8.3"
rand = "0.8.5"
rmp-serde = "0.15.5"
rusty_ulid = "0.10.1"
serde = { version = "1.0.117", features = ["derive"] }
serde_json = "1.0.60"
serde_urlencoded = "0.7.0"
thiserror = "1.0.22"
tokio = { version = "1.8.1", features = [] }
tokio-stream = "0.1.3"
toml = "0.5.7"
tracing = "0.1.25"
rusty_ulid = "1.0.0"
serde = { version = "1.0.150", features = ["derive"] }
serde_json = "1.0.89"
serde_urlencoded = "0.7.1"
thiserror = "1.0.37"
tokio = { version = "1.23.0", features = [] }
tokio-stream = "0.1.11"
toml = "0.5.10"
tracing = "0.1.37"
tracing-futures = "0.2.4"
tracing-subscriber = "0.2.15"
ptth_core = { path = "../ptth_core", version = "2.0.0" }
[dependencies.reqwest]
version = "0.11.13"
default-features = false
features = ["stream", "rustls-tls", "hyper-rustls"]

View File

@ -144,6 +144,8 @@ pub mod file {
pub news_url: Option <String>,
pub hide_audit_log: Option <bool>,
pub webhook_url: Option <String>,
pub webhook_interval_s: Option <u32>,
}
}
@ -158,6 +160,8 @@ pub struct Config {
pub scraper_keys: HashMap <String, ScraperKey>,
pub news_url: Option <String>,
pub hide_audit_log: bool,
pub webhook_url: Option <String>,
pub webhook_interval_s: u32,
}
impl Default for Config {
@ -170,6 +174,8 @@ impl Default for Config {
scraper_keys: Default::default (),
news_url: None,
hide_audit_log: false,
webhook_url: None,
webhook_interval_s: 7200,
}
}
}
@ -199,6 +205,8 @@ impl TryFrom <file::Config> for Config {
scraper_keys,
news_url: f.news_url,
hide_audit_log: f.hide_audit_log.unwrap_or (false),
webhook_url: f.webhook_url,
webhook_interval_s: f.webhook_interval_s.unwrap_or (7200),
})
}
}

View File

@ -589,6 +589,10 @@ async fn handle_all (
use routing::Route::*;
let state = &*state;
{
let mut counters = state.webhook_counters.write ().await;
counters.requests_total += 1;
}
// The path is cloned here, so it's okay to consume the request
// later.
@ -818,7 +822,7 @@ pub async fn run_relay (
let mut request_rendezvous = state_2.request_rendezvous.lock ().await;
request_rendezvous.iter_mut ()
.for_each (|(k, v)| {
.for_each (|(_k, v)| {
match v {
RequestRendezvous::ParkedServer (_) => (),
RequestRendezvous::ParkedClients (requests) => requests.retain (|req| req.id.as_str () >= timeout_ulid.as_str ()),
@ -878,6 +882,11 @@ pub async fn run_relay (
state.audit_log.push (AuditEvent::new (AuditData::RelayStart)).await;
{
let state = state.clone ();
tokio::spawn (webhook_task (state));
}
trace! ("Serving relay on {:?}", addr);
server.with_graceful_shutdown (async {
@ -913,5 +922,64 @@ pub async fn run_relay (
Ok (())
}
async fn webhook_task (state: Arc <Relay>) {
use crate::relay_state::MonitoringCounters;
let client = reqwest::Client::default ();
let webhook_interval_s = {
let config = state.config.read ().await;
config.webhook_interval_s
};
dbg! (webhook_interval_s);
let mut interval = tokio::time::interval (std::time::Duration::from_secs (webhook_interval_s.into ()));
interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip);
let mut tick_seq = 1;
let mut last_counters_reported = (MonitoringCounters::default (), Utc::now (), 0);
loop {
interval.tick ().await;
let webhook_url = {
let config = state.config.read ().await;
config.webhook_url.clone ()
};
let webhook_url = match webhook_url {
Some (x) => x,
None => {
continue;
},
};
let now = Utc::now ();
let counters = {
state.webhook_counters.read ().await.clone ()
};
let requests_total_diff = counters.requests_total - last_counters_reported.0.requests_total;
let j = serde_json::json! ({
"text": format! ("From tick {} to {}: Handled {} requests", last_counters_reported.2, tick_seq, requests_total_diff),
}).to_string ();
match client.post (webhook_url).body (j).send ().await {
Ok (resp) => {
if resp.status () == StatusCode::OK {
last_counters_reported = (counters, now, tick_seq);
}
else {
dbg! (resp.status ());
}
},
Err (e) => { dbg! (e); },
}
tick_seq += 1;
}
}
#[cfg (test)]
mod tests;

View File

@ -101,6 +101,16 @@ pub struct Relay {
/// Memory backend for audit logging
// TODO: Add file / database / network server logging backend
pub (crate) audit_log: BoundedVec <AuditEvent>,
/// Counters for webhook reporting
pub (crate) webhook_counters: RwLock <MonitoringCounters>,
}
#[derive (Clone, Default)]
pub (crate) struct MonitoringCounters {
pub (crate) requests_total: u64,
pub (crate) requests_by_scraper_api: HashMap <String, u64>,
pub (crate) requests_by_email: HashMap <String, u64>,
}
#[derive (Clone)]
@ -204,6 +214,7 @@ impl TryFrom <Config> for Relay {
shutdown_watch_rx,
unregistered_servers: BoundedVec::new (20),
audit_log: BoundedVec::new (256),
webhook_counters: Default::default (),
})
}
}

View File

@ -558,7 +558,7 @@ pub mod executable {
name: opt.name.or (config_file.name).ok_or (anyhow::anyhow! ("`name` must be provided in command line or config file"))?,
api_key: config_file.api_key,
relay_url: opt.relay_url.or (config_file.relay_url).ok_or (anyhow::anyhow! ("`--relay-url` must be provided in command line or `relay_url` in config file"))?,
file_server_root: opt.file_server_root.or (config_file.file_server_root).unwrap_or_else (PathBuf::new),
file_server_root: opt.file_server_root.or (config_file.file_server_root).unwrap_or_else (|| PathBuf::from (".")),
file_server_roots,
throttle_upload: opt.throttle_upload,
allow_any_client: true,

View File

@ -0,0 +1,11 @@
[package]
name = "udp_over_tcp"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.66"
tokio = { version = "1.23.0", features = ["full"] }
tracing = "0.1.37"

View File

@ -0,0 +1,59 @@
use std::{
net::{
Ipv4Addr,
SocketAddr,
SocketAddrV4,
},
sync::Arc,
};
use tokio::{
net::{
TcpSocket,
TcpStream,
UdpSocket,
},
spawn,
};
use crate::loops;
pub struct Config {
pub udp_eph_port: u16,
pub udp_local_server_port: u16,
pub tcp_server_addr: SocketAddr,
}
pub async fn main (cfg: Config) -> anyhow::Result <()> {
let udp_sock = UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, cfg.udp_local_server_port)).await?;
udp_sock.connect ((Ipv4Addr::LOCALHOST, cfg.udp_eph_port)).await?;
let tcp_sock = TcpSocket::new_v4 ()?;
let tcp_conn = tcp_sock.connect (cfg.tcp_server_addr).await?;
main_with_sockets (udp_sock, tcp_conn).await
}
pub async fn main_with_sockets (udp_sock: UdpSocket, tcp_conn: TcpStream) -> anyhow::Result <()> {
let (tcp_read, tcp_write) = tcp_conn.into_split ();
let tx_task;
let rx_task;
{
let udp_sock = Arc::new (udp_sock);
rx_task = spawn (loops::rx (Arc::clone (&udp_sock), tcp_read));
tx_task = spawn (loops::tx (Arc::clone (&udp_sock), tcp_write));
}
tokio::select! {
_val = tx_task => {
println! ("client_main: tx_task exited, exiting");
}
_val = rx_task => {
println! ("client_main: rx_task exited, exiting");
}
}
Ok (())
}

View File

@ -0,0 +1,4 @@
pub mod client;
pub mod server;
mod loops;

View File

@ -0,0 +1,84 @@
use std::{
sync::Arc,
};
use anyhow::bail;
use tokio::{
io::{
AsyncReadExt,
AsyncWriteExt,
},
net::{
UdpSocket,
tcp,
}
};
pub async fn rx (
udp_sock: Arc <UdpSocket>,
mut tcp_read: tcp::OwnedReadHalf,
) -> anyhow::Result <()> {
for i in 0u64.. {
// Optimizes down to a bitwise AND
if i % 8_192 == 0 {
tracing::trace! ("rx loop");
}
let mut tag = [0u8, 0, 0, 0];
let bytes_read = tcp_read.read (&mut tag).await?;
if bytes_read != 4 {
bail! ("loops::rx: Couldn't read 4 bytes for tag");
}
if tag != [1, 0, 0, 0] {
bail! ("loops::rx: unexpected tag in framing");
}
let mut length = [0u8, 0, 0, 0];
let bytes_read = tcp_read.read (&mut length).await?;
if bytes_read != 4 {
bail! ("loops::rx: Couldn't read 4 bytes for tag");
}
let length = usize::try_from (u32::from_le_bytes (length))?;
if length >= 8_192 {
bail! ("loops::rx: Length too big for UDP packets");
}
let mut buf = vec! [0u8; length];
let bytes_read = tcp_read.read_exact (&mut buf).await?;
if length != bytes_read {
bail! ("loops::rx: read_exact failed");
}
buf.truncate (bytes_read);
udp_sock.send (&buf).await?;
}
Ok (())
}
pub async fn tx (
udp_sock: Arc <UdpSocket>,
mut tcp_write: tcp::OwnedWriteHalf,
) -> anyhow::Result <()>
{
for i in 0u64.. {
// Optimizes down to a bitwise AND
if i % 8_192 == 0 {
tracing::trace! ("tx loop");
}
let mut buf = vec! [0u8; 8_192];
let bytes_read = udp_sock.recv (&mut buf).await?;
buf.truncate (bytes_read);
let tag = [1u8, 0, 0, 0];
let length = u32::try_from (bytes_read)?.to_le_bytes ();
tcp_write.write_all (&tag).await?;
tcp_write.write_all (&length).await?;
tcp_write.write_all (&buf).await?;
}
Ok (())
}

View File

@ -0,0 +1,76 @@
/*
To test manually, run this 3 commands:
- Terminal A: `nc -l -u -p 9502`
- Terminal B: `cargo run -p udp_over_tcp`
- Terminal C: `nc -p 9500 -u 127.0.0.1 9501`
Terminals A and C should be connected through the UDP-over-TCP connection
*/
use std::{
net::{
Ipv4Addr,
SocketAddr,
SocketAddrV4,
},
};
use tokio::{
runtime,
spawn,
};
mod client;
mod loops;
mod server;
// The ephemeral UDP port that the PTTH_QUIC client will bind
const PORT_0: u16 = 9500;
// The well-known UDP port that the UDP-over-TCP client will bind
// The PTTH_QUIC client must connect to this instead of the real relay address
const PORT_1: u16 = 9501;
// The well-known TCP port that the UDP-over-TCP server will bind
const PORT_2: u16 = 9502;
// The well-known UDP port that the PTTH_QUIC relay will bind
const PORT_3: u16 = 9502;
fn main () -> anyhow::Result <()> {
let rt = runtime::Runtime::new ()?;
rt.block_on (async_main ())?;
Ok (())
}
async fn async_main () -> anyhow::Result <()> {
let server_cfg = server::Config {
tcp_port: PORT_2,
udp_port: PORT_3,
};
let server_app = server::Listener::new (server_cfg).await?;
let server_task = spawn (server_app.run ());
let client_cfg = client::Config {
udp_eph_port: PORT_0,
udp_local_server_port: PORT_1,
tcp_server_addr: SocketAddr::V4 (SocketAddrV4::new (Ipv4Addr::LOCALHOST, PORT_2)),
};
let client_task = spawn (client::main (client_cfg));
tokio::select! {
_val = client_task => {
println! ("Client exited, exiting");
},
_val = server_task => {
println! ("Server exited, exiting");
},
}
Ok (())
}

View File

@ -0,0 +1,88 @@
use std::{
net::{
Ipv4Addr,
SocketAddrV4,
},
sync::Arc,
};
use tokio::{
net::{
TcpListener,
TcpStream,
UdpSocket,
},
spawn,
};
use crate::loops;
#[derive (Clone)]
pub struct Config {
/// The well-known TCP port that the UDP-over-TCP server will bind
pub tcp_port: u16,
/// The well-known UDP port that the PTTH_QUIC relay will bind
pub udp_port: u16,
}
pub struct Listener {
cfg: Config,
tcp_listener: TcpListener,
}
impl Listener {
pub async fn new (cfg: Config) -> anyhow::Result <Self> {
let tcp_listener = TcpListener::bind ((Ipv4Addr::UNSPECIFIED, cfg.tcp_port)).await?;
Ok (Self {
cfg,
tcp_listener,
})
}
pub fn tcp_port (&self) -> anyhow::Result <u16> {
Ok (self.tcp_listener.local_addr ()?.port ())
}
pub async fn run (self) -> anyhow::Result <()> {
let Self {
cfg,
tcp_listener,
} = self;
loop {
let (conn, _peer_addr) = tcp_listener.accept ().await?;
let cfg = cfg.clone ();
spawn (handle_connection (cfg, conn));
}
}
}
async fn handle_connection (cfg: Config, conn: TcpStream) -> anyhow::Result <()> {
let udp_sock = UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, 0)).await?;
udp_sock.connect ((Ipv4Addr::LOCALHOST, cfg.udp_port)).await?;
let (tcp_read, tcp_write) = conn.into_split ();
let rx_task;
let tx_task;
{
let udp_sock = Arc::new (udp_sock);
rx_task = spawn (loops::rx (Arc::clone (&udp_sock), tcp_read));
tx_task = spawn (loops::tx (Arc::clone (&udp_sock), tcp_write));
}
tokio::select! {
_val = tx_task => {
println! ("server_handle_connection: tx_task exited, exiting");
}
_val = rx_task => {
println! ("server_handle_connection: rx_task exited, exiting");
}
}
Ok (())
}