Merge remote-tracking branch 'origin/main'

main
_ 2021-07-17 20:47:09 -05:00
commit b3295d2542
13 changed files with 1260 additions and 1 deletions

120
Cargo.lock generated
View File

@ -243,6 +243,15 @@ dependencies = [
"subtle",
]
[[package]]
name = "ct-logs"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1a816186fa68d9e426e3cb4ae4dff1fcd8e4a2c34b781bf7a822574a0d0aac8"
dependencies = [
"sct",
]
[[package]]
name = "ctrlc"
version = "3.1.8"
@ -635,7 +644,7 @@ dependencies = [
"httpdate",
"itoa",
"pin-project",
"socket2",
"socket2 0.4.0",
"tokio",
"tower-service",
"tracing",
@ -979,6 +988,17 @@ dependencies = [
"winapi",
]
[[package]]
name = "pem"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd56cbd21fea48d0c440b41cd69c589faacade08c992d9a54e471b79d0fd13eb"
dependencies = [
"base64",
"once_cell",
"regex",
]
[[package]]
name = "percent-encoding"
version = "2.1.0"
@ -1326,12 +1346,66 @@ dependencies = [
"unicase",
]
[[package]]
name = "quic_demo"
version = "0.1.0"
dependencies = [
"anyhow",
"base64",
"futures-util",
"quinn",
"rcgen",
"structopt",
"tokio",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "quick-error"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ac73b1112776fc109b2e61909bc46c7e1bf0d7f690ffb1676553acce16d5cda"
[[package]]
name = "quinn"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c82c0a393b300104f989f3db8b8637c0d11f7a32a9c214560b47849ba8f119aa"
dependencies = [
"bytes",
"futures",
"lazy_static",
"libc",
"mio",
"quinn-proto",
"rustls",
"socket2 0.3.19",
"thiserror",
"tokio",
"tracing",
"webpki",
]
[[package]]
name = "quinn-proto"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "047aa96ec7ee6acabad7a1318dff72e9aff8994316bf2166c9b94cbec78ca54c"
dependencies = [
"bytes",
"ct-logs",
"rand",
"ring",
"rustls",
"rustls-native-certs",
"slab",
"thiserror",
"tinyvec",
"tracing",
"webpki",
]
[[package]]
name = "quote"
version = "1.0.9"
@ -1390,6 +1464,18 @@ dependencies = [
"libc",
]
[[package]]
name = "rcgen"
version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48b4fc1b81d685fcd442a86da2e2c829d9e353142633a8159f42bf28e7e94428"
dependencies = [
"chrono",
"pem",
"ring",
"yasna",
]
[[package]]
name = "redox_syscall"
version = "0.2.5"
@ -1533,6 +1619,18 @@ dependencies = [
"webpki",
]
[[package]]
name = "rustls-native-certs"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a07b7c1885bd8ed3831c289b7870b13ef46fe0e856d288c30d9cc17d75a2092"
dependencies = [
"openssl-probe",
"rustls",
"schannel",
"security-framework",
]
[[package]]
name = "rusty_ulid"
version = "0.10.1"
@ -1685,6 +1783,17 @@ version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
[[package]]
name = "socket2"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e"
dependencies = [
"cfg-if 1.0.0",
"libc",
"winapi",
]
[[package]]
name = "socket2"
version = "0.4.0"
@ -2255,3 +2364,12 @@ checksum = "0120db82e8a1e0b9fb3345a539c478767c0048d842860994d96113d5b667bd69"
dependencies = [
"winapi",
]
[[package]]
name = "yasna"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e262a29d0e61ccf2b6190d7050d4b237535fc76ce4c1210d9caa316f71dffa75"
dependencies = [
"chrono",
]

View File

@ -47,5 +47,6 @@ chrono = {version = "0.4.19", features = ["serde"]}
members = [
"crates/*",
"prototypes/*",
"tools/*",
]

4
prototypes/quic_demo/.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
# TLS certs used for QUIC experiments
*.crt
/app_packages

View File

@ -0,0 +1,18 @@
[package]
name = "quic_demo"
version = "0.1.0"
authors = ["_"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.38"
base64 = "0.13.0"
futures-util = "0.3.9"
quinn = "0.7.2"
rcgen = "0.8.11"
structopt = "0.3.20"
tokio = { version = "1.4.0", features = ["full"] }
tracing-subscriber = "0.2.16"
tracing = "0.1.25"

View File

@ -0,0 +1,83 @@
# https://whitfin.io/speeding-up-rust-docker-builds/
# TODO: https://stackoverflow.com/questions/57389547/how-to-define-the-context-for-a-docker-build-as-a-specific-commit-on-one-of-the
# rust:1.50-slim-buster
FROM rust@sha256:5dd85eb0c60bbdea14a6ecba1f6fe4a0f5c878bcf06d2cdfae0aff3a19ed4b10 as build
WORKDIR /
ENV USER root
# create empty shell projects
RUN cargo new --bin ptth
WORKDIR /ptth
RUN \
cargo new --lib crates/always_equal && \
cargo new --lib crates/ptth_core && \
cargo new --bin crates/ptth_relay && \
cargo new --bin crates/ptth_server && \
cargo new --bin crates/ptth_file_server_bin && \
cargo new --bin tools/ptth_tail && \
cargo new --bin crates/debug_proxy && \
cargo new --bin prototypes/quic_demo
# copy over your manifests
COPY ./Cargo.lock ./
COPY ./Cargo.toml ./
COPY ./crates/always_equal/Cargo.toml ./crates/always_equal/
COPY ./crates/ptth_core/Cargo.toml ./crates/ptth_core/
COPY ./crates/ptth_relay/Cargo.toml ./crates/ptth_relay/
COPY ./prototypes/quic_demo/Cargo.toml ./prototypes/quic_demo/
# this build step will cache your dependencies
RUN cargo build --release -p quic_demo
RUN \
rm \
src/*.rs \
crates/always_equal/src/*.rs \
crates/ptth_core/src/*.rs \
crates/ptth_relay/src/*.rs \
prototypes/quic_demo/src/*.rs
# Copy source tree
# Yes, I tried a few variations on the syntax. Dockerfiles are just rough.
COPY ./src/ ./src
COPY ./crates/always_equal ./crates/always_equal
COPY ./crates/ptth_core ./crates/ptth_core
COPY ./crates/ptth_relay ./crates/ptth_relay
COPY ./handlebars/ ./handlebars
COPY ./prototypes/quic_demo ./prototypes/quic_demo
# Bug in cargo's incremental build logic, triggered by
# Docker doing something funny with mtimes? Maybe?
RUN touch crates/ptth_core/src/lib.rs
# build for release
# gate only on ptth_relay tests for now
RUN \
cargo build --release -p quic_demo --bin quic_demo_relay_server && \
cargo test --release -p quic_demo --bin quic_demo_relay_server
# debian:buster-slim
FROM debian@sha256:13f0764262a064b2dd9f8a828bbaab29bdb1a1a0ac6adc8610a0a5f37e514955
RUN apt-get update \
&& apt-get upgrade -y \
&& apt-get install -y libssl1.1 ca-certificates tini
RUN addgroup --gid 10001 ptth_user && adduser --system --uid 10000 --gid 10001 ptth_user
USER ptth_user
WORKDIR /home/ptth_user
COPY --from=build /ptth/target/release/quic_demo_relay_server ./
ARG git_version
RUN \
echo -n "$git_version" > ./git_version.txt && \
ln -s quic_demo_relay_server app
CMD ["/usr/bin/tini", "--", "./quic_demo_relay_server"]

View File

@ -0,0 +1,168 @@
# End-to-end Testing
There are 5 processes, so you'll need 5 terminal windows or screen / tmux
sessions. Run the processes in this order:
1. QUIC relay server: `RUST_LOG=quic_demo_relay_server=debug cargo run --bin quic_demo_relay_server`
2. Server-side proxy: `RUST_LOG=quic_demo_end_server=debug cargo run --bin quic_demo_end_server`
3. Client-side proxy: `RUST_LOG=quic_demo_client cargo run --bin quic_demo_client`
4. TCP end server: `nc -l -p 30382`
5. TCP end client: `nc 127.0.0.1 30381`
The netcat processes from steps 1 and 5 should now be connected to each other.
# Testing PTTH itself
The end-to-end testing above is the happy path. Try these sadder cases:
- Swap Steps 2 and 3
- After Step 2, restart the server proxy P4
- After Step 3, restart the client proxy P2
- After Step 5, close P1. P2 and P4 should stay up
- After Step 2, restart the relay server P3
- After Step 3, restart the relay server P3
# Network protocol
For the prototype, all control messages are fixed 4-byte messages at the
start of bidirectional streams.
Unused bytes are always "0".
Messages are sent in request-response pairs, like HTTP.
Requests look like this:
1. Command type
2. Extra data, or unused, depending on command type
3. Unused
4. Unused
Responses look like this:
1. Status code (Always "20" for now, meaning "OK")
2. Command type of the request that we're responding to
3. Unused
4. Unused
The command types are:
| Type | Sender | Receiver | Meaning |
| ---- | ------ | ----------- | ------------------------------------------------- |
| 2 | P2 | P3 | Client proxy wants to connect to the relay |
| 4 | P4 | P3 | Server proxy wants to connect to the relay |
| 10 | P2 | P3 | Client wants relay to connect it to a server |
| 11 | P3 | P4 | Relay tells server that a client wants to connect |
| 12 | P2 | P4 (via P3) | Client wants a port forwarded from the server |
# Plan
This is a TCP port forwarding system, like SSH has, but better.
We'll name the processes in order from clientest to serverest:
1. TCP end client (e.g. VNC viewer, SSH client)
2. Client-side proxy (Part of PTTH, must run as a desktop app on end user systems)
3. Relay server (Part of ptth_relay)
4. Server-side proxy (Path of ptth_server)
5. TCP end server (e.g. VNC server, SSH server)
At the highest level, creating a connection means:
1. Processes 2, 3, and 4 are running and connected by QUIC.
2. Process 5 is running and listening on its TCP port.
(e.g. 22 for SSH)
3. Process 2 sets a port mapping to the server.
(e.g. 2200 on the client is mapped to 22 on the server)
4. Process 1 connects to the client-side port
(e.g. ssh to 2200 on localhost)
5. Processes 2, 3, and 4 forward that connection to Process 5.
Any of the other processes can reject the connection for various reasons.
6. The connection is established, and Processes 2, 3, and 4 relay bytes
for Processes 1 and 5.
What identifies a connection?
1. The client ID, because a connection is never shared between 2 clients.
2. The ephemeral port used by P1, because 2 SSH clients may connect to the same
forwarded port in P2 at once.
3. The server ID, because a connection is never shared between 2 servers.
4. The listen port of P5, for the sake of matching TCP theory, and because
this data is needed. I'm not sure if it's technically part of the identifier.
Because clients can't see each other, each client will store these data
about each of its connections:
1. The ephemeral port used by P1
2. The server ID
3. The listen port for P5
Because servers can't see each other, each server stores these data:
1. P1's port
2. The client ID
3. The listen port for P5
The relay server must store all 4 data for each connection.
Each client and each server has exactly one QUIC connection to the relay
server.
In P2 and P4, each PTTH connection has a TCP bi stream and a QUIC bi stream.
In P3, each PTTH connection has 2 QUIC bi streams.
Assume that some kind of framing exists.
Once a port mapping is established (within P2 only), a connection proceeds
like this:
1. P1 connects to P2
2. P2 starts a QUIC bi stream to P3. This has a tip message of
(P1 port, P4 ID, P5 port).
3. If P2 isn't authorized to make the connection, P3 closes it
4. If P4 isn't connected to P3, the connection fails
5. P3 starts a QUIC bi stream to P4. This has a tip message of
(P1 port, P2 ID, P5 port).
6. If P4 doesn't approve, the connection closes
7. If P4 can't connect to P5, the connection closes
8. P4 connects to P5
9. P4 returns a tip message to P3, equivalent to "200 OK"
10. P3 returns the tip message to P2
11. The connection goes into relaying mode
Here's the big things I'm thinking:
- It's kinda Tor, in that there's 3 middle nodes and onion layers
- It's kinda like HTTP 1, except after the server responds, you turn the
connection into an arbitrary TCP connection. Maybe that's what the CONNECT
verb does?
- I will probably need a control channel some day, but not today
So I don't need a general control stream, which means luckily I don't need
to worry about synchronizing between streams. The tip messages are enough
to get things going.
Would the protocol be cleaner if the tip messages were explicitly layered?
I think so. As long as we aren't end-to-end encrypted (AND WE ARE NOT) this
gives P3, the relay server, a chance to be clever if we ever need it.
So reviewing the above steps:
In Step 2 the tip message says, "Connect me to P4 with ID 'bogus' and forward
this message to them".
Between Steps 2 and 5, P3 re-wraps the inner message. P3 says to P4,
"P2 with ID 'bogus' wants to connect, and forwards this message"
P4 receives that inner tip message in Step 5, and it says "Connect my port 2200
to your port 22."
There's no need for the actual port numbers to be used - It could be tokenized.
But for a prototype, I think it's fine.
Since all parties trust P3, this makes it more clear that P2 and P4 may not
directly trust each other. P3 vouches for the identity of each. The response
from P4 is symmetrical with the request from P2.

View File

@ -0,0 +1,21 @@
#!/usr/bin/env bash
set -euo pipefail
GIT_COMMIT=$(git rev-parse main)
GIT_COMMIT_SHORT=$(echo "$GIT_COMMIT" | cut -b -8)
DOCKER_TAG="ptth_quic:latest"
mkdir -p app_packages
pushd ../../
git archive --format=tar "$GIT_COMMIT" | sudo docker build -f prototypes/quic_demo/Dockerfile -t "$DOCKER_TAG" --build-arg "git_version=$GIT_COMMIT" -
popd
sudo docker run --rm "$DOCKER_TAG" \
tar -c \
app \
quic_demo_relay_server \
| gzip > "app_packages/ptth_quic_relay_$GIT_COMMIT_SHORT.tar.gz"
# sudo docker build -f app_package_Dockerfile -t ptth_app_host:latest .

View File

@ -0,0 +1,190 @@
use structopt::StructOpt;
use tokio::net::TcpListener;
use quic_demo::prelude::*;
#[derive (Debug, StructOpt)]
struct Opt {
#[structopt (long)]
relay_addr: Option <String>,
#[structopt (long)]
local_tcp_port: Option <u16>,
#[structopt (long)]
client_id: Option <u8>,
#[structopt (long)]
server_id: Option <u8>,
}
#[tokio::main]
async fn main () -> anyhow::Result <()> {
tracing_subscriber::fmt::init ();
let opt = Opt::from_args ();
let local_tcp_port = opt.local_tcp_port.unwrap_or (30381);
let server_cert = tokio::fs::read ("quic_server.crt").await?;
let relay_addr = opt.relay_addr.unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?;
let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&server_cert])?;
trace! ("Connecting to relay server");
let quinn::NewConnection {
connection,
..
} = endpoint.connect (&relay_addr, "localhost")?.await?;
let (mut send, mut recv) = connection.open_bi ().await?;
let client_id = opt.client_id.unwrap_or (42);
let server_id = opt.server_id.unwrap_or (43);
let req_buf = [
Command::CONNECT_P2_TO_P3.0,
client_id,
0,
0,
];
send.write_all (&req_buf).await?;
let mut resp_buf = [0, 0, 0, 0];
recv.read_exact (&mut resp_buf).await?;
assert_eq! (resp_buf, [
Command::OKAY.0,
Command::CONNECT_P2_TO_P3.0,
0,
0,
]);
let listener = TcpListener::bind (("127.0.0.1", local_tcp_port)).await?;
trace! ("Accepting local TCP connections from P1");
loop {
let (tcp_socket, _) = listener.accept ().await?;
let connection = connection.clone ();
tokio::spawn (async move {
let (local_recv, local_send) = tcp_socket.into_split ();
debug! ("Started PTTH connection");
let (mut relay_send, mut relay_recv) = connection.open_bi ().await?;
// Ask P3 if we can connect to P4
let req_buf = [
Command::CONNECT_P2_TO_P4.0,
server_id,
0,
0,
];
relay_send.write_all (&req_buf).await?;
let mut resp_buf = [0; 4];
relay_recv.read_exact (&mut resp_buf).await?;
assert_eq! (resp_buf, [
Command::OKAY.0,
Command::CONNECT_P2_TO_P4.0,
0,
0,
]);
// Ask P4 if we can connect to P5
let req_buf = [
Command::CONNECT_P2_TO_P5.0,
0,
0,
0,
];
relay_send.write_all (&req_buf).await?;
let mut resp_buf = [0; 4];
relay_recv.read_exact (&mut resp_buf).await?;
assert_eq! (resp_buf, [
Command::OKAY.0,
Command::CONNECT_P2_TO_P5.0,
0,
0,
]);
trace! ("Relaying bytes...");
let ptth_conn = PtthNewConnection {
local_send,
local_recv,
relay_send,
relay_recv,
}.build ();
ptth_conn.uplink_task.await??;
ptth_conn.downlink_task.await??;
debug! ("Ended PTTH connection");
Ok::<_, anyhow::Error> (())
});
}
}
struct PtthNewConnection {
local_send: tokio::net::tcp::OwnedWriteHalf,
local_recv: tokio::net::tcp::OwnedReadHalf,
relay_send: quinn::SendStream,
relay_recv: quinn::RecvStream,
}
struct PtthConnection {
uplink_task: JoinHandle <anyhow::Result <()>>,
downlink_task: JoinHandle <anyhow::Result <()>>,
}
impl PtthNewConnection {
fn build (self) -> PtthConnection {
let Self {
mut local_send,
mut local_recv,
mut relay_send,
mut relay_recv,
} = self;
let uplink_task = tokio::spawn (async move {
// Uplink - local client to relay server
let mut buf = vec! [0u8; 65_536];
loop {
let bytes_read = local_recv.read (&mut buf).await?;
if bytes_read == 0 {
break;
}
let buf_slice = &buf [0..bytes_read];
trace! ("Uplink relaying {} bytes", bytes_read);
relay_send.write_all (buf_slice).await?;
}
trace! ("Uplink closed");
Ok::<_, anyhow::Error> (())
});
let downlink_task = tokio::spawn (async move {
// Downlink - Relay server to local client
let mut buf = vec! [0u8; 65_536];
while let Some (bytes_read) = relay_recv.read (&mut buf).await? {
let buf_slice = &buf [0..bytes_read];
trace! ("Downlink relaying {} bytes", bytes_read);
local_send.write_all (buf_slice).await?;
}
trace! ("Downlink closed");
Ok::<_, anyhow::Error> (())
});
PtthConnection {
uplink_task,
downlink_task,
}
}
}

View File

@ -0,0 +1,171 @@
use structopt::StructOpt;
use tokio::net::TcpStream;
use quic_demo::prelude::*;
#[derive (Debug, StructOpt)]
struct Opt {
#[structopt (long)]
relay_addr: Option <String>,
#[structopt (long)]
local_tcp_port: Option <u16>,
#[structopt (long)]
server_id: Option <u8>,
}
#[tokio::main]
async fn main () -> anyhow::Result <()> {
tracing_subscriber::fmt::init ();
let opt = Opt::from_args ();
let local_tcp_port = opt.local_tcp_port.unwrap_or (30382);
let server_cert = tokio::fs::read ("quic_server.crt").await?;
let relay_addr = opt.relay_addr.unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?;
let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&server_cert])?;
trace! ("Connecting to relay server");
let quinn::NewConnection {
connection,
mut bi_streams,
..
} = endpoint.connect (&relay_addr, "localhost")?.await?;
let (mut send, mut recv) = connection.open_bi ().await?;
let our_id = opt.server_id.unwrap_or (43);
let req_buf = [
Command::CONNECT_P4_TO_P3.0,
our_id,
0,
0,
];
send.write_all (&req_buf).await?;
let mut resp_buf = [0, 0, 0, 0];
recv.read_exact (&mut resp_buf).await?;
assert_eq! (resp_buf, [
Command::OKAY.0,
Command::CONNECT_P4_TO_P3.0,
0,
0,
]);
trace! ("Accepting bi streams from P3");
loop {
let (mut relay_send, mut relay_recv) = bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("Relay server didn't open a bi stream"))??;
tokio::spawn (async move {
let mut req_buf = [0, 0, 0, 0];
relay_recv.read_exact (&mut req_buf).await?;
assert_eq! (req_buf [0], Command::CONNECT_P2_TO_P4_STEP_2.0);
// TODO: Authorize P2 to connect to us
let resp_buf = [
Command::OKAY.0,
Command::CONNECT_P2_TO_P4_STEP_2.0,
0,
0,
];
relay_send.write_all (&resp_buf).await?;
let mut req_buf = [0, 0, 0, 0];
relay_recv.read_exact (&mut req_buf).await?;
assert_eq! (req_buf [0], Command::CONNECT_P2_TO_P5.0);
// TODO: Authorize P2 to connect to P5
let resp_buf = [
Command::OKAY.0,
Command::CONNECT_P2_TO_P5.0,
0,
0,
];
relay_send.write_all (&resp_buf).await?;
debug! ("Started PTTH connection");
let stream = TcpStream::connect (("127.0.0.1", local_tcp_port)).await?;
let (local_recv, local_send) = stream.into_split ();
trace! ("Relaying bytes...");
let ptth_conn = PtthNewConnection {
local_send,
local_recv,
relay_send,
relay_recv,
}.build ();
ptth_conn.uplink_task.await??;
ptth_conn.downlink_task.await??;
Ok::<_, anyhow::Error> (())
});
}
}
struct PtthNewConnection {
local_send: tokio::net::tcp::OwnedWriteHalf,
local_recv: tokio::net::tcp::OwnedReadHalf,
relay_send: quinn::SendStream,
relay_recv: quinn::RecvStream,
}
struct PtthConnection {
uplink_task: JoinHandle <anyhow::Result <()>>,
downlink_task: JoinHandle <anyhow::Result <()>>,
}
impl PtthNewConnection {
fn build (self) -> PtthConnection {
let Self {
mut local_send,
mut local_recv,
mut relay_send,
mut relay_recv,
} = self;
let uplink_task = tokio::spawn (async move {
// Downlink - Relay server to local client
let mut buf = vec! [0u8; 65_536];
while let Some (bytes_read) = relay_recv.read (&mut buf).await? {
let buf_slice = &buf [0..bytes_read];
trace! ("Uplink relaying {} bytes", bytes_read);
local_send.write_all (buf_slice).await?;
}
trace! ("Downlink closed");
Ok::<_, anyhow::Error> (())
});
let downlink_task = tokio::spawn (async move {
// Uplink - local client to relay server
let mut buf = vec! [0u8; 65_536];
loop {
let bytes_read = local_recv.read (&mut buf).await?;
if bytes_read == 0 {
break;
}
let buf_slice = &buf [0..bytes_read];
trace! ("Downlink relaying {} bytes", bytes_read);
relay_send.write_all (buf_slice).await?;
}
trace! ("Uplink closed");
Ok::<_, anyhow::Error> (())
});
PtthConnection {
uplink_task,
downlink_task,
}
}
}

View File

@ -0,0 +1,354 @@
use structopt::StructOpt;
use quic_demo::prelude::*;
#[derive (Debug, StructOpt)]
struct Opt {
#[structopt (long)]
listen_addr: Option <String>,
}
#[tokio::main]
async fn main () -> anyhow::Result <()> {
tracing_subscriber::fmt::init ();
let opt = Opt::from_args ();
let listen_addr = opt.listen_addr.unwrap_or_else (|| String::from ("0.0.0.0:30380")).parse ()?;
let (mut incoming, server_cert) = make_server_endpoint (listen_addr)?;
println! ("Base64 cert: {}", base64::encode (&server_cert));
tokio::fs::write ("quic_server.crt", &server_cert).await?;
let relay_state = RelayState::default ();
let relay_state = Arc::new (relay_state);
while let Some (conn) = incoming.next ().await {
let relay_state = Arc::clone (&relay_state);
// Each new peer QUIC connection gets its own task
tokio::spawn (async move {
let active = relay_state.stats.quic.connect ();
debug! ("QUIC connections: {}", active);
match handle_quic_connection (Arc::clone (&relay_state), conn).await {
Ok (_) => (),
Err (e) => warn! ("handle_quic_connection {:?}", e),
}
let active = relay_state.stats.quic.disconnect ();
debug! ("QUIC connections: {}", active);
});
}
Ok (())
}
#[derive (Default)]
struct RelayState {
p4_server_proxies: Mutex <HashMap <u8, P4State>>,
stats: Stats,
}
#[derive (Default)]
struct Stats {
quic: ConnectEvents,
}
#[derive (Default)]
struct ConnectEvents {
connects: AtomicU64,
disconnects: AtomicU64,
}
impl ConnectEvents {
fn connect (&self) -> u64 {
let connects = self.connects.fetch_add (1, Ordering::Relaxed) + 1;
let disconnects = self.disconnects.load (Ordering::Relaxed);
connects - disconnects
}
fn disconnect (&self) -> u64 {
let disconnects = self.disconnects.fetch_add (1, Ordering::Relaxed) + 1;
let connects = self.connects.load (Ordering::Relaxed);
connects - disconnects
}
fn active (&self) -> u64 {
let connects = self.connects.load (Ordering::Relaxed);
let disconnects = self.disconnects.load (Ordering::Relaxed);
connects - disconnects
}
}
struct P4State {
req_channel: mpsc::Sender <RequestP2ToP4>,
}
impl RelayState {
}
struct RequestP2ToP4 {
client_send: quinn::SendStream,
client_recv: quinn::RecvStream,
client_id: u8,
}
struct PtthNewConnection {
client_send: quinn::SendStream,
client_recv: quinn::RecvStream,
server_send: quinn::SendStream,
server_recv: quinn::RecvStream,
}
struct PtthConnection {
uplink_task: JoinHandle <anyhow::Result <()>>,
downlink_task: JoinHandle <anyhow::Result <()>>,
}
impl PtthNewConnection {
fn build (self) -> PtthConnection {
let Self {
mut client_send,
mut client_recv,
mut server_send,
mut server_recv,
} = self;
let uplink_task = tokio::spawn (async move {
// Uplink - Client to end server
let mut buf = vec! [0u8; 65_536];
while let Some (bytes_read) = client_recv.read (&mut buf).await? {
if bytes_read == 0 {
break;
}
let buf_slice = &buf [0..bytes_read];
trace! ("Uplink relaying {} bytes", bytes_read);
server_send.write_all (buf_slice).await?;
}
trace! ("Uplink closed");
Ok::<_, anyhow::Error> (())
});
let downlink_task = tokio::spawn (async move {
// Downlink - End server to client
let mut buf = vec! [0u8; 65_536];
while let Some (bytes_read) = server_recv.read (&mut buf).await? {
let buf_slice = &buf [0..bytes_read];
trace! ("Downlink relaying {} bytes", bytes_read);
client_send.write_all (buf_slice).await?;
}
trace! ("Downlink closed");
Ok::<_, anyhow::Error> (())
});
PtthConnection {
uplink_task,
downlink_task,
}
}
}
async fn handle_quic_connection (
relay_state: Arc <RelayState>,
conn: quinn::Connecting,
) -> anyhow::Result <()>
{
let mut conn = conn.await?;
// Everyone who connects must identify themselves with the first
// bi stream
// TODO: Timeout
let (mut send, mut recv) = conn.bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("QUIC client didn't identify itself"))??;
let mut req_buf = [0, 0, 0, 0];
recv.read_exact (&mut req_buf).await?;
let peer_type = req_buf [0];
let peer_id = req_buf [1];
match peer_type {
4 => debug! ("Server-side proxy (P4) connected, ID {}", peer_id),
2 => debug! ("Client-side proxy (P2) connected, ID {}", peer_id),
_ => bail! ("Unknown QUIC client type"),
}
let resp_buf = [
Command::OKAY.0,
peer_type,
0,
0,
];
send.write_all (&resp_buf).await?;
match peer_type {
2 => {
handle_p2_connection (relay_state, peer_id, conn).await?;
},
4 => {
handle_p4_connection (relay_state, peer_id, conn).await?;
},
_ => bail! ("Unknown QUIC client type"),
}
debug! ("Peer {} disconnected", peer_id);
Ok::<_, anyhow::Error> (())
}
async fn handle_p2_connection (
relay_state: Arc <RelayState>,
client_id: u8,
conn: quinn::NewConnection,
) -> anyhow::Result <()>
{
let quinn::NewConnection {
mut bi_streams,
..
} = conn;
while let Some (bi_stream) = bi_streams.next ().await {
let (client_send, mut client_recv) = bi_stream?;
let relay_state = Arc::clone (&relay_state);
tokio::spawn (async move {
debug! ("Request started for P2");
let mut req_buf = [0, 0, 0, 0];
client_recv.read_exact (&mut req_buf).await?;
let cmd_type = req_buf [0];
match Command (cmd_type) {
Command::CONNECT_P2_TO_P4 => {
let server_id = req_buf [1];
handle_request_p2_to_p4 (relay_state, client_id, server_id, client_send, client_recv).await?;
},
_ => bail! ("Unknown command type from P2"),
}
debug! ("Request ended for P2");
Ok::<_, anyhow::Error> (())
});
}
debug! ("P2 {} disconnected", client_id);
Ok (())
}
async fn handle_request_p2_to_p4 (
relay_state: Arc <RelayState>,
client_id: u8,
server_id: u8,
mut client_send: quinn::SendStream,
client_recv: quinn::RecvStream,
) -> anyhow::Result <()>
{
trace! ("P2 {} wants to connect to P4 {}", client_id, server_id);
// TODO: Auth checks
let resp_buf = [
Command::OKAY.0,
Command::CONNECT_P2_TO_P4.0,
0,
0,
];
client_send.write_all (&resp_buf).await?;
{
let p4_server_proxies = relay_state.p4_server_proxies.lock ().await;
match p4_server_proxies.get (&server_id) {
Some (p4_state) => {
p4_state.req_channel.send (RequestP2ToP4 {
client_send,
client_recv,
client_id,
}).await.map_err (|_| anyhow::anyhow! ("Can't send request to P4 server"))?;
},
None => warn! ("That server isn't connected"),
}
}
Ok (())
}
async fn handle_p4_connection (
relay_state: Arc <RelayState>,
server_id: u8,
conn: quinn::NewConnection,
) -> anyhow::Result <()>
{
let quinn::NewConnection {
connection,
..
} = conn;
let (tx, mut rx) = mpsc::channel (2);
let p4_state = P4State {
req_channel: tx,
};
{
let mut p4_server_proxies = relay_state.p4_server_proxies.lock ().await;
p4_server_proxies.insert (server_id, p4_state);
}
while let Some (req) = rx.recv ().await {
let connection = connection.clone ();
tokio::spawn (async move {
let RequestP2ToP4 {
client_send,
client_recv,
client_id,
} = req;
debug! ("P4 {} got a request from P2 {}", server_id, req.client_id);
let (mut server_send, mut server_recv) = connection.open_bi ().await?;
let req_buf = [
Command::CONNECT_P2_TO_P4_STEP_2.0,
client_id,
0,
0,
];
server_send.write_all (&req_buf).await?;
let mut resp_buf = [0, 0, 0, 0];
server_recv.read_exact (&mut resp_buf).await?;
assert_eq! (resp_buf, [
Command::OKAY.0,
Command::CONNECT_P2_TO_P4_STEP_2.0,
0,
0,
]);
trace! ("Relaying bytes...");
let ptth_conn = PtthNewConnection {
client_send,
client_recv,
server_send,
server_recv,
}.build ();
ptth_conn.uplink_task.await??;
ptth_conn.downlink_task.await??;
debug! ("Request ended for P4");
Ok::<_, anyhow::Error> (())
});
}
debug! ("P4 {} disconnected", server_id);
Ok (())
}

View File

@ -0,0 +1,2 @@
pub mod prelude;
pub mod quinn_utils;

View File

@ -0,0 +1,46 @@
pub use std::{
collections::*,
sync::{
Arc,
atomic::{
AtomicU64,
Ordering,
},
},
time::Duration,
};
pub use anyhow::bail;
pub use futures_util::StreamExt;
pub use tokio::{
io::{
AsyncReadExt,
AsyncWriteExt,
},
sync::{
Mutex,
mpsc,
},
task::JoinHandle,
};
pub use tracing::{
debug,
error,
info,
trace,
warn,
};
pub use crate::quinn_utils::*;
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Command (pub u8);
impl Command {
pub const CONNECT_P2_TO_P3: Command = Command (2);
pub const CONNECT_P4_TO_P3: Command = Command (4);
pub const CONNECT_P2_TO_P4: Command = Command (10);
pub const CONNECT_P2_TO_P4_STEP_2: Command = Command (11);
pub const CONNECT_P2_TO_P5: Command = Command (12);
pub const OKAY: Command = Command (20);
}

View File

@ -0,0 +1,83 @@
// I'm not sure where I got this module from, but it's probably from the
// quinn examples, so the license should be okay.
use std::{
net::SocketAddr,
sync::Arc,
time::Duration,
};
use quinn::{
Certificate, CertificateChain, ClientConfig, ClientConfigBuilder, Endpoint, Incoming,
PrivateKey, ServerConfig, ServerConfigBuilder, TransportConfig,
};
/// Constructs a QUIC endpoint configured for use a client only.
///
/// ## Args
///
/// - server_certs: list of trusted certificates.
#[allow(unused)]
pub fn make_client_endpoint(
bind_addr: SocketAddr,
server_certs: &[&[u8]],
) -> anyhow::Result<Endpoint> {
let mut client_cfg = configure_client (server_certs)?;
let mut transport = quinn::TransportConfig::default ();
transport.keep_alive_interval (Some (Duration::from_millis (5_000)));
client_cfg.transport = Arc::new (transport);
let mut endpoint_builder = Endpoint::builder ();
endpoint_builder.default_client_config (client_cfg);
let (endpoint, _incoming) = endpoint_builder.bind(&bind_addr)?;
Ok(endpoint)
}
/// Constructs a QUIC endpoint configured to listen for incoming connections
/// on a certain address and port.
///
/// ## Returns
///
/// - a stream of incoming QUIC connections
/// - server certificate serialized into DER format
#[allow(unused)]
pub fn make_server_endpoint(bind_addr: SocketAddr) -> anyhow::Result<(Incoming, Vec<u8>)> {
let (server_config, server_cert) = configure_server()?;
let mut endpoint_builder = Endpoint::builder();
endpoint_builder.listen(server_config);
let (_endpoint, incoming) = endpoint_builder.bind(&bind_addr)?;
Ok((incoming, server_cert))
}
/// Builds default quinn client config and trusts given certificates.
///
/// ## Args
///
/// - server_certs: a list of trusted certificates in DER format.
fn configure_client(server_certs: &[&[u8]]) -> anyhow::Result<ClientConfig> {
let mut cfg_builder = ClientConfigBuilder::default();
for cert in server_certs {
cfg_builder.add_certificate_authority(Certificate::from_der(&cert)?)?;
}
Ok(cfg_builder.build())
}
/// Returns default server configuration along with its certificate.
#[allow(clippy::field_reassign_with_default)] // https://github.com/rust-lang/rust-clippy/issues/6527
fn configure_server() -> anyhow::Result<(ServerConfig, Vec<u8>)> {
let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()]).unwrap();
let cert_der = cert.serialize_der().unwrap();
let priv_key = cert.serialize_private_key_der();
let priv_key = PrivateKey::from_der(&priv_key)?;
let mut transport_config = TransportConfig::default();
transport_config.max_concurrent_uni_streams(0).unwrap();
let mut server_config = ServerConfig::default();
server_config.transport = Arc::new(transport_config);
let mut cfg_builder = ServerConfigBuilder::new(server_config);
let cert = Certificate::from_der(&cert_der)?;
cfg_builder.certificate(CertificateChain::from_certs(vec![cert]), priv_key)?;
Ok((cfg_builder.build(), cert_der))
}