Merge remote-tracking branch 'origin/main' into main

main
_ 2021-09-07 17:10:59 -05:00
commit f867f32927
20 changed files with 1561 additions and 226 deletions

3
.gitignore vendored
View File

@ -8,3 +8,6 @@
/ptth_server_build_BIHWLQXQ/ /ptth_server_build_BIHWLQXQ/
/scraper-secret.txt /scraper-secret.txt
/target /target
# TLS certs used for QUIC experiments
*.crt

15
Cargo.lock generated
View File

@ -1201,6 +1201,20 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "ptth_quic_client_gui"
version = "0.1.0"
dependencies = [
"anyhow",
"fltk",
"quic_demo",
"quinn",
"structopt",
"tokio",
"tracing",
"tracing-subscriber",
]
[[package]] [[package]]
name = "ptth_relay" name = "ptth_relay"
version = "2.0.0" version = "2.0.0"
@ -1313,7 +1327,6 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",
"fltk",
"futures-util", "futures-util",
"quinn", "quinn",
"rcgen", "rcgen",

View File

@ -19,7 +19,8 @@ cargo new --bin crates/ptth_relay && \
cargo new --bin crates/ptth_server && \ cargo new --bin crates/ptth_server && \
cargo new --bin crates/ptth_file_server_bin && \ cargo new --bin crates/ptth_file_server_bin && \
cargo new --bin tools/ptth_tail && \ cargo new --bin tools/ptth_tail && \
cargo new --bin crates/debug_proxy cargo new --bin crates/debug_proxy && \
cargo new --bin prototypes/quic_demo
# copy over your manifests # copy over your manifests
COPY ./Cargo.lock ./ COPY ./Cargo.lock ./
@ -27,6 +28,7 @@ COPY ./Cargo.toml ./
COPY ./crates/always_equal/Cargo.toml ./crates/always_equal/ COPY ./crates/always_equal/Cargo.toml ./crates/always_equal/
COPY ./crates/ptth_core/Cargo.toml ./crates/ptth_core/ COPY ./crates/ptth_core/Cargo.toml ./crates/ptth_core/
COPY ./crates/ptth_relay/Cargo.toml ./crates/ptth_relay/ COPY ./crates/ptth_relay/Cargo.toml ./crates/ptth_relay/
COPY ./prototypes/quic_demo/Cargo.toml ./prototypes/quic_demo/
# this build step will cache your dependencies # this build step will cache your dependencies
RUN cargo build --release -p ptth_relay RUN cargo build --release -p ptth_relay
@ -36,7 +38,8 @@ rm \
src/*.rs \ src/*.rs \
crates/always_equal/src/*.rs \ crates/always_equal/src/*.rs \
crates/ptth_core/src/*.rs \ crates/ptth_core/src/*.rs \
crates/ptth_relay/src/*.rs crates/ptth_relay/src/*.rs \
prototypes/quic_demo/src/*.rs
# Copy source tree # Copy source tree
# Yes, I tried a few variations on the syntax. Dockerfiles are just rough. # Yes, I tried a few variations on the syntax. Dockerfiles are just rough.
@ -46,6 +49,7 @@ COPY ./crates/always_equal ./crates/always_equal
COPY ./crates/ptth_core ./crates/ptth_core COPY ./crates/ptth_core ./crates/ptth_core
COPY ./crates/ptth_relay ./crates/ptth_relay COPY ./crates/ptth_relay ./crates/ptth_relay
COPY ./handlebars/ ./handlebars COPY ./handlebars/ ./handlebars
COPY ./prototypes/quic_demo ./prototypes/quic_demo
# Bug in cargo's incremental build logic, triggered by # Bug in cargo's incremental build logic, triggered by
# Docker doing something funny with mtimes? Maybe? # Docker doing something funny with mtimes? Maybe?

View File

@ -15,7 +15,6 @@ use crate::{
errors::ConfigError, errors::ConfigError,
key_validity::{ key_validity::{
ScraperKey, ScraperKey,
Valid30Days,
}, },
}; };
@ -99,7 +98,6 @@ pub mod file {
use crate::key_validity::{ use crate::key_validity::{
BlakeHashWrapper, BlakeHashWrapper,
ScraperKey, ScraperKey,
Valid30Days,
}; };
#[derive (Clone, Debug, Deserialize, Serialize)] #[derive (Clone, Debug, Deserialize, Serialize)]
@ -142,7 +140,7 @@ pub mod file {
pub servers: Option <Vec <Server>>, pub servers: Option <Vec <Server>>,
// Adding a DB will take a while, so I'm moving these out of dev mode. // Adding a DB will take a while, so I'm moving these out of dev mode.
pub scraper_keys: Option <Vec <ScraperKey <Valid30Days>>>, pub scraper_keys: Option <Vec <ScraperKey>>,
pub news_url: Option <String>, pub news_url: Option <String>,
} }
@ -156,7 +154,7 @@ pub struct Config {
pub address: IpAddr, pub address: IpAddr,
pub port: Option <u16>, pub port: Option <u16>,
pub servers: HashMap <String, file::Server>, pub servers: HashMap <String, file::Server>,
pub scraper_keys: HashMap <String, ScraperKey <Valid30Days>>, pub scraper_keys: HashMap <String, ScraperKey>,
pub news_url: Option <String>, pub news_url: Option <String>,
} }

View File

@ -78,36 +78,17 @@ impl Serialize for BlakeHashWrapper {
} }
} }
pub struct Valid7Days;
pub struct Valid30Days;
//pub struct Valid90Days;
pub trait MaxValidDuration { pub trait MaxValidDuration {
fn dur () -> Duration; fn dur () -> Duration;
} }
impl MaxValidDuration for Valid7Days {
fn dur () -> Duration {
Duration::days (7)
}
}
impl MaxValidDuration for Valid30Days {
fn dur () -> Duration {
Duration::days (30)
}
}
#[derive (Deserialize)] #[derive (Deserialize)]
pub struct ScraperKey <V: MaxValidDuration> { pub struct ScraperKey {
name: String, pub name: String,
not_before: DateTime <Utc>, not_before: DateTime <Utc>,
not_after: DateTime <Utc>, not_after: DateTime <Utc>,
pub hash: BlakeHashWrapper, pub hash: BlakeHashWrapper,
#[serde (default)]
_phantom: std::marker::PhantomData <V>,
} }
#[derive (Copy, Clone, Debug, PartialEq)] #[derive (Copy, Clone, Debug, PartialEq)]
@ -121,21 +102,20 @@ pub enum KeyValidity {
DurationNegative, DurationNegative,
} }
impl <V: MaxValidDuration> ScraperKey <V> { impl ScraperKey {
pub fn new_30_day <S: Into <String>> (name: S, input: &[u8]) -> Self { pub fn new_30_day <S: Into <String>> (name: S, input: &[u8]) -> Self {
let now = Utc::now (); let now = Utc::now ();
Self { Self {
name: name.into (), name: name.into (),
not_before: now, not_before: now,
not_after: now + V::dur (), not_after: now + Duration::days (30),
hash: BlakeHashWrapper::from_key (input), hash: BlakeHashWrapper::from_key (input),
_phantom: Default::default (),
} }
} }
} }
impl <V: MaxValidDuration> ScraperKey <V> { impl ScraperKey {
#[must_use] #[must_use]
pub fn is_valid (&self, now: DateTime <Utc>, input: &[u8]) -> KeyValidity { pub fn is_valid (&self, now: DateTime <Utc>, input: &[u8]) -> KeyValidity {
use KeyValidity::*; use KeyValidity::*;
@ -152,13 +132,6 @@ impl <V: MaxValidDuration> ScraperKey <V> {
return DurationNegative; return DurationNegative;
} }
let max_dur = V::dur ();
let actual_dur = self.not_after - self.not_before;
if actual_dur > max_dur {
return DurationTooLong (max_dur);
}
if now >= self.not_after { if now >= self.not_after {
return Expired; return Expired;
} }
@ -196,12 +169,11 @@ mod tests {
fn duration_negative () { fn duration_negative () {
let zero_time = Utc::now (); let zero_time = Utc::now ();
let key = ScraperKey::<Valid30Days> { let key = ScraperKey {
name: "automated testing".to_string (), name: "automated testing".to_string (),
not_before: zero_time + Duration::days (1 + 2), not_before: zero_time + Duration::days (1 + 2),
not_after: zero_time + Duration::days (1), not_after: zero_time + Duration::days (1),
hash: BlakeHashWrapper::from_key ("bad_password".as_bytes ()), hash: BlakeHashWrapper::from_key ("bad_password".as_bytes ()),
_phantom: Default::default (),
}; };
let err = DurationNegative; let err = DurationNegative;
@ -215,46 +187,22 @@ mod tests {
} }
} }
#[test]
fn key_valid_too_long () {
let zero_time = Utc::now ();
let key = ScraperKey::<Valid30Days> {
name: "automated testing".to_string (),
not_before: zero_time + Duration::days (1),
not_after: zero_time + Duration::days (1 + 31),
hash: BlakeHashWrapper::from_key ("bad_password".as_bytes ()),
_phantom: Default::default (),
};
let err = DurationTooLong (Duration::days (30));
for (input, expected) in &[
(zero_time + Duration::days (0), err),
(zero_time + Duration::days (2), err),
(zero_time + Duration::days (100), err),
] {
assert_eq! (key.is_valid (*input, "bad_password".as_bytes ()), *expected);
}
}
#[test] #[test]
fn normal_key () { fn normal_key () {
let zero_time = Utc::now (); let zero_time = Utc::now ();
let key = ScraperKey::<Valid30Days> { let key = ScraperKey {
name: "automated testing".to_string (), name: "automated testing".to_string (),
not_before: zero_time + Duration::days (1), not_before: zero_time + Duration::days (1),
not_after: zero_time + Duration::days (1 + 30), not_after: zero_time + Duration::days (1 + 60),
hash: BlakeHashWrapper::from_key ("bad_password".as_bytes ()), hash: BlakeHashWrapper::from_key ("bad_password".as_bytes ()),
_phantom: Default::default (),
}; };
for (input, expected) in &[ for (input, expected) in &[
(zero_time + Duration::days (0), ClockIsBehind), (zero_time + Duration::days (0), ClockIsBehind),
(zero_time + Duration::days (2), Valid), (zero_time + Duration::days (2), Valid),
(zero_time + Duration::days (29), Valid), (zero_time + Duration::days (60 - 1), Valid),
(zero_time + Duration::days (1 + 30), Expired), (zero_time + Duration::days (60 + 1), Expired),
(zero_time + Duration::days (100), Expired), (zero_time + Duration::days (100), Expired),
] { ] {
assert_eq! (key.is_valid (*input, "bad_password".as_bytes ()), *expected); assert_eq! (key.is_valid (*input, "bad_password".as_bytes ()), *expected);
@ -265,12 +213,11 @@ mod tests {
fn wrong_key () { fn wrong_key () {
let zero_time = Utc::now (); let zero_time = Utc::now ();
let key = ScraperKey::<Valid30Days> { let key = ScraperKey {
name: "automated testing".to_string (), name: "automated testing".to_string (),
not_before: zero_time + Duration::days (1), not_before: zero_time + Duration::days (1),
not_after: zero_time + Duration::days (1 + 30), not_after: zero_time + Duration::days (1 + 30),
hash: BlakeHashWrapper::from_key ("bad_password".as_bytes ()), hash: BlakeHashWrapper::from_key ("bad_password".as_bytes ()),
_phantom: Default::default (),
}; };
for input in &[ for input in &[

View File

@ -127,8 +127,6 @@ async fn handle_http_request (
return Err (UnknownServer); return Err (UnknownServer);
} }
let user = get_user_name (&req);
let req = http_serde::RequestParts::from_hyper (req.method, uri.clone (), req.headers) let req = http_serde::RequestParts::from_hyper (req.method, uri.clone (), req.headers)
.map_err (|_| BadRequest)?; .map_err (|_| BadRequest)?;
@ -136,11 +134,6 @@ async fn handle_http_request (
let req_id = rusty_ulid::generate_ulid_string (); let req_id = rusty_ulid::generate_ulid_string ();
state.audit_log.push (AuditEvent::new (AuditData::WebClientGet {
user,
server_name: server_name.to_string (),
uri,
})).await;
trace! ("Created request {}", req_id); trace! ("Created request {}", req_id);
{ {
@ -610,6 +603,13 @@ async fn handle_all (
} => { } => {
let (parts, _) = req.into_parts (); let (parts, _) = req.into_parts ();
let user = get_user_name (&parts);
state.audit_log.push (AuditEvent::new (AuditData::WebClientGet {
user,
server_name: listen_code.to_string (),
uri: path.to_string (),
})).await;
handle_http_request (parts, path.to_string (), &state, listen_code).await? handle_http_request (parts, path.to_string (), &state, listen_code).await?
}, },
ClientServerList => handle_server_list (state, handlebars).await?, ClientServerList => handle_server_list (state, handlebars).await?,

View File

@ -121,6 +121,10 @@ pub enum AuditData {
server: crate::config::file::Server, server: crate::config::file::Server,
}, },
RelayStart, RelayStart,
ScraperGet {
key_name: String,
path: String,
},
WebClientGet { WebClientGet {
user: Option <String>, user: Option <String>,
server_name: String, server_name: String,
@ -312,7 +316,7 @@ impl Builder {
self self
} }
pub fn scraper_key (mut self, key: crate::key_validity::ScraperKey <crate::key_validity::Valid30Days>) pub fn scraper_key (mut self, key: crate::key_validity::ScraperKey)
-> Self -> Self
{ {
self.config.scraper_keys.insert (key.hash.encode_base64 (), key); self.config.scraper_keys.insert (key.hash.encode_base64 (), key);

View File

@ -125,6 +125,7 @@ pub fn route_url <'a> (method: &Method, path: &'a str) -> Result <Route <'a>, Er
}) })
} }
else { else {
tracing::error! ("URL routing failed for `{}`", path);
Err (Error::NotFound) Err (Error::NotFound)
} }
} }

View File

@ -127,6 +127,11 @@ async fn api_v1 (
) )
-> Result <Response <Body>, RequestError> -> Result <Response <Body>, RequestError>
{ {
use crate::{
AuditData,
AuditEvent,
};
let api_key = req.headers ().get ("X-ApiKey"); let api_key = req.headers ().get ("X-ApiKey");
let api_key = match api_key { let api_key = match api_key {
@ -138,6 +143,8 @@ async fn api_v1 (
let bad_key = || error_reply (StatusCode::FORBIDDEN, strings::FORBIDDEN); let bad_key = || error_reply (StatusCode::FORBIDDEN, strings::FORBIDDEN);
let key_name;
{ {
let config = state.config.read ().await; let config = state.config.read ().await;
@ -160,8 +167,15 @@ async fn api_v1 (
return Ok (bad_key ()?); return Ok (bad_key ()?);
}, },
} }
key_name = expected_key.name.to_string ();
} }
state.audit_log.push (AuditEvent::new (AuditData::ScraperGet {
key_name,
path: path_rest.to_string (),
})).await;
if path_rest == "test" { if path_rest == "test" {
Ok (error_reply (StatusCode::OK, "You're valid!")?) Ok (error_reply (StatusCode::OK, "You're valid!")?)
} }
@ -224,7 +238,6 @@ mod tests {
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
use crate::{ use crate::{
config,
key_validity, key_validity,
}; };
use super::*; use super::*;

View File

@ -0,0 +1,31 @@
# How scraper keys work
Come up with a random passphrase:
`not this, this is a bogus passphrase for documentation`
Run that through the `hash-api-key` subcommand of any `ptth_relay` instance:
`ptth_relay hash-api-key`
You'll get a hash like this:
`RUWt1hQQuHIRjftOdgeZf0PG/DtAmIaMqot/nwBAZXQ=`
Make sure that gets into the relay's config file, `ptth_relay.toml`:
```
[[scraper_keys]]
name = "shudder_mummy"
not_before = "2021-08-27T19:20:25-05:00"
not_after = "2031-08-27T19:20:25-05:00"
hash = "RUWt1hQQuHIRjftOdgeZf0PG/DtAmIaMqot/nwBAZXQ="
```
Use curl to like, try it out:
```
curl \
--header "X-ApiKey: not this, this is a bogus passphrase for documentation" \
http://localhost:4000/scraper/v1/test
```

1253
eff_short_wordlist_1.txt Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,18 @@
[package]
name = "ptth_quic_client_gui"
version = "0.1.0"
authors = ["Trish"]
edition = "2018"
license = "AGPL-3.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.38"
fltk = "1.1.1"
quic_demo = { path = "../quic_demo" }
quinn = "0.7.2"
structopt = "0.3.20"
tokio = { version = "1.8.1", features = ["full"] }
tracing-subscriber = "0.2.16"
tracing = "0.1.25"

View File

@ -10,10 +10,12 @@ use fltk::{
window::Window window::Window
}; };
use structopt::StructOpt; use structopt::StructOpt;
use tokio::net::TcpListener;
use quic_demo::prelude::*; use quic_demo::{
use protocol::PeerId; client_proxy::*,
prelude::*,
protocol::PeerId,
};
#[derive (Debug, StructOpt)] #[derive (Debug, StructOpt)]
struct Opt { struct Opt {
@ -122,19 +124,13 @@ fn main () -> anyhow::Result <()> {
Some (Message::OpenPort (port_idx)) => { Some (Message::OpenPort (port_idx)) => {
if let Ok (params) = gui_ports [port_idx].get_params () { if let Ok (params) = gui_ports [port_idx].get_params () {
let connection_p2_p3 = connection_p2_p3.clone (); let connection_p2_p3 = connection_p2_p3.clone ();
let (shutdown_flag, shutdown_flag_rx) = tokio::sync::watch::channel (true);
let task = rt.spawn (forward_port ( let _guard = rt.enter ();
forwarding_instances [port_idx].replace (ForwardingInstance::new (
connection_p2_p3, connection_p2_p3,
params, params,
shutdown_flag_rx
)); ));
forwarding_instances [port_idx].replace (ForwardingInstance {
task,
shutdown_flag,
});
gui_ports [port_idx].set_forwarding (true); gui_ports [port_idx].set_forwarding (true);
frame_status.set_label ("Forwarding 1 port"); frame_status.set_label ("Forwarding 1 port");
@ -142,11 +138,7 @@ fn main () -> anyhow::Result <()> {
}, },
Some (Message::ClosePort (port_idx)) => { Some (Message::ClosePort (port_idx)) => {
if let Some (old_instance) = forwarding_instances [port_idx].take () { if let Some (old_instance) = forwarding_instances [port_idx].take () {
rt.block_on (async { rt.block_on (old_instance.close ())?;
old_instance.shutdown_flag.send (false)?;
old_instance.task.await??;
Ok::<_, anyhow::Error> (())
})?;
} }
gui_ports [port_idx].set_forwarding (false); gui_ports [port_idx].set_forwarding (false);
@ -168,125 +160,6 @@ fn set_active <W: WidgetExt> (w: &mut W, b: bool) {
} }
} }
struct ForwardingInstance {
task: tokio::task::JoinHandle <anyhow::Result <()>>,
shutdown_flag: tokio::sync::watch::Sender <bool>,
}
async fn forward_port (
connection_p2_p3: quinn::Connection,
params: ForwardingParams,
shutdown_flag_rx: tokio::sync::watch::Receiver <bool>,
) -> anyhow::Result <()>
{
let ForwardingParams {
client_tcp_port,
server_id,
server_tcp_port,
} = params;
let listener = TcpListener::bind (("127.0.0.1", client_tcp_port)).await?;
trace! ("Accepting local TCP connections from P1 on {}", client_tcp_port);
while *shutdown_flag_rx.borrow () {
let mut shutdown_flag_rx_2 = shutdown_flag_rx.clone ();
tokio::select! {
x = listener.accept () => {
let (tcp_socket, _) = x?;
let connection = connection_p2_p3.clone ();
let server_id = server_id.clone ();
let shutdown_flag_rx = shutdown_flag_rx.clone ();
tokio::spawn (handle_p1 (connection, server_id, server_tcp_port, tcp_socket, shutdown_flag_rx));
},
_ = shutdown_flag_rx_2.changed () => (),
};
}
Ok::<_, anyhow::Error> (())
}
async fn handle_p1 (
connection: quinn::Connection,
server_id: String,
server_tcp_port: u16,
tcp_socket: tokio::net::TcpStream,
shutdown_flag_rx: tokio::sync::watch::Receiver <bool>,
) -> anyhow::Result <()>
{
let (mut local_recv, mut local_send) = tcp_socket.into_split ();
debug! ("Starting PTTH connection");
let (mut relay_send, mut relay_recv) = protocol::p2_connect_to_p5 (&connection, &server_id, server_tcp_port).await?;
trace! ("Relaying bytes...");
let task_blue = {
let mut shutdown_flag_rx = shutdown_flag_rx.clone ();
tokio::spawn (async move {
let mut buf = vec! [0u8; 65_536];
while *shutdown_flag_rx.borrow () {
trace! ("Blue reading from QUIC...");
tokio::select! {
x = relay_recv.read (&mut buf) => {
let bytes_read = match x? {
None => break,
Some (0) => break,
Some (x) => x,
};
let buf_slice = &buf [0..bytes_read];
trace! ("Uplink relaying {} bytes", bytes_read);
local_send.write_all (buf_slice).await?;
},
_ = shutdown_flag_rx.changed () => (),
};
}
debug! ("Blue QUIC --> TCP closed");
Ok::<_, anyhow::Error> (())
})
};
let task_green = {
let mut shutdown_flag_rx = shutdown_flag_rx.clone ();
tokio::spawn (async move {
let mut buf = vec! [0u8; 65_536];
while *shutdown_flag_rx.borrow () {
trace! ("Green reading from TCP...");
tokio::select! {
x = local_recv.read (&mut buf) => {
let bytes_read = match x? {
0 => break,
x => x,
};
let buf_slice = &buf [0..bytes_read];
trace! ("Downlink relaying {} bytes", bytes_read);
relay_send.write_all (buf_slice).await?;
},
_ = shutdown_flag_rx.changed () => (),
};
}
debug! ("Green TCP --> QUIC closed");
Ok::<_, anyhow::Error> (())
})
};
task_blue.await??;
task_green.await??;
debug! ("Ended PTTH connection");
Ok (())
}
struct GuiPort { struct GuiPort {
input_client_port: Input, input_client_port: Input,
input_server_id: Input, input_server_id: Input,
@ -295,12 +168,6 @@ struct GuiPort {
but_close: Button, but_close: Button,
} }
struct ForwardingParams {
client_tcp_port: u16,
server_id: String,
server_tcp_port: u16,
}
impl GuiPort { impl GuiPort {
fn new (fltk_tx: fltk::app::Sender <Message>, x: &mut i32, y: i32, port_idx: usize) -> Self { fn new (fltk_tx: fltk::app::Sender <Message>, x: &mut i32, y: i32, port_idx: usize) -> Self {
let margin = 10; let margin = 10;

View File

@ -1,4 +1 @@
# TLS certs used for QUIC experiments
*.crt
/app_packages /app_packages

View File

@ -0,0 +1,8 @@
# Top 10 TODO items by priority
- Allow lazy P2 --> P3 connections
- Allow multiple relays in P2 GUI
- Integrate relay server into `ptth_relay`
- Integrate server proxy into `ptth_server`
- Auth for client proxies
- Auth for server proxies

View File

@ -0,0 +1,15 @@
# Pie in the sky ideas
These aren't good enough for the main TODO list, but I think they would be
cool. Most of them are a combo of "Too much work" and "Too little demand".
They're not weekend projects with obvious payoff, they're month-long projects
with a high risk of failure.
- Custom VNC protocol that uses datagrams (Would need to forward datagrams
within PTTH too)
- Cross-platform pull-style backups (Doing backups well is its own field of
study. But I think convenient pull backups are worth it, even if they're
not as perfect as Borg)
- Remote shell for Windows (Just admitting that PTTH is basically malware
minus the mal)
- Generic file send / receive with a decent GUI (This is actually feasible)

View File

@ -28,7 +28,7 @@ async fn main () -> anyhow::Result <()> {
let relay_addr = opt.relay_addr.unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?; let relay_addr = opt.relay_addr.unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?;
let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&server_cert])?; let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&server_cert])?;
trace! ("Connecting to relay server"); debug! ("Connecting to relay server");
let client_id = opt.client_id.unwrap_or_else (|| "bogus_client".to_string ()); let client_id = opt.client_id.unwrap_or_else (|| "bogus_client".to_string ());
@ -46,7 +46,7 @@ async fn main () -> anyhow::Result <()> {
let server_tcp_port = opt.server_tcp_port.unwrap_or (30382); let server_tcp_port = opt.server_tcp_port.unwrap_or (30382);
let listener = TcpListener::bind (("127.0.0.1", client_tcp_port)).await?; let listener = TcpListener::bind (("127.0.0.1", client_tcp_port)).await?;
trace! ("Accepting local TCP connections from P1"); debug! ("Accepting local TCP connections from P1");
// End of per-port stuff // End of per-port stuff
// Beginning of per-connection stuff // Beginning of per-connection stuff

View File

@ -0,0 +1,159 @@
use tokio::{
net::TcpListener,
sync::watch,
task::JoinHandle,
};
use crate::prelude::*;
pub struct ForwardingInstance {
task: JoinHandle <anyhow::Result <()>>,
shutdown_flag: watch::Sender <bool>,
}
impl ForwardingInstance {
pub fn new (
connection_p2_p3: quinn::Connection,
params: ForwardingParams,
) -> Self
{
let (shutdown_flag, shutdown_flag_rx) = tokio::sync::watch::channel (true);
let task = tokio::spawn (forward_port (
connection_p2_p3,
params,
shutdown_flag_rx
));
Self {
task,
shutdown_flag,
}
}
pub async fn close (self) -> anyhow::Result <()> {
self.shutdown_flag.send (false)?;
self.task.await??;
Ok (())
}
}
pub struct ForwardingParams {
pub client_tcp_port: u16,
pub server_id: String,
pub server_tcp_port: u16,
}
async fn forward_port (
connection_p2_p3: quinn::Connection,
params: ForwardingParams,
shutdown_flag_rx: tokio::sync::watch::Receiver <bool>,
) -> anyhow::Result <()>
{
let ForwardingParams {
client_tcp_port,
server_id,
server_tcp_port,
} = params;
let listener = TcpListener::bind (("127.0.0.1", client_tcp_port)).await?;
trace! ("Accepting local TCP connections from P1 on {}", client_tcp_port);
while *shutdown_flag_rx.borrow () {
let mut shutdown_flag_rx_2 = shutdown_flag_rx.clone ();
tokio::select! {
x = listener.accept () => {
let (tcp_socket, _) = x?;
let connection = connection_p2_p3.clone ();
let server_id = server_id.clone ();
let shutdown_flag_rx = shutdown_flag_rx.clone ();
tokio::spawn (handle_p1 (connection, server_id, server_tcp_port, tcp_socket, shutdown_flag_rx));
},
_ = shutdown_flag_rx_2.changed () => (),
};
}
Ok::<_, anyhow::Error> (())
}
async fn handle_p1 (
connection: quinn::Connection,
server_id: String,
server_tcp_port: u16,
tcp_socket: tokio::net::TcpStream,
shutdown_flag_rx: tokio::sync::watch::Receiver <bool>,
) -> anyhow::Result <()>
{
let (mut local_recv, mut local_send) = tcp_socket.into_split ();
debug! ("Starting PTTH connection");
let (mut relay_send, mut relay_recv) = protocol::p2_connect_to_p5 (&connection, &server_id, server_tcp_port).await?;
trace! ("Relaying bytes...");
let task_blue = {
let mut shutdown_flag_rx = shutdown_flag_rx.clone ();
tokio::spawn (async move {
let mut buf = vec! [0u8; 65_536];
while *shutdown_flag_rx.borrow () {
trace! ("Blue reading from QUIC...");
tokio::select! {
x = relay_recv.read (&mut buf) => {
let bytes_read = match x? {
None => break,
Some (0) => break,
Some (x) => x,
};
let buf_slice = &buf [0..bytes_read];
trace! ("Uplink relaying {} bytes", bytes_read);
local_send.write_all (buf_slice).await?;
},
_ = shutdown_flag_rx.changed () => (),
};
}
debug! ("Blue QUIC --> TCP closed");
Ok::<_, anyhow::Error> (())
})
};
let task_green = {
let mut shutdown_flag_rx = shutdown_flag_rx.clone ();
tokio::spawn (async move {
let mut buf = vec! [0u8; 65_536];
while *shutdown_flag_rx.borrow () {
trace! ("Green reading from TCP...");
tokio::select! {
x = local_recv.read (&mut buf) => {
let bytes_read = match x? {
0 => break,
x => x,
};
let buf_slice = &buf [0..bytes_read];
trace! ("Downlink relaying {} bytes", bytes_read);
relay_send.write_all (buf_slice).await?;
},
_ = shutdown_flag_rx.changed () => (),
};
}
debug! ("Green TCP --> QUIC closed");
Ok::<_, anyhow::Error> (())
})
};
task_blue.await??;
task_green.await??;
debug! ("Ended PTTH connection");
Ok (())
}

View File

@ -1,3 +1,4 @@
pub mod client_proxy;
pub mod connection; pub mod connection;
pub mod prelude; pub mod prelude;
pub mod protocol; pub mod protocol;

3
todone.md Normal file
View File

@ -0,0 +1,3 @@
# Completed todos
- Split client proxy into its own crate to isolate FLTK dep