Compare commits
42 Commits
2a90930723
...
dfc6885b8c
Author | SHA1 | Date |
---|---|---|
_ | dfc6885b8c | |
(on company time) | 43fe373847 | |
(on company time) | 0d6290ed60 | |
(on company time) | ffeeda1e0a | |
(on company time) | 1905d85a59 | |
(on company time) | 5e7f691a60 | |
(on company time) | 200c07da2f | |
(on company time) | ce7a539413 | |
(on company time) | 96af820ffb | |
(on company time) | 74fb0cbe5a | |
(on company time) | 3f4bce85a4 | |
(on company time) | e3ff600b51 | |
(on company time) | 0992d76fdc | |
(on company time) | 27ed72b196 | |
(on company time) | 3f0272ed09 | |
(on company time) | 605c15468a | |
(on company time) | 415c8954c1 | |
(on company time) | 93be903b86 | |
(on company time) | 33fe10ee27 | |
(on company time) | 6ba988c2c8 | |
(on company time) | 0fc99e7c26 | |
(on company time) | 86e5305630 | |
(on company time) | 5eda2c4288 | |
(on company time) | 50332bab69 | |
(on company time) | 24dac2cc39 | |
(on company time) | 996543cecc | |
(on company time) | c13d1f37bf | |
(on company time) | 5a9c301747 | |
(on company time) | 91a29abb39 | |
(on company time) | 9ab3b42e32 | |
(on company time) | b53748b2c4 | |
(on company time) | e05c4fa8bf | |
(on company time) | d93c1404b7 | |
(on company time) | 7f2dc47aec | |
(on company time) | 80c2ef17be | |
(on company time) | f9e10e0f64 | |
(on company time) | 8a302f3049 | |
(on company time) | fd3c85fccd | |
(on company time) | 963631ff96 | |
(on company time) | 036193a19e | |
(on company time) | b5be7709a3 | |
(on company time) | edd7e8de54 |
|
@ -9,6 +9,10 @@
|
|||
/scope/untracked
|
||||
/scraper-secret.txt
|
||||
/target
|
||||
/untracked
|
||||
|
||||
# TLS certs used for QUIC experiments
|
||||
*.crt
|
||||
|
||||
# Kate editor temp file
|
||||
*.kate-swp
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1,8 +1,8 @@
|
|||
# https://whitfin.io/speeding-up-rust-docker-builds/
|
||||
# TODO: https://stackoverflow.com/questions/57389547/how-to-define-the-context-for-a-docker-build-as-a-specific-commit-on-one-of-the
|
||||
|
||||
# rust:1.60-slim-buster
|
||||
FROM rust@sha256:c0f26a0b299a8a74cd87be0b4bd291d55aa292198bab1bafd906edd8665edb82 as build
|
||||
# docker pull rust:1.66-slim-buster
|
||||
FROM rust@sha256:98c9b1fca0c9a6183369daf9efadb57c634340ae877bb027aeadf72afdd086a3 as build
|
||||
|
||||
WORKDIR /
|
||||
ENV USER root
|
||||
|
@ -20,7 +20,8 @@ cargo new --bin crates/ptth_server && \
|
|||
cargo new --bin crates/ptth_file_server_bin && \
|
||||
cargo new --bin tools/ptth_tail && \
|
||||
cargo new --bin crates/debug_proxy && \
|
||||
cargo new --bin crates/ptth_quic
|
||||
cargo new --bin crates/ptth_quic && \
|
||||
cargo new --lib crates/udp_over_tcp
|
||||
|
||||
# copy over your manifests
|
||||
COPY ./Cargo.lock ./
|
||||
|
@ -29,6 +30,7 @@ COPY ./crates/always_equal/Cargo.toml ./crates/always_equal/
|
|||
COPY ./crates/ptth_core/Cargo.toml ./crates/ptth_core/
|
||||
COPY ./crates/ptth_relay/Cargo.toml ./crates/ptth_relay/
|
||||
COPY ./crates/ptth_quic/Cargo.toml ./crates/ptth_quic/
|
||||
COPY ./crates/udp_over_tcp/Cargo.toml ./crates/udp_over_tcp/
|
||||
|
||||
# this build step will cache your dependencies
|
||||
RUN cargo build --release -p ptth_relay
|
||||
|
|
|
@ -10,23 +10,28 @@ repository = "https://six-five-six-four.com/git/reactor/ptth"
|
|||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.38"
|
||||
base64 = "0.13.0"
|
||||
ctrlc = "3.2.1"
|
||||
# fltk = "1.1.1"
|
||||
futures-util = "0.3.9"
|
||||
hyper = { version = "0.14.4", features = ["http1", "server", "stream", "tcp"] }
|
||||
quinn = "0.8.5"
|
||||
rand = "0.8.4"
|
||||
rcgen = "0.8.11"
|
||||
rmp-serde = "0.15.5"
|
||||
rustls = "0.20.4"
|
||||
structopt = "0.3.20"
|
||||
tokio = { version = "1.8.1", features = ["full"] }
|
||||
tracing-subscriber = "0.2.16"
|
||||
tracing = "0.1.25"
|
||||
anyhow = "1.0.66"
|
||||
arc-swap = "1.5.1"
|
||||
base64 = "0.20.0"
|
||||
ctrlc = "3.2.4"
|
||||
futures-util = "0.3.25"
|
||||
hyper = { version = "0.14.23", features = ["http1", "server", "stream", "tcp"] }
|
||||
quinn = "0.9.3"
|
||||
rand = "0.8.5"
|
||||
rcgen = "0.10.0"
|
||||
ring = "0.16.20"
|
||||
rmp-serde = "1.1.1"
|
||||
rustls = "0.20.7"
|
||||
rusty_ulid = "1.0.0"
|
||||
serde = "1.0.151"
|
||||
serde_json = "1.0.89"
|
||||
structopt = "0.3.26"
|
||||
tokio = { version = "1.23.0", features = ["full"] }
|
||||
tracing-subscriber = "0.3.16"
|
||||
tracing = "0.1.37"
|
||||
udp_over_tcp = { path = "../udp_over_tcp" }
|
||||
|
||||
[dependencies.reqwest]
|
||||
version = "0.11.10"
|
||||
version = "0.11.13"
|
||||
default-features = false
|
||||
features = ["stream", "rustls-tls", "hyper-rustls"]
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
# https://whitfin.io/speeding-up-rust-docker-builds/
|
||||
# TODO: https://stackoverflow.com/questions/57389547/how-to-define-the-context-for-a-docker-build-as-a-specific-commit-on-one-of-the
|
||||
|
||||
# rust:1.64-slim-buster
|
||||
FROM rust@sha256:7da4fbd2dc7176746e8e5c371aeb0bbe742598c4906fa48cb2d87a4b89d50357 as build
|
||||
# docker pull rust:1.66-slim-buster
|
||||
FROM rust@sha256:98c9b1fca0c9a6183369daf9efadb57c634340ae877bb027aeadf72afdd086a3 as build
|
||||
|
||||
WORKDIR /
|
||||
ENV USER root
|
||||
|
@ -20,7 +20,8 @@ cargo new --bin crates/ptth_server && \
|
|||
cargo new --bin crates/ptth_file_server_bin && \
|
||||
cargo new --bin tools/ptth_tail && \
|
||||
cargo new --bin crates/debug_proxy && \
|
||||
cargo new --bin crates/ptth_quic
|
||||
cargo new --bin crates/ptth_quic && \
|
||||
cargo new --lib crates/udp_over_tcp
|
||||
|
||||
# copy over your manifests
|
||||
COPY ./Cargo.lock ./
|
||||
|
@ -29,6 +30,7 @@ COPY ./crates/always_equal/Cargo.toml ./crates/always_equal/
|
|||
COPY ./crates/ptth_core/Cargo.toml ./crates/ptth_core/
|
||||
COPY ./crates/ptth_relay/Cargo.toml ./crates/ptth_relay/
|
||||
COPY ./crates/ptth_quic/Cargo.toml ./crates/ptth_quic/
|
||||
COPY ./crates/udp_over_tcp/Cargo.toml ./crates/udp_over_tcp/
|
||||
|
||||
# this build step will cache your dependencies
|
||||
RUN cargo build --release -p ptth_quic
|
||||
|
@ -39,7 +41,8 @@ src/*.rs \
|
|||
crates/always_equal/src/*.rs \
|
||||
crates/ptth_core/src/*.rs \
|
||||
crates/ptth_relay/src/*.rs \
|
||||
crates/ptth_quic/src/*.rs
|
||||
crates/ptth_quic/src/*.rs \
|
||||
crates/udp_over_tcp/src/*.rs
|
||||
|
||||
# Copy source tree
|
||||
# Yes, I tried a few variations on the syntax. Dockerfiles are just rough.
|
||||
|
@ -50,6 +53,7 @@ COPY ./crates/ptth_core ./crates/ptth_core
|
|||
COPY ./crates/ptth_relay ./crates/ptth_relay
|
||||
COPY ./handlebars/ ./handlebars
|
||||
COPY ./crates/ptth_quic ./crates/ptth_quic
|
||||
COPY ./crates/udp_over_tcp ./crates/udp_over_tcp
|
||||
|
||||
# Bug in cargo's incremental build logic, triggered by
|
||||
# Docker doing something funny with mtimes? Maybe?
|
||||
|
@ -58,6 +62,7 @@ RUN touch crates/ptth_core/src/lib.rs
|
|||
# build for release
|
||||
# gate only on ptth_relay tests for now
|
||||
RUN \
|
||||
find . && \
|
||||
cargo build --release -p ptth_quic --bin ptth_quic_relay_server && \
|
||||
cargo test --release -p ptth_quic --bin ptth_quic_relay_server
|
||||
|
||||
|
|
|
@ -61,10 +61,7 @@ impl P2Client {
|
|||
|
||||
let conf = Arc::clone (&self.conf);
|
||||
|
||||
let quinn::NewConnection {
|
||||
connection,
|
||||
..
|
||||
} = protocol::p2_connect_to_p3 (&self.endpoint, conf.relay_addr, &conf.client_id).await?;
|
||||
let connection = protocol::p2_connect_to_p3 (&self.endpoint, conf.relay_addr, &conf.client_id).await?;
|
||||
|
||||
let client_tcp_port = conf.client_tcp_port;
|
||||
|
||||
|
|
|
@ -1,16 +1,13 @@
|
|||
use std::{
|
||||
iter::FromIterator,
|
||||
};
|
||||
|
||||
use tokio::sync::watch;
|
||||
|
||||
use ptth_quic::prelude::*;
|
||||
use ptth_quic::executable_end_server as server;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main () -> anyhow::Result <()> {
|
||||
tracing_subscriber::fmt::init ();
|
||||
|
||||
let args = Vec::from_iter (std::env::args_os ());
|
||||
let args: Vec <_> = std::env::args_os ().collect ();
|
||||
|
||||
let (shutdown_tx, shutdown_rx) = watch::channel (false);
|
||||
|
||||
|
@ -19,5 +16,5 @@ async fn main () -> anyhow::Result <()> {
|
|||
})?;
|
||||
trace! ("Set Ctrl+C handler");
|
||||
|
||||
ptth_quic::executable_end_server::main (&args, Some (shutdown_rx)).await
|
||||
server::main (&args, Some (shutdown_rx)).await
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
use tokio::sync::watch;
|
||||
|
||||
use ptth_quic::prelude::*;
|
||||
use ptth_quic::executable_relay_server as relay;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main () -> anyhow::Result <()> {
|
||||
|
@ -8,7 +9,7 @@ async fn main () -> anyhow::Result <()> {
|
|||
|
||||
tracing_subscriber::fmt::init ();
|
||||
|
||||
let opt = ptth_quic::executable_relay_server::Opt::from_args ();
|
||||
let opt = relay::Opt::from_args ();
|
||||
|
||||
let (running_tx, mut running_rx) = watch::channel (true);
|
||||
|
||||
|
@ -17,8 +18,15 @@ async fn main () -> anyhow::Result <()> {
|
|||
})?;
|
||||
trace! ("Set Ctrl+C handler");
|
||||
|
||||
let app = relay::App::new (opt).await?;
|
||||
println! ("Base64 cert: {}", base64::encode (app.server_cert ()));
|
||||
println! ("Listening on {}", app.listen_addr ());
|
||||
|
||||
tokio::fs::create_dir_all ("ptth_quic_output").await?;
|
||||
tokio::fs::write ("ptth_quic_output/quic_server.crt", app.server_cert ()).await?;
|
||||
|
||||
tokio::select! {
|
||||
val = ptth_quic::executable_relay_server::main (opt) => {
|
||||
val = app.run () => {
|
||||
|
||||
},
|
||||
val = running_rx.changed () => {
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
|
||||
#[cfg (test)]
|
||||
mod test {
|
||||
#[test]
|
||||
fn signing () -> anyhow::Result <()> {
|
||||
use std::fs;
|
||||
use ring::{
|
||||
signature::{
|
||||
self,
|
||||
Ed25519KeyPair,
|
||||
KeyPair,
|
||||
},
|
||||
};
|
||||
|
||||
fs::create_dir_all ("untracked")?;
|
||||
|
||||
let rng = ring::rand::SystemRandom::new ();
|
||||
let pkcs8_bytes = Ed25519KeyPair::generate_pkcs8 (&rng).map_err (|_| anyhow::anyhow! ("generate_pkcs8"))?;
|
||||
|
||||
let key_pair = Ed25519KeyPair::from_pkcs8 (pkcs8_bytes.as_ref ()).map_err (|_| anyhow::anyhow! ("from_pkcs8"))?;
|
||||
|
||||
const MESSAGE: &[u8] = b":V";
|
||||
let sig = key_pair.sign (MESSAGE);
|
||||
|
||||
let peer_public_key_bytes = key_pair.public_key ().as_ref ();
|
||||
let peer_public_key = signature::UnparsedPublicKey::new (&signature::ED25519, peer_public_key_bytes);
|
||||
|
||||
peer_public_key.verify (MESSAGE, sig.as_ref ()).map_err (|_| anyhow::anyhow! ("verify"))?;
|
||||
|
||||
Ok (())
|
||||
}
|
||||
}
|
|
@ -10,7 +10,7 @@ use protocol::PeerId;
|
|||
/// A partially-filled-out config that structopt can deal with
|
||||
/// Try to turn this into a Config as soon as possible.
|
||||
#[derive (Debug, StructOpt)]
|
||||
struct Opt {
|
||||
pub struct Opt {
|
||||
#[structopt (long)]
|
||||
relay_addr: Option <String>,
|
||||
#[structopt (long)]
|
||||
|
@ -19,6 +19,18 @@ struct Opt {
|
|||
debug_echo: bool,
|
||||
#[structopt (long)]
|
||||
cert_url: Option <String>,
|
||||
#[structopt (long)]
|
||||
use_udp_over_tcp: Option <bool>,
|
||||
}
|
||||
|
||||
/// A filled-out config for constructing an end server
|
||||
#[derive (Clone)]
|
||||
pub (crate) struct Config {
|
||||
pub debug_echo: bool,
|
||||
pub id: String,
|
||||
pub relay_addr: SocketAddr,
|
||||
pub relay_cert: Vec <u8>,
|
||||
pub use_udp_over_tcp: bool,
|
||||
}
|
||||
|
||||
pub async fn main (args: &[OsString], shutdown_rx: Option <watch::Receiver <bool>>) -> anyhow::Result <()> {
|
||||
|
@ -26,10 +38,9 @@ pub async fn main (args: &[OsString], shutdown_rx: Option <watch::Receiver <bool
|
|||
let opt = Opt::from_iter (args);
|
||||
let conf = opt.into_config ().await?;
|
||||
|
||||
let end_server = Arc::new (P4EndServer::connect (conf)?);
|
||||
let (end_server, shutdown_tx) = P4EndServer::connect (conf).await?;
|
||||
|
||||
let run_task = {
|
||||
let end_server = Arc::clone (&end_server);
|
||||
tokio::spawn (async move {
|
||||
end_server.run ().await?;
|
||||
Ok::<_, anyhow::Error> (())
|
||||
|
@ -40,7 +51,8 @@ pub async fn main (args: &[OsString], shutdown_rx: Option <watch::Receiver <bool
|
|||
while ! *shutdown_rx.borrow () {
|
||||
shutdown_rx.changed ().await?;
|
||||
}
|
||||
end_server.shut_down ()?;
|
||||
trace! ("P4 end server shutting down...");
|
||||
shutdown_tx.send (true)?
|
||||
}
|
||||
|
||||
run_task.await??;
|
||||
|
@ -50,21 +62,12 @@ pub async fn main (args: &[OsString], shutdown_rx: Option <watch::Receiver <bool
|
|||
Ok (())
|
||||
}
|
||||
|
||||
/// A filled-out config for constructing an end server
|
||||
#[derive (Clone)]
|
||||
pub struct Config {
|
||||
pub debug_echo: bool,
|
||||
pub id: String,
|
||||
pub relay_addr: SocketAddr,
|
||||
pub relay_cert: Vec <u8>,
|
||||
}
|
||||
|
||||
impl Opt {
|
||||
/// Converts self into a Config that the server can use.
|
||||
/// Performs I/O to load the relay cert from disk or from HTTP.
|
||||
/// Fails if arguments can't be parsed or if I/O fails.
|
||||
|
||||
pub async fn into_config (self) -> anyhow::Result <Config> {
|
||||
pub (crate) async fn into_config (self) -> anyhow::Result <Config> {
|
||||
let id = self.server_id.clone ().unwrap_or_else (|| "bogus_server".to_string ());
|
||||
|
||||
let relay_addr: SocketAddr = self.relay_addr.clone ().unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?;
|
||||
|
@ -83,53 +86,74 @@ impl Opt {
|
|||
id,
|
||||
relay_addr,
|
||||
relay_cert,
|
||||
use_udp_over_tcp: self.use_udp_over_tcp.unwrap_or (false),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct P4EndServer {
|
||||
conf: Config,
|
||||
conn: quinn::Connection,
|
||||
endpoint: quinn::Endpoint,
|
||||
conf: Arc <Config>,
|
||||
shutdown_tx: watch::Sender <bool>,
|
||||
shutdown_rx: watch::Receiver <bool>,
|
||||
}
|
||||
|
||||
impl P4EndServer {
|
||||
pub fn connect (conf: Config) -> anyhow::Result <Self> {
|
||||
trace! ("P4 end server making its QUIC endpoint");
|
||||
pub (crate) async fn connect (conf: Config) -> anyhow::Result <(Self, watch::Sender <bool>)> {
|
||||
debug! ("P4 end server making its QUIC endpoint");
|
||||
let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&conf.relay_cert])?;
|
||||
|
||||
let (shutdown_tx, shutdown_rx) = watch::channel (false);
|
||||
let conf = if conf.use_udp_over_tcp {
|
||||
let udp_sock = UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, 0)).await?;
|
||||
udp_sock.connect ((Ipv4Addr::LOCALHOST, endpoint.local_addr ()?.port ())).await?;
|
||||
|
||||
Ok (P4EndServer {
|
||||
conf: Arc::new (conf),
|
||||
endpoint,
|
||||
shutdown_tx,
|
||||
shutdown_rx,
|
||||
})
|
||||
}
|
||||
let udp_local_server_port = udp_sock.local_addr ()?.port ();
|
||||
|
||||
pub fn config (&self) -> &Config {
|
||||
&*self.conf
|
||||
}
|
||||
let tcp_sock = TcpSocket::new_v4 ()?;
|
||||
let tcp_conn = tcp_sock.connect (conf.relay_addr).await?;
|
||||
|
||||
pub async fn run (&self) -> anyhow::Result <()> {
|
||||
trace! ("P4 end server connecting to P3 relay server");
|
||||
let quinn::NewConnection {
|
||||
mut bi_streams,
|
||||
..
|
||||
} = protocol::p4_connect_to_p3 (
|
||||
&self.endpoint,
|
||||
self.conf.relay_addr,
|
||||
&self.conf.id
|
||||
tokio::spawn (async move {
|
||||
udp_over_tcp::client::main_with_sockets (udp_sock, tcp_conn).await
|
||||
});
|
||||
|
||||
Config {
|
||||
debug_echo: conf.debug_echo,
|
||||
id: conf.id,
|
||||
relay_addr: SocketAddr::V4 (SocketAddrV4::new (Ipv4Addr::LOCALHOST, udp_local_server_port)),
|
||||
relay_cert: conf.relay_cert,
|
||||
use_udp_over_tcp: true,
|
||||
}
|
||||
}
|
||||
else {
|
||||
conf
|
||||
};
|
||||
|
||||
debug! ("P4 end server connecting to P3 relay server");
|
||||
let conn = protocol::p4_connect_to_p3 (
|
||||
&endpoint,
|
||||
conf.relay_addr,
|
||||
&conf.id
|
||||
).await?;
|
||||
|
||||
debug! ("Connected to relay server");
|
||||
|
||||
let (shutdown_tx, shutdown_rx) = watch::channel (false);
|
||||
|
||||
Ok ((P4EndServer {
|
||||
conf,
|
||||
conn,
|
||||
endpoint,
|
||||
shutdown_rx,
|
||||
}, shutdown_tx))
|
||||
}
|
||||
|
||||
pub (crate) async fn run (self) -> anyhow::Result <()> {
|
||||
trace! ("Accepting bi streams from P3");
|
||||
|
||||
let mut shutdown_rx = self.shutdown_rx.clone ();
|
||||
|
||||
let conf = Arc::new (self.conf);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = shutdown_rx.changed () => {
|
||||
|
@ -138,10 +162,10 @@ impl P4EndServer {
|
|||
break;
|
||||
}
|
||||
}
|
||||
stream_opt = bi_streams.next () => {
|
||||
let (relay_send, relay_recv) = stream_opt.ok_or_else (|| anyhow::anyhow! ("P4 ran out of incoming streams. Maybe P3 shut down or disconnected?"))??;
|
||||
stream_opt = self.conn.accept_bi () => {
|
||||
let (relay_send, relay_recv) = stream_opt?;
|
||||
|
||||
tokio::spawn (handle_bi_stream (Arc::clone (&self.conf), relay_send, relay_recv));
|
||||
tokio::spawn (handle_bi_stream (Arc::clone (&conf), relay_send, relay_recv));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -149,11 +173,6 @@ impl P4EndServer {
|
|||
Ok (())
|
||||
}
|
||||
|
||||
pub fn shut_down (&self) -> anyhow::Result <()> {
|
||||
trace! ("P4 end server shutting down...");
|
||||
Ok (self.shutdown_tx.send (true)?)
|
||||
}
|
||||
|
||||
pub fn shutting_down (&self) -> bool {
|
||||
*self.shutdown_rx.borrow ()
|
||||
}
|
||||
|
|
|
@ -20,142 +20,218 @@ use protocol::PeerId;
|
|||
#[derive (Debug, StructOpt)]
|
||||
pub struct Opt {
|
||||
#[structopt (long)]
|
||||
listen_addr: Option <String>,
|
||||
pub (crate) listen_addr: Option <String>,
|
||||
#[structopt (long)]
|
||||
pub (crate) tcp_listen_port: Option <u16>,
|
||||
}
|
||||
|
||||
pub async fn main (opt: Opt) -> anyhow::Result <()>
|
||||
{
|
||||
let listen_addr = opt.listen_addr.unwrap_or_else (|| String::from ("0.0.0.0:30380")).parse ()?;
|
||||
let (mut incoming, server_cert) = make_server_endpoint (listen_addr)?;
|
||||
println! ("Base64 cert: {}", base64::encode (&server_cert));
|
||||
pub struct App {
|
||||
endpoint: quinn::Endpoint,
|
||||
listen_addr: SocketAddr,
|
||||
pub (crate) metrics: Arc <RwLock <Metrics>>,
|
||||
server_cert: Vec <u8>,
|
||||
tcp_listener: Option <udp_over_tcp::server::Listener>,
|
||||
}
|
||||
|
||||
tokio::fs::create_dir_all ("ptth_quic_output").await?;
|
||||
tokio::fs::write ("ptth_quic_output/quic_server.crt", &server_cert).await?;
|
||||
#[derive (Default)]
|
||||
pub (crate) struct Metrics {
|
||||
pub (crate) connected_end_servers: usize,
|
||||
}
|
||||
|
||||
let relay_state = Arc::new (RelayState::default ());
|
||||
impl App {
|
||||
pub async fn new (opt: Opt) -> anyhow::Result <Self> {
|
||||
let config = load_config ().await.ok ();
|
||||
|
||||
let make_svc = {
|
||||
let relay_state = Arc::clone (&relay_state);
|
||||
make_service_fn (move |_conn| {
|
||||
let listen_addr = opt.listen_addr.unwrap_or_else (|| String::from ("0.0.0.0:30380")).parse ()?;
|
||||
let (endpoint, server_cert) = make_server_endpoint (listen_addr)?;
|
||||
|
||||
let listen_addr = endpoint.local_addr ()?;
|
||||
let tcp_port = opt.tcp_listen_port.or (config.map (|cfg| cfg.tcp_listen_port).flatten ());
|
||||
|
||||
let tcp_listener = if let Some (tcp_port) = tcp_port {
|
||||
let cfg = udp_over_tcp::server::Config {
|
||||
tcp_port,
|
||||
udp_port: listen_addr.port (),
|
||||
};
|
||||
|
||||
Some (udp_over_tcp::server::Listener::new (cfg).await?)
|
||||
}
|
||||
else {
|
||||
None
|
||||
};
|
||||
|
||||
Ok (Self {
|
||||
endpoint,
|
||||
listen_addr,
|
||||
metrics: Default::default (),
|
||||
server_cert,
|
||||
tcp_listener,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn listen_addr (&self) -> SocketAddr {
|
||||
self.listen_addr
|
||||
}
|
||||
|
||||
pub fn server_cert (&self) -> &[u8] {
|
||||
&self.server_cert
|
||||
}
|
||||
|
||||
pub fn tcp_listen_port (&self) -> anyhow::Result <Option <u16>> {
|
||||
match self.tcp_listener.as_ref () {
|
||||
None => Ok (None),
|
||||
Some (tcp_listener) => Ok (tcp_listener.tcp_port ()?.into ()),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run (self) -> anyhow::Result <()> {
|
||||
let Self {
|
||||
endpoint,
|
||||
listen_addr,
|
||||
metrics,
|
||||
server_cert,
|
||||
tcp_listener,
|
||||
} = self;
|
||||
|
||||
let mut relay_state = RelayState::default ();
|
||||
relay_state.metrics = metrics;
|
||||
if let Err (e) = relay_state.reload_config ().await {
|
||||
error! ("{:?}", e);
|
||||
}
|
||||
let relay_state = Arc::new (relay_state);
|
||||
|
||||
let make_svc = {
|
||||
let relay_state = Arc::clone (&relay_state);
|
||||
make_service_fn (move |_conn| {
|
||||
let relay_state = Arc::clone (&relay_state);
|
||||
|
||||
async move {
|
||||
Ok::<_, String> (service_fn (move |req| {
|
||||
async move {
|
||||
Ok::<_, String> (service_fn (move |req| {
|
||||
let relay_state = Arc::clone (&relay_state);
|
||||
|
||||
handle_http (req, relay_state)
|
||||
}))
|
||||
}
|
||||
})
|
||||
};
|
||||
|
||||
let http_addr = SocketAddr::from (([0, 0, 0, 0], 4004));
|
||||
let http_server = Server::bind (&http_addr);
|
||||
|
||||
let _task_reload_config = {
|
||||
let relay_state = Arc::clone (&relay_state);
|
||||
tokio::spawn (async move {
|
||||
let mut interval = tokio::time::interval (std::time::Duration::from_secs (60));
|
||||
interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip);
|
||||
|
||||
loop {
|
||||
interval.tick ().await;
|
||||
|
||||
relay_state.reload_config ().await.ok ();
|
||||
}
|
||||
})
|
||||
};
|
||||
|
||||
let task_quic_server = {
|
||||
let relay_state = Arc::clone (&relay_state);
|
||||
tokio::spawn (async move {
|
||||
while let Some (conn) = endpoint.accept ().await {
|
||||
let relay_state = Arc::clone (&relay_state);
|
||||
|
||||
handle_http (req, relay_state)
|
||||
}))
|
||||
}
|
||||
})
|
||||
};
|
||||
// Each new peer QUIC connection gets its own task
|
||||
tokio::spawn (async move {
|
||||
let active = relay_state.stats.quic.connect ();
|
||||
|
||||
let http_addr = SocketAddr::from (([0, 0, 0, 0], 4004));
|
||||
let http_server = Server::bind (&http_addr);
|
||||
debug! ("QUIC connections: {}", active);
|
||||
|
||||
let tcp_port = 30382;
|
||||
let tcp_listener = TcpListener::bind (("127.0.0.1", tcp_port)).await?;
|
||||
match handle_quic_connection (Arc::clone (&relay_state), conn).await {
|
||||
Ok (_) => (),
|
||||
Err (e) => warn! ("handle_quic_connection `{:?}`", e),
|
||||
}
|
||||
|
||||
let task_quic_server = {
|
||||
let relay_state = Arc::clone (&relay_state);
|
||||
tokio::spawn (async move {
|
||||
while let Some (conn) = incoming.next ().await {
|
||||
let relay_state = Arc::clone (&relay_state);
|
||||
let active = relay_state.stats.quic.disconnect ();
|
||||
debug! ("QUIC connections: {}", active);
|
||||
});
|
||||
}
|
||||
|
||||
// Each new peer QUIC connection gets its own task
|
||||
tokio::spawn (async move {
|
||||
let active = relay_state.stats.quic.connect ();
|
||||
debug! ("QUIC connections: {}", active);
|
||||
Ok::<_, anyhow::Error> (())
|
||||
})
|
||||
};
|
||||
|
||||
match handle_quic_connection (Arc::clone (&relay_state), conn).await {
|
||||
Ok (_) => (),
|
||||
Err (e) => warn! ("handle_quic_connection `{:?}`", e),
|
||||
}
|
||||
let task_direc_server = {
|
||||
let relay_state = Arc::clone (&relay_state);
|
||||
|
||||
let active = relay_state.stats.quic.disconnect ();
|
||||
debug! ("QUIC connections: {}", active);
|
||||
});
|
||||
}
|
||||
tokio::spawn (async move {
|
||||
let sock = UdpSocket::bind("0.0.0.0:30379").await?;
|
||||
let mut buf = [0; 2048];
|
||||
loop {
|
||||
let (len, addr) = sock.recv_from (&mut buf).await?;
|
||||
debug! ("{:?} bytes received from {:?}", len, addr);
|
||||
|
||||
Ok::<_, anyhow::Error> (())
|
||||
})
|
||||
};
|
||||
let packet = Vec::from_iter ((&buf [0..len]).into_iter ().map (|x| *x));
|
||||
|
||||
let task_direc_server = {
|
||||
let relay_state = Arc::clone (&relay_state);
|
||||
{
|
||||
let mut direc_cookies = relay_state.direc_cookies.lock ().await;
|
||||
|
||||
tokio::spawn (async move {
|
||||
let sock = UdpSocket::bind("0.0.0.0:30379").await?;
|
||||
let mut buf = [0; 2048];
|
||||
loop {
|
||||
let (len, addr) = sock.recv_from (&mut buf).await?;
|
||||
debug! ("{:?} bytes received from {:?}", len, addr);
|
||||
|
||||
let packet = Vec::from_iter ((&buf [0..len]).into_iter ().map (|x| *x));
|
||||
|
||||
{
|
||||
let mut direc_cookies = relay_state.direc_cookies.lock ().await;
|
||||
|
||||
if let Some (direc_state) = direc_cookies.remove (&packet) {
|
||||
debug! ("Got PTTH_DIREC cookie for {}", direc_state.p2_id);
|
||||
direc_state.p2_addr.send (addr).ok ();
|
||||
}
|
||||
else {
|
||||
debug! ("UDP packet didn't match any PTTH_DIREC cookie");
|
||||
if let Some (direc_state) = direc_cookies.remove (&packet) {
|
||||
debug! ("Got PTTH_DIREC cookie for {}", direc_state.p2_id);
|
||||
direc_state.p2_addr.send (addr).ok ();
|
||||
}
|
||||
else {
|
||||
debug! ("UDP packet didn't match any PTTH_DIREC cookie");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok::<_, anyhow::Error> (())
|
||||
})
|
||||
};
|
||||
|
||||
let task_http_server = tokio::spawn (async move {
|
||||
http_server.serve (make_svc).await?;
|
||||
Ok::<_, anyhow::Error> (())
|
||||
})
|
||||
};
|
||||
});
|
||||
|
||||
let task_http_server = tokio::spawn (async move {
|
||||
http_server.serve (make_svc).await?;
|
||||
Ok::<_, anyhow::Error> (())
|
||||
});
|
||||
debug! ("Serving HTTP on {:?}", http_addr);
|
||||
|
||||
let task_tcp_server = {
|
||||
let relay_state = Arc::clone (&relay_state);
|
||||
tokio::spawn (async move {
|
||||
loop {
|
||||
let (tcp_socket, _) = tcp_listener.accept ().await?;
|
||||
if let Some (tcp_listener) = tcp_listener {
|
||||
tokio::spawn (async move {
|
||||
if let Err (e) = tcp_listener.run ().await {
|
||||
eprintln! ("udp_over_tcp::server::main exited with err {:?}", e);
|
||||
}
|
||||
|
||||
let relay_state = Arc::clone (&relay_state);
|
||||
Ok::<_, anyhow::Error> (())
|
||||
});
|
||||
}
|
||||
|
||||
{
|
||||
let config = relay_state.config.load ();
|
||||
dbg! (&config.webhook_url);
|
||||
if let Some (webhook_url) = config.webhook_url.clone () {
|
||||
let j = json! ({
|
||||
"text": "Booting up",
|
||||
}).to_string ();
|
||||
let http_client = relay_state.http_client.clone ();
|
||||
tokio::spawn (async move {
|
||||
let (_client_recv, _client_send) = tcp_socket.into_split ();
|
||||
|
||||
debug! ("Accepted direct TCP connection P1 --> P3");
|
||||
|
||||
let p4_server_proxies = relay_state.p4_server_proxies.lock ().await;
|
||||
let _p4 = match p4_server_proxies.get ("bogus_server") {
|
||||
Some (x) => x,
|
||||
None => bail! ("That server isn't connected"),
|
||||
};
|
||||
|
||||
// unimplemented! ();
|
||||
/*
|
||||
p4.req_channel.send (RequestP2ToP4 {
|
||||
client_send,
|
||||
client_recv,
|
||||
client_id: "bogus_client".to_string (),
|
||||
}).await.map_err (|_| anyhow::anyhow! ("Can't send request to P4 server"))?;
|
||||
*/
|
||||
Ok (())
|
||||
http_client.post (webhook_url).body (j).send ().await
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Ok::<_, anyhow::Error> (())
|
||||
})
|
||||
};
|
||||
tokio::select! {
|
||||
_val = task_quic_server => {
|
||||
eprintln! ("QUIC relay server exited, exiting");
|
||||
},
|
||||
_val = task_http_server => {
|
||||
eprintln! ("HTTP server exited, exiting");
|
||||
},
|
||||
_val = task_direc_server => {
|
||||
eprintln! ("PTTH_DIREC server exited, exiting");
|
||||
},
|
||||
}
|
||||
|
||||
debug! ("Serving HTTP on {:?}", http_addr);
|
||||
|
||||
task_quic_server.await??;
|
||||
task_http_server.await??;
|
||||
task_tcp_server.await??;
|
||||
task_direc_server.await??;
|
||||
|
||||
Ok (())
|
||||
Ok (())
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_http (_req: Request <Body>, relay_state: Arc <RelayState>)
|
||||
|
@ -178,9 +254,36 @@ async fn handle_http (_req: Request <Body>, relay_state: Arc <RelayState>)
|
|||
|
||||
#[derive (Default)]
|
||||
struct RelayState {
|
||||
config: arc_swap::ArcSwap <Config>,
|
||||
p4_server_proxies: Mutex <HashMap <PeerId, P4State>>,
|
||||
direc_cookies: Mutex <HashMap <Vec <u8>, DirecState>>,
|
||||
metrics: Arc <RwLock <Metrics>>,
|
||||
stats: Stats,
|
||||
http_client: reqwest::Client,
|
||||
}
|
||||
|
||||
#[derive (Default)]
|
||||
struct Config {
|
||||
ip_nicknames: BTreeMap <[u8; 4], String>,
|
||||
tcp_listen_port: Option <u16>,
|
||||
webhook_url: Option <String>,
|
||||
}
|
||||
|
||||
impl From <ConfigFile> for Config {
|
||||
fn from (x: ConfigFile) -> Self {
|
||||
Self {
|
||||
ip_nicknames: x.ip_nicknames.into_iter ().collect (),
|
||||
tcp_listen_port: x.tcp_listen_port,
|
||||
webhook_url: x.webhook_url,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive (Deserialize)]
|
||||
struct ConfigFile {
|
||||
ip_nicknames: Vec <([u8; 4], String)>,
|
||||
tcp_listen_port: Option <u16>,
|
||||
webhook_url: Option <String>,
|
||||
}
|
||||
|
||||
struct DirecState {
|
||||
|
@ -222,11 +325,24 @@ impl ConnectEvents {
|
|||
|
||||
struct P4State {
|
||||
req_channel: mpsc::Sender <RequestP2ToP4>,
|
||||
}
|
||||
|
||||
async fn load_config () -> anyhow::Result <ConfigFile>
|
||||
{
|
||||
let s = tokio::fs::read_to_string ("config/ptth_quic_relay_server.json").await?;
|
||||
let cfg: ConfigFile = serde_json::from_str (&s)?;
|
||||
Ok (cfg)
|
||||
}
|
||||
|
||||
impl RelayState {
|
||||
async fn reload_config (&self) -> anyhow::Result <()> {
|
||||
let config = load_config ().await?;
|
||||
let config = Arc::new (Config::from (config));
|
||||
|
||||
self.config.store (config);
|
||||
|
||||
Ok (())
|
||||
}
|
||||
}
|
||||
|
||||
struct RequestP2ToP4 {
|
||||
|
@ -301,30 +417,69 @@ async fn handle_quic_connection (
|
|||
conn: quinn::Connecting,
|
||||
) -> anyhow::Result <()>
|
||||
{
|
||||
let mut conn = conn.await?;
|
||||
let id = Ulid::generate ();
|
||||
|
||||
let config = relay_state.config.load ();
|
||||
|
||||
let remote_addr = conn.remote_address ();
|
||||
let ip_nickname = match remote_addr {
|
||||
SocketAddr::V4 (x) => {
|
||||
let ip = x.ip ().octets ();
|
||||
|
||||
match config.ip_nicknames.get (&ip) {
|
||||
Some (nick) => nick.as_str (),
|
||||
_ => "Unknown",
|
||||
}
|
||||
},
|
||||
_ => "Unknown, not IPv4",
|
||||
};
|
||||
|
||||
debug! ("EHG7NVUD Incoming QUIC connection {} from {:?} ({})", id, remote_addr, ip_nickname);
|
||||
|
||||
if let Some (webhook_url) = config.webhook_url.clone () {
|
||||
let j = json! ({
|
||||
"text": format! ("Incoming QUIC connection from {:?} ({})", remote_addr, ip_nickname),
|
||||
}).to_string ();
|
||||
let http_client = relay_state.http_client.clone ();
|
||||
tokio::spawn (async move {
|
||||
http_client.post (webhook_url).body (j).send ().await
|
||||
});
|
||||
}
|
||||
|
||||
let conn = conn.await?;
|
||||
|
||||
// Everyone who connects must identify themselves with the first
|
||||
// bi stream
|
||||
// TODO: Timeout
|
||||
|
||||
let (mut send, mut recv) = conn.bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("QUIC client didn't identify itself"))??;
|
||||
let (mut send, mut recv) = conn.accept_bi ().await?;
|
||||
|
||||
let peer = protocol::p3_accept_peer (&mut recv).await?;
|
||||
|
||||
match peer {
|
||||
protocol::P3Peer::P2ClientProxy (peer) => {
|
||||
trace! ("Accepting connection from P2 client");
|
||||
trace! ("H36JTVE5 Handling connection {} as P2 client", id);
|
||||
// TODO: Check authorization for P2 peers
|
||||
|
||||
protocol::p3_authorize_p2_peer (&mut send).await?;
|
||||
handle_p2_connection (relay_state, conn, peer).await?;
|
||||
},
|
||||
protocol::P3Peer::P4ServerProxy (peer) => {
|
||||
trace! ("Accepting connection from P4 end server");
|
||||
trace! ("LRHUKB7K Handling connection {} as P4 end server", id);
|
||||
// TODO: Check authorization for P4 peers
|
||||
|
||||
protocol::p3_authorize_p4_peer (&mut send).await?;
|
||||
let metrics = Arc::clone (&relay_state.metrics);
|
||||
|
||||
{
|
||||
let mut m = metrics.write ().await;
|
||||
m.connected_end_servers += 1;
|
||||
}
|
||||
handle_p4_connection (relay_state, conn, peer).await?;
|
||||
{
|
||||
let mut m = metrics.write ().await;
|
||||
m.connected_end_servers -= 1;
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -333,19 +488,13 @@ async fn handle_quic_connection (
|
|||
|
||||
async fn handle_p2_connection (
|
||||
relay_state: Arc <RelayState>,
|
||||
conn: quinn::NewConnection,
|
||||
conn: quinn::Connection,
|
||||
peer: protocol::P2ClientProxy,
|
||||
) -> anyhow::Result <()>
|
||||
{
|
||||
let client_id = peer.id;
|
||||
|
||||
let quinn::NewConnection {
|
||||
mut bi_streams,
|
||||
..
|
||||
} = conn;
|
||||
|
||||
while let Some (bi_stream) = bi_streams.next ().await {
|
||||
let (send, mut recv) = bi_stream?;
|
||||
while let Ok ((send, mut recv)) = conn.accept_bi ().await {
|
||||
let relay_state = Arc::clone (&relay_state);
|
||||
let client_id = client_id.clone ();
|
||||
|
||||
|
@ -457,15 +606,11 @@ async fn handle_direc_p2_to_p4 (
|
|||
|
||||
async fn handle_p4_connection (
|
||||
relay_state: Arc <RelayState>,
|
||||
conn: quinn::NewConnection,
|
||||
connection: quinn::Connection,
|
||||
peer: protocol::P4ServerProxy,
|
||||
) -> anyhow::Result <()>
|
||||
{
|
||||
let server_id = peer.id;
|
||||
let quinn::NewConnection {
|
||||
connection,
|
||||
..
|
||||
} = conn;
|
||||
let (tx, mut rx) = mpsc::channel (2);
|
||||
|
||||
let p4_state = P4State {
|
||||
|
|
|
@ -1,7 +1,11 @@
|
|||
pub mod client_proxy;
|
||||
pub mod connection;
|
||||
pub mod crypto;
|
||||
pub mod executable_end_server;
|
||||
pub mod executable_relay_server;
|
||||
pub mod prelude;
|
||||
pub mod protocol;
|
||||
pub mod quinn_utils;
|
||||
|
||||
#[cfg (test)]
|
||||
mod tests;
|
||||
|
|
|
@ -2,7 +2,11 @@ pub use std::{
|
|||
collections::*,
|
||||
ffi::OsString,
|
||||
iter::FromIterator,
|
||||
net::SocketAddr,
|
||||
net::{
|
||||
Ipv4Addr,
|
||||
SocketAddr,
|
||||
SocketAddrV4,
|
||||
},
|
||||
sync::{
|
||||
Arc,
|
||||
atomic::{
|
||||
|
@ -26,9 +30,14 @@ pub use tokio::{
|
|||
AsyncReadExt,
|
||||
AsyncWriteExt,
|
||||
},
|
||||
net::TcpListener,
|
||||
net::{
|
||||
TcpListener,
|
||||
TcpSocket,
|
||||
UdpSocket,
|
||||
},
|
||||
sync::{
|
||||
Mutex,
|
||||
RwLock,
|
||||
mpsc,
|
||||
},
|
||||
task::JoinHandle,
|
||||
|
@ -37,6 +46,9 @@ pub use rand::{
|
|||
Rng,
|
||||
RngCore,
|
||||
};
|
||||
pub use rusty_ulid::Ulid;
|
||||
pub use serde::Deserialize;
|
||||
pub use serde_json::json;
|
||||
pub use tracing::{
|
||||
debug,
|
||||
error,
|
||||
|
|
|
@ -33,14 +33,14 @@ pub async fn p2_connect_to_p3 (
|
|||
endpoint: &quinn::Endpoint,
|
||||
relay_addr: std::net::SocketAddr,
|
||||
client_id: &str,
|
||||
) -> Result <quinn::NewConnection>
|
||||
) -> Result <quinn::Connection>
|
||||
{
|
||||
if client_id.as_bytes ().len () > MAX_ID_LENGTH {
|
||||
bail! ("Client ID is longer than MAX_ID_LENGTH");
|
||||
}
|
||||
|
||||
let new_conn = endpoint.connect (relay_addr, "localhost")?.await?;
|
||||
let (mut send, mut recv) = new_conn.connection.open_bi ().await?;
|
||||
let (mut send, mut recv) = new_conn.open_bi ().await?;
|
||||
let cmd_type = Command::CONNECT_P2_TO_P3.0;
|
||||
|
||||
send.write_all (&[cmd_type, 0, 0, 0]).await?;
|
||||
|
@ -251,21 +251,21 @@ pub async fn p4_connect_to_p3 (
|
|||
endpoint: &quinn::Endpoint,
|
||||
relay_addr: std::net::SocketAddr,
|
||||
server_id: &str,
|
||||
) -> Result <quinn::NewConnection>
|
||||
) -> Result <quinn::Connection>
|
||||
{
|
||||
if server_id.as_bytes ().len () > MAX_ID_LENGTH {
|
||||
bail! ("Server ID is longer than MAX_ID_LENGTH");
|
||||
}
|
||||
|
||||
let new_conn = endpoint.connect (relay_addr, "localhost")?.await?;
|
||||
let (mut send, mut recv) = new_conn.connection.open_bi ().await?;
|
||||
let new_conn = endpoint.connect (relay_addr, "localhost")?.await.context ("UXTDVL2V quinn::Endpoint::connect")?;
|
||||
let (mut send, mut recv) = new_conn.open_bi ().await?;
|
||||
let cmd_type = Command::CONNECT_P4_TO_P3.0;
|
||||
|
||||
send.write_all (&[cmd_type, 0, 0, 0]).await?;
|
||||
send_lv_string (&mut send, server_id).await?;
|
||||
|
||||
expect_exact_response (&mut recv, [Command::OKAY.0, cmd_type, 0, 0]).await
|
||||
.context ("P4 didn't get OK response when connecting to P3")?;
|
||||
.context ("WMGW2RXU P4 didn't get OK response when connecting to P3")?;
|
||||
|
||||
Ok (new_conn)
|
||||
}
|
||||
|
|
|
@ -8,7 +8,7 @@ use std::{
|
|||
};
|
||||
|
||||
use quinn::{
|
||||
ClientConfig, Endpoint, Incoming,
|
||||
ClientConfig, Endpoint,
|
||||
ServerConfig, TransportConfig,
|
||||
};
|
||||
|
||||
|
@ -26,7 +26,7 @@ pub fn make_client_endpoint(
|
|||
let mut transport = quinn::TransportConfig::default ();
|
||||
transport.keep_alive_interval (Some (Duration::from_millis (5_000)));
|
||||
|
||||
client_cfg.transport = Arc::new (transport);
|
||||
client_cfg.transport_config (Arc::new (transport));
|
||||
|
||||
let mut endpoint = Endpoint::client (bind_addr)?;
|
||||
endpoint.set_default_client_config (client_cfg);
|
||||
|
@ -41,10 +41,10 @@ pub fn make_client_endpoint(
|
|||
/// - a stream of incoming QUIC connections
|
||||
/// - server certificate serialized into DER format
|
||||
#[allow(unused)]
|
||||
pub fn make_server_endpoint(bind_addr: SocketAddr) -> anyhow::Result<(Incoming, Vec<u8>)> {
|
||||
pub fn make_server_endpoint(bind_addr: SocketAddr) -> anyhow::Result<(Endpoint, Vec<u8>)> {
|
||||
let (server_config, server_cert) = configure_server()?;
|
||||
let (_endpoint, incoming) = Endpoint::server (server_config, bind_addr)?;
|
||||
Ok((incoming, server_cert))
|
||||
let endpoint = Endpoint::server (server_config, bind_addr)?;
|
||||
Ok((endpoint, server_cert))
|
||||
}
|
||||
|
||||
/// Builds default quinn client config and trusts given certificates.
|
||||
|
|
|
@ -0,0 +1,110 @@
|
|||
use crate::prelude::*;
|
||||
|
||||
#[test]
|
||||
fn end_to_end () -> anyhow::Result <()> {
|
||||
let rt = tokio::runtime::Runtime::new ()?;
|
||||
rt.block_on (end_to_end_async ())?;
|
||||
Ok (())
|
||||
}
|
||||
|
||||
async fn end_to_end_async () -> anyhow::Result <()> {
|
||||
use crate::executable_end_server as server;
|
||||
use crate::executable_relay_server as relay;
|
||||
|
||||
let relay_opt = relay::Opt {
|
||||
listen_addr: "127.0.0.1:0".to_string ().into (),
|
||||
tcp_listen_port: Some (0),
|
||||
};
|
||||
let relay_app = relay::App::new (relay_opt).await?;
|
||||
|
||||
let relay_quic_port = relay_app.listen_addr ().port ();
|
||||
let relay_cert = Vec::from (relay_app.server_cert ());
|
||||
let relay_metrics = Arc::clone (&relay_app.metrics);
|
||||
let tcp_listen_port = relay_app.tcp_listen_port ()?.unwrap ();
|
||||
|
||||
assert_ne! (tcp_listen_port, 0);
|
||||
|
||||
let task_relay = tokio::spawn (async move {
|
||||
relay_app.run ().await
|
||||
});
|
||||
|
||||
{
|
||||
let m = relay_metrics.read ().await;
|
||||
assert_eq! (m.connected_end_servers, 0);
|
||||
}
|
||||
|
||||
// Connect with wrong port, should fail
|
||||
|
||||
let server_conf = server::Config {
|
||||
debug_echo: false,
|
||||
id: "bogus".into (),
|
||||
relay_addr: "127.0.0.1:80".parse ()?,
|
||||
relay_cert: relay_cert.clone (),
|
||||
use_udp_over_tcp: false,
|
||||
};
|
||||
|
||||
let server_err = server::P4EndServer::connect (server_conf).await;
|
||||
|
||||
assert! (server_err.is_err ());
|
||||
|
||||
// Connect with wrong cert, should fail
|
||||
|
||||
let server_conf = server::Config {
|
||||
debug_echo: false,
|
||||
id: "bogus".into (),
|
||||
relay_addr: ([127, 0, 0, 1], relay_quic_port).into (),
|
||||
relay_cert: vec! [],
|
||||
use_udp_over_tcp: false,
|
||||
};
|
||||
|
||||
let server_err = server::P4EndServer::connect (server_conf).await;
|
||||
|
||||
assert! (server_err.is_err ());
|
||||
|
||||
{
|
||||
let m = relay_metrics.read ().await;
|
||||
assert_eq! (m.connected_end_servers, 0);
|
||||
}
|
||||
|
||||
// Connect over UDP
|
||||
|
||||
let server_conf = server::Config {
|
||||
debug_echo: false,
|
||||
id: "bogus_VZBNRUA5".into (),
|
||||
relay_addr: ([127, 0, 0, 1], relay_quic_port).into (),
|
||||
relay_cert: relay_cert.clone (),
|
||||
use_udp_over_tcp: false,
|
||||
};
|
||||
|
||||
let t = Instant::now ();
|
||||
let (server, _) = server::P4EndServer::connect (server_conf).await?;
|
||||
let dur = t.elapsed ();
|
||||
assert! (dur < Duration::from_millis (1_000), "{:?}", dur);
|
||||
|
||||
{
|
||||
let m = relay_metrics.read ().await;
|
||||
assert_eq! (m.connected_end_servers, 1);
|
||||
}
|
||||
|
||||
// Connect over TCP
|
||||
|
||||
let server_conf = server::Config {
|
||||
debug_echo: false,
|
||||
id: "bogus_6E5CZIAI".into (),
|
||||
relay_addr: ([127, 0, 0, 1], tcp_listen_port).into (),
|
||||
relay_cert: relay_cert.clone (),
|
||||
use_udp_over_tcp: true,
|
||||
};
|
||||
|
||||
let t = Instant::now ();
|
||||
let (server, _) = server::P4EndServer::connect (server_conf).await?;
|
||||
let dur = t.elapsed ();
|
||||
assert! (dur < Duration::from_millis (1_000), "{:?}", dur);
|
||||
|
||||
{
|
||||
let m = relay_metrics.read ().await;
|
||||
assert_eq! (m.connected_end_servers, 2);
|
||||
}
|
||||
|
||||
Ok (())
|
||||
}
|
|
@ -12,15 +12,19 @@ repository = "https://six-five-six-four.com/git/reactor/ptth"
|
|||
[dependencies]
|
||||
anyhow = "1.0.38"
|
||||
blake3 = "1.0.0"
|
||||
fltk = "1.2.8"
|
||||
fltk = "1.3.24"
|
||||
ptth_quic = { path = "../ptth_quic" }
|
||||
quinn = "0.8.5"
|
||||
quinn = "0.9.3"
|
||||
rand = "0.8.4"
|
||||
rand_chacha = "0.3.1"
|
||||
reqwest = "0.11.4"
|
||||
rmp-serde = "0.15.5"
|
||||
serde = "1.0.130"
|
||||
structopt = "0.3.20"
|
||||
tokio = { version = "1.8.1", features = ["full"] }
|
||||
tracing-subscriber = "0.2.16"
|
||||
tracing = "0.1.25"
|
||||
|
||||
[dependencies.reqwest]
|
||||
version = "0.11.4"
|
||||
default-features = false
|
||||
features = ["stream", "rustls-tls", "hyper-rustls"]
|
||||
|
|
|
@ -235,10 +235,7 @@ fn main () -> anyhow::Result <()> {
|
|||
|
||||
let client_id = opt.client_id.unwrap_or_else (|| "bogus_client".to_string ());
|
||||
|
||||
let quinn::NewConnection {
|
||||
connection,
|
||||
..
|
||||
} = protocol::p2_connect_to_p3 (&endpoint, relay_addr, &client_id).await
|
||||
let connection = protocol::p2_connect_to_p3 (&endpoint, relay_addr, &client_id).await
|
||||
.context ("P2 can't connect to P3")?;
|
||||
|
||||
Ok::<_, anyhow::Error> (connection)
|
||||
|
|
|
@ -11,30 +11,35 @@ repository = "https://six-five-six-four.com/git/reactor/ptth"
|
|||
|
||||
[dependencies]
|
||||
|
||||
anyhow = "1.0.38"
|
||||
anyhow = "1.0.66"
|
||||
base64 = "0.13.0"
|
||||
blake3 = "1.0.0"
|
||||
chrono = { version = "0.4.19", features = ["serde"] }
|
||||
chrono = { version = "0.4.23", features = ["serde"] }
|
||||
clap = "2.33.3"
|
||||
dashmap = "4.0.2"
|
||||
futures = "0.3.7"
|
||||
futures-util = "0.3.8"
|
||||
handlebars = "3.5.3"
|
||||
http = "0.2.3"
|
||||
hyper = { version = "0.14.20", features = ["http1", "http2", "server", "stream", "tcp"] }
|
||||
hyper = { version = "0.14.23", features = ["http1", "http2", "server", "stream", "tcp"] }
|
||||
itertools = "0.9.0"
|
||||
rand = "0.8.3"
|
||||
rand = "0.8.5"
|
||||
rmp-serde = "0.15.5"
|
||||
rusty_ulid = "0.10.1"
|
||||
serde = { version = "1.0.117", features = ["derive"] }
|
||||
serde_json = "1.0.60"
|
||||
serde_urlencoded = "0.7.0"
|
||||
thiserror = "1.0.22"
|
||||
tokio = { version = "1.8.1", features = [] }
|
||||
tokio-stream = "0.1.3"
|
||||
toml = "0.5.7"
|
||||
tracing = "0.1.25"
|
||||
rusty_ulid = "1.0.0"
|
||||
serde = { version = "1.0.150", features = ["derive"] }
|
||||
serde_json = "1.0.89"
|
||||
serde_urlencoded = "0.7.1"
|
||||
thiserror = "1.0.37"
|
||||
tokio = { version = "1.23.0", features = [] }
|
||||
tokio-stream = "0.1.11"
|
||||
toml = "0.5.10"
|
||||
tracing = "0.1.37"
|
||||
tracing-futures = "0.2.4"
|
||||
tracing-subscriber = "0.2.15"
|
||||
|
||||
ptth_core = { path = "../ptth_core", version = "2.0.0" }
|
||||
|
||||
[dependencies.reqwest]
|
||||
version = "0.11.13"
|
||||
default-features = false
|
||||
features = ["stream", "rustls-tls", "hyper-rustls"]
|
||||
|
|
|
@ -144,6 +144,8 @@ pub mod file {
|
|||
|
||||
pub news_url: Option <String>,
|
||||
pub hide_audit_log: Option <bool>,
|
||||
pub webhook_url: Option <String>,
|
||||
pub webhook_interval_s: Option <u32>,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -158,6 +160,8 @@ pub struct Config {
|
|||
pub scraper_keys: HashMap <String, ScraperKey>,
|
||||
pub news_url: Option <String>,
|
||||
pub hide_audit_log: bool,
|
||||
pub webhook_url: Option <String>,
|
||||
pub webhook_interval_s: u32,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
|
@ -170,6 +174,8 @@ impl Default for Config {
|
|||
scraper_keys: Default::default (),
|
||||
news_url: None,
|
||||
hide_audit_log: false,
|
||||
webhook_url: None,
|
||||
webhook_interval_s: 7200,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -199,6 +205,8 @@ impl TryFrom <file::Config> for Config {
|
|||
scraper_keys,
|
||||
news_url: f.news_url,
|
||||
hide_audit_log: f.hide_audit_log.unwrap_or (false),
|
||||
webhook_url: f.webhook_url,
|
||||
webhook_interval_s: f.webhook_interval_s.unwrap_or (7200),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -589,6 +589,10 @@ async fn handle_all (
|
|||
use routing::Route::*;
|
||||
|
||||
let state = &*state;
|
||||
{
|
||||
let mut counters = state.webhook_counters.write ().await;
|
||||
counters.requests_total += 1;
|
||||
}
|
||||
|
||||
// The path is cloned here, so it's okay to consume the request
|
||||
// later.
|
||||
|
@ -818,7 +822,7 @@ pub async fn run_relay (
|
|||
|
||||
let mut request_rendezvous = state_2.request_rendezvous.lock ().await;
|
||||
request_rendezvous.iter_mut ()
|
||||
.for_each (|(k, v)| {
|
||||
.for_each (|(_k, v)| {
|
||||
match v {
|
||||
RequestRendezvous::ParkedServer (_) => (),
|
||||
RequestRendezvous::ParkedClients (requests) => requests.retain (|req| req.id.as_str () >= timeout_ulid.as_str ()),
|
||||
|
@ -878,6 +882,11 @@ pub async fn run_relay (
|
|||
|
||||
state.audit_log.push (AuditEvent::new (AuditData::RelayStart)).await;
|
||||
|
||||
{
|
||||
let state = state.clone ();
|
||||
tokio::spawn (webhook_task (state));
|
||||
}
|
||||
|
||||
trace! ("Serving relay on {:?}", addr);
|
||||
|
||||
server.with_graceful_shutdown (async {
|
||||
|
@ -913,5 +922,64 @@ pub async fn run_relay (
|
|||
Ok (())
|
||||
}
|
||||
|
||||
async fn webhook_task (state: Arc <Relay>) {
|
||||
use crate::relay_state::MonitoringCounters;
|
||||
|
||||
let client = reqwest::Client::default ();
|
||||
|
||||
let webhook_interval_s = {
|
||||
let config = state.config.read ().await;
|
||||
config.webhook_interval_s
|
||||
};
|
||||
dbg! (webhook_interval_s);
|
||||
|
||||
let mut interval = tokio::time::interval (std::time::Duration::from_secs (webhook_interval_s.into ()));
|
||||
interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip);
|
||||
|
||||
let mut tick_seq = 1;
|
||||
let mut last_counters_reported = (MonitoringCounters::default (), Utc::now (), 0);
|
||||
|
||||
loop {
|
||||
interval.tick ().await;
|
||||
|
||||
let webhook_url = {
|
||||
let config = state.config.read ().await;
|
||||
config.webhook_url.clone ()
|
||||
};
|
||||
|
||||
let webhook_url = match webhook_url {
|
||||
Some (x) => x,
|
||||
None => {
|
||||
continue;
|
||||
},
|
||||
};
|
||||
|
||||
let now = Utc::now ();
|
||||
|
||||
let counters = {
|
||||
state.webhook_counters.read ().await.clone ()
|
||||
};
|
||||
|
||||
let requests_total_diff = counters.requests_total - last_counters_reported.0.requests_total;
|
||||
|
||||
let j = serde_json::json! ({
|
||||
"text": format! ("From tick {} to {}: Handled {} requests", last_counters_reported.2, tick_seq, requests_total_diff),
|
||||
}).to_string ();
|
||||
|
||||
match client.post (webhook_url).body (j).send ().await {
|
||||
Ok (resp) => {
|
||||
if resp.status () == StatusCode::OK {
|
||||
last_counters_reported = (counters, now, tick_seq);
|
||||
}
|
||||
else {
|
||||
dbg! (resp.status ());
|
||||
}
|
||||
},
|
||||
Err (e) => { dbg! (e); },
|
||||
}
|
||||
tick_seq += 1;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg (test)]
|
||||
mod tests;
|
||||
|
|
|
@ -101,6 +101,16 @@ pub struct Relay {
|
|||
/// Memory backend for audit logging
|
||||
// TODO: Add file / database / network server logging backend
|
||||
pub (crate) audit_log: BoundedVec <AuditEvent>,
|
||||
|
||||
/// Counters for webhook reporting
|
||||
pub (crate) webhook_counters: RwLock <MonitoringCounters>,
|
||||
}
|
||||
|
||||
#[derive (Clone, Default)]
|
||||
pub (crate) struct MonitoringCounters {
|
||||
pub (crate) requests_total: u64,
|
||||
pub (crate) requests_by_scraper_api: HashMap <String, u64>,
|
||||
pub (crate) requests_by_email: HashMap <String, u64>,
|
||||
}
|
||||
|
||||
#[derive (Clone)]
|
||||
|
@ -204,6 +214,7 @@ impl TryFrom <Config> for Relay {
|
|||
shutdown_watch_rx,
|
||||
unregistered_servers: BoundedVec::new (20),
|
||||
audit_log: BoundedVec::new (256),
|
||||
webhook_counters: Default::default (),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -558,7 +558,7 @@ pub mod executable {
|
|||
name: opt.name.or (config_file.name).ok_or (anyhow::anyhow! ("`name` must be provided in command line or config file"))?,
|
||||
api_key: config_file.api_key,
|
||||
relay_url: opt.relay_url.or (config_file.relay_url).ok_or (anyhow::anyhow! ("`--relay-url` must be provided in command line or `relay_url` in config file"))?,
|
||||
file_server_root: opt.file_server_root.or (config_file.file_server_root).unwrap_or_else (PathBuf::new),
|
||||
file_server_root: opt.file_server_root.or (config_file.file_server_root).unwrap_or_else (|| PathBuf::from (".")),
|
||||
file_server_roots,
|
||||
throttle_upload: opt.throttle_upload,
|
||||
allow_any_client: true,
|
||||
|
|
|
@ -0,0 +1,11 @@
|
|||
[package]
|
||||
name = "udp_over_tcp"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.66"
|
||||
tokio = { version = "1.23.0", features = ["full"] }
|
||||
tracing = "0.1.37"
|
|
@ -0,0 +1,59 @@
|
|||
use std::{
|
||||
net::{
|
||||
Ipv4Addr,
|
||||
SocketAddr,
|
||||
SocketAddrV4,
|
||||
},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use tokio::{
|
||||
net::{
|
||||
TcpSocket,
|
||||
TcpStream,
|
||||
UdpSocket,
|
||||
},
|
||||
spawn,
|
||||
};
|
||||
|
||||
use crate::loops;
|
||||
|
||||
pub struct Config {
|
||||
pub udp_eph_port: u16,
|
||||
pub udp_local_server_port: u16,
|
||||
pub tcp_server_addr: SocketAddr,
|
||||
}
|
||||
|
||||
pub async fn main (cfg: Config) -> anyhow::Result <()> {
|
||||
let udp_sock = UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, cfg.udp_local_server_port)).await?;
|
||||
udp_sock.connect ((Ipv4Addr::LOCALHOST, cfg.udp_eph_port)).await?;
|
||||
|
||||
let tcp_sock = TcpSocket::new_v4 ()?;
|
||||
let tcp_conn = tcp_sock.connect (cfg.tcp_server_addr).await?;
|
||||
|
||||
main_with_sockets (udp_sock, tcp_conn).await
|
||||
}
|
||||
|
||||
pub async fn main_with_sockets (udp_sock: UdpSocket, tcp_conn: TcpStream) -> anyhow::Result <()> {
|
||||
let (tcp_read, tcp_write) = tcp_conn.into_split ();
|
||||
|
||||
let tx_task;
|
||||
let rx_task;
|
||||
|
||||
{
|
||||
let udp_sock = Arc::new (udp_sock);
|
||||
rx_task = spawn (loops::rx (Arc::clone (&udp_sock), tcp_read));
|
||||
tx_task = spawn (loops::tx (Arc::clone (&udp_sock), tcp_write));
|
||||
}
|
||||
|
||||
tokio::select! {
|
||||
_val = tx_task => {
|
||||
println! ("client_main: tx_task exited, exiting");
|
||||
}
|
||||
_val = rx_task => {
|
||||
println! ("client_main: rx_task exited, exiting");
|
||||
}
|
||||
}
|
||||
|
||||
Ok (())
|
||||
}
|
|
@ -0,0 +1,4 @@
|
|||
pub mod client;
|
||||
pub mod server;
|
||||
|
||||
mod loops;
|
|
@ -0,0 +1,84 @@
|
|||
use std::{
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use anyhow::bail;
|
||||
use tokio::{
|
||||
io::{
|
||||
AsyncReadExt,
|
||||
AsyncWriteExt,
|
||||
},
|
||||
net::{
|
||||
UdpSocket,
|
||||
tcp,
|
||||
}
|
||||
};
|
||||
|
||||
pub async fn rx (
|
||||
udp_sock: Arc <UdpSocket>,
|
||||
mut tcp_read: tcp::OwnedReadHalf,
|
||||
) -> anyhow::Result <()> {
|
||||
for i in 0u64.. {
|
||||
// Optimizes down to a bitwise AND
|
||||
if i % 8_192 == 0 {
|
||||
tracing::trace! ("rx loop");
|
||||
}
|
||||
|
||||
let mut tag = [0u8, 0, 0, 0];
|
||||
let bytes_read = tcp_read.read (&mut tag).await?;
|
||||
if bytes_read != 4 {
|
||||
bail! ("loops::rx: Couldn't read 4 bytes for tag");
|
||||
}
|
||||
if tag != [1, 0, 0, 0] {
|
||||
bail! ("loops::rx: unexpected tag in framing");
|
||||
}
|
||||
|
||||
let mut length = [0u8, 0, 0, 0];
|
||||
let bytes_read = tcp_read.read (&mut length).await?;
|
||||
if bytes_read != 4 {
|
||||
bail! ("loops::rx: Couldn't read 4 bytes for tag");
|
||||
}
|
||||
|
||||
let length = usize::try_from (u32::from_le_bytes (length))?;
|
||||
if length >= 8_192 {
|
||||
bail! ("loops::rx: Length too big for UDP packets");
|
||||
}
|
||||
|
||||
let mut buf = vec! [0u8; length];
|
||||
let bytes_read = tcp_read.read_exact (&mut buf).await?;
|
||||
if length != bytes_read {
|
||||
bail! ("loops::rx: read_exact failed");
|
||||
}
|
||||
buf.truncate (bytes_read);
|
||||
|
||||
udp_sock.send (&buf).await?;
|
||||
}
|
||||
|
||||
Ok (())
|
||||
}
|
||||
|
||||
pub async fn tx (
|
||||
udp_sock: Arc <UdpSocket>,
|
||||
mut tcp_write: tcp::OwnedWriteHalf,
|
||||
) -> anyhow::Result <()>
|
||||
{
|
||||
for i in 0u64.. {
|
||||
// Optimizes down to a bitwise AND
|
||||
if i % 8_192 == 0 {
|
||||
tracing::trace! ("tx loop");
|
||||
}
|
||||
|
||||
let mut buf = vec! [0u8; 8_192];
|
||||
let bytes_read = udp_sock.recv (&mut buf).await?;
|
||||
buf.truncate (bytes_read);
|
||||
|
||||
let tag = [1u8, 0, 0, 0];
|
||||
let length = u32::try_from (bytes_read)?.to_le_bytes ();
|
||||
|
||||
tcp_write.write_all (&tag).await?;
|
||||
tcp_write.write_all (&length).await?;
|
||||
tcp_write.write_all (&buf).await?;
|
||||
}
|
||||
|
||||
Ok (())
|
||||
}
|
|
@ -0,0 +1,76 @@
|
|||
/*
|
||||
|
||||
To test manually, run this 3 commands:
|
||||
|
||||
- Terminal A: `nc -l -u -p 9502`
|
||||
- Terminal B: `cargo run -p udp_over_tcp`
|
||||
- Terminal C: `nc -p 9500 -u 127.0.0.1 9501`
|
||||
|
||||
Terminals A and C should be connected through the UDP-over-TCP connection
|
||||
|
||||
*/
|
||||
|
||||
use std::{
|
||||
net::{
|
||||
Ipv4Addr,
|
||||
SocketAddr,
|
||||
SocketAddrV4,
|
||||
},
|
||||
};
|
||||
|
||||
use tokio::{
|
||||
runtime,
|
||||
spawn,
|
||||
};
|
||||
|
||||
mod client;
|
||||
mod loops;
|
||||
mod server;
|
||||
|
||||
// The ephemeral UDP port that the PTTH_QUIC client will bind
|
||||
const PORT_0: u16 = 9500;
|
||||
|
||||
// The well-known UDP port that the UDP-over-TCP client will bind
|
||||
// The PTTH_QUIC client must connect to this instead of the real relay address
|
||||
const PORT_1: u16 = 9501;
|
||||
|
||||
// The well-known TCP port that the UDP-over-TCP server will bind
|
||||
const PORT_2: u16 = 9502;
|
||||
|
||||
// The well-known UDP port that the PTTH_QUIC relay will bind
|
||||
const PORT_3: u16 = 9502;
|
||||
|
||||
fn main () -> anyhow::Result <()> {
|
||||
let rt = runtime::Runtime::new ()?;
|
||||
|
||||
rt.block_on (async_main ())?;
|
||||
|
||||
Ok (())
|
||||
}
|
||||
|
||||
async fn async_main () -> anyhow::Result <()> {
|
||||
let server_cfg = server::Config {
|
||||
tcp_port: PORT_2,
|
||||
udp_port: PORT_3,
|
||||
};
|
||||
let server_app = server::Listener::new (server_cfg).await?;
|
||||
let server_task = spawn (server_app.run ());
|
||||
|
||||
let client_cfg = client::Config {
|
||||
udp_eph_port: PORT_0,
|
||||
udp_local_server_port: PORT_1,
|
||||
tcp_server_addr: SocketAddr::V4 (SocketAddrV4::new (Ipv4Addr::LOCALHOST, PORT_2)),
|
||||
};
|
||||
let client_task = spawn (client::main (client_cfg));
|
||||
|
||||
tokio::select! {
|
||||
_val = client_task => {
|
||||
println! ("Client exited, exiting");
|
||||
},
|
||||
_val = server_task => {
|
||||
println! ("Server exited, exiting");
|
||||
},
|
||||
}
|
||||
|
||||
Ok (())
|
||||
}
|
|
@ -0,0 +1,88 @@
|
|||
use std::{
|
||||
net::{
|
||||
Ipv4Addr,
|
||||
SocketAddrV4,
|
||||
},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use tokio::{
|
||||
net::{
|
||||
TcpListener,
|
||||
TcpStream,
|
||||
UdpSocket,
|
||||
},
|
||||
spawn,
|
||||
};
|
||||
|
||||
use crate::loops;
|
||||
|
||||
#[derive (Clone)]
|
||||
pub struct Config {
|
||||
/// The well-known TCP port that the UDP-over-TCP server will bind
|
||||
pub tcp_port: u16,
|
||||
|
||||
/// The well-known UDP port that the PTTH_QUIC relay will bind
|
||||
pub udp_port: u16,
|
||||
}
|
||||
|
||||
pub struct Listener {
|
||||
cfg: Config,
|
||||
tcp_listener: TcpListener,
|
||||
}
|
||||
|
||||
impl Listener {
|
||||
pub async fn new (cfg: Config) -> anyhow::Result <Self> {
|
||||
let tcp_listener = TcpListener::bind ((Ipv4Addr::UNSPECIFIED, cfg.tcp_port)).await?;
|
||||
|
||||
Ok (Self {
|
||||
cfg,
|
||||
tcp_listener,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn tcp_port (&self) -> anyhow::Result <u16> {
|
||||
Ok (self.tcp_listener.local_addr ()?.port ())
|
||||
}
|
||||
|
||||
pub async fn run (self) -> anyhow::Result <()> {
|
||||
let Self {
|
||||
cfg,
|
||||
tcp_listener,
|
||||
} = self;
|
||||
|
||||
loop {
|
||||
let (conn, _peer_addr) = tcp_listener.accept ().await?;
|
||||
|
||||
let cfg = cfg.clone ();
|
||||
spawn (handle_connection (cfg, conn));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_connection (cfg: Config, conn: TcpStream) -> anyhow::Result <()> {
|
||||
let udp_sock = UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, 0)).await?;
|
||||
udp_sock.connect ((Ipv4Addr::LOCALHOST, cfg.udp_port)).await?;
|
||||
|
||||
let (tcp_read, tcp_write) = conn.into_split ();
|
||||
|
||||
let rx_task;
|
||||
let tx_task;
|
||||
|
||||
{
|
||||
let udp_sock = Arc::new (udp_sock);
|
||||
rx_task = spawn (loops::rx (Arc::clone (&udp_sock), tcp_read));
|
||||
tx_task = spawn (loops::tx (Arc::clone (&udp_sock), tcp_write));
|
||||
}
|
||||
|
||||
tokio::select! {
|
||||
_val = tx_task => {
|
||||
println! ("server_handle_connection: tx_task exited, exiting");
|
||||
}
|
||||
_val = rx_task => {
|
||||
println! ("server_handle_connection: rx_task exited, exiting");
|
||||
}
|
||||
}
|
||||
|
||||
Ok (())
|
||||
}
|
Loading…
Reference in New Issue