From 0129f36d2a9ad2d8787d5dc1e4c89557dea10968 Mon Sep 17 00:00:00 2001 From: _ <> Date: Mon, 11 Oct 2021 01:59:08 +0000 Subject: [PATCH] :construction: begin building PTTH_DIREC Working: - Client sends the first cookie to relay - Relay learns client's WAN address Next steps: - Associate PTTH_DIREC state with QUIC connection so we can track it / destroy it all at once - P3 asks P4 to accept PTTH_DIREC connection --- Cargo.lock | 1 + docs/explanation/ptth_direc.md | 73 +++++++++++++ prototypes/quic_demo/Cargo.toml | 1 + .../quic_demo/src/bin/quic_demo_client.rs | 28 +++++ .../src/bin/quic_demo_relay_server.rs | 100 +++++++++++++++++- prototypes/quic_demo/src/prelude.rs | 10 +- prototypes/quic_demo/src/protocol.rs | 53 ++++++++++ 7 files changed, 263 insertions(+), 3 deletions(-) create mode 100644 docs/explanation/ptth_direc.md diff --git a/Cargo.lock b/Cargo.lock index 9287c76..6644a46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1348,6 +1348,7 @@ dependencies = [ "futures-util", "hyper", "quinn", + "rand", "rcgen", "reqwest", "rmp-serde", diff --git a/docs/explanation/ptth_direc.md b/docs/explanation/ptth_direc.md new file mode 100644 index 0000000..8269928 --- /dev/null +++ b/docs/explanation/ptth_direc.md @@ -0,0 +1,73 @@ +# PTTH_DIREC - Direct P2P connections + +_It could work, even!_ + +To keep each ridiculous new feature simple, we'll rely on bootstrapping: + +1. PTTH is just HTTPS connections +2. PTTH_QUIC uses a PTTH relay to download the QUIC cert and bootstrap +3. PTTH_DIREC will use a PTTH_QUIC relay to bootstrap + +# Overview + +Given that: + +- P2 is connected to P3 +- P4 is connected to P3 + +Steps: + +- S1. P2 starts a bi stream to P3 +- S2.0. P2 says, "I want to initiate a PTTH_DIREC connection..." +- "... And I'll send you cookie X to do hole-punching..." +- "... And I want to connect to end server Y..." +- S3.0. P3 creates an ID for this connection +- S3.1. P3 replies "go ahead" to P2 +- S4. P3 starts a bi stream to P4 (end server Y) +- S5. P3 says, "I want you to accept a PTTH_DIREC connection..." +- "... And you should send me cookie Z to do hole-punching..." +- "... And the client will be client W..." +- S6. P3 waits for P4 to accept the offer +- S7. P3 waits for both cookies to arrive +- S8. When the cookies arrive, P3 learns the WAN addresses of P2 and P4 +- S9. P3 sends the WAN addresses of P2 and P4 to each other (on the existing bi streams) +- S10. P4 tries to connect directly to P2 +- S11. P2 does the same to P4 +- S12. When P4 sees round-tripped data, it attempts to upgrade to QUIC +- S13. When P2 sees round-tripped data, it attempts to upgrade to QUIC +- Cool stuff happens over QUIC +- ReactorScram implements the rest of the protocol + +P2's PoV: + +- S1. Start a bi stream to P3 +- S2.0. Send cookie and server ID +- S2.1. Wait for go-ahead signal (this may contain the hole-punch address and a new cookie for P4) +- S2.2. Send cookie to hole-punch address repeatedly +- S2.3. While you're sending the cookie, wait to hear P4's WAN address +- S9. Learn P4's WAN address +- S10. Send the new cookie to P4's address +- S12. When you see round-tripped data, upgrade to QUIC + +P4's PoV: + +- S4. Accept a bi stream from P3 +- S5. Receive cookie and client ID +- S6. Reply "OK" +- S7.0. Send cookie to hole-punch address repeatedly +- S7.1. While sending the cookie, wait to hear P2's WAN address +- S9. Learn P2's WAN address +- S10. Try to connect directly to P2 +- S12. When you see round-tripped data, upgrade to QUIC + +Commands needed: + +- ??? + +# Decisions + +I'll add a delay between giving P2's address to P4, and giving P4's address to P2. +This miiiight change things slightly if P4's firewall is suspicious of packets +coming in too early, but I doubt it. + +The delay is easy to remove relay-side if it doesn't help. diff --git a/prototypes/quic_demo/Cargo.toml b/prototypes/quic_demo/Cargo.toml index 9e591f3..98e2072 100644 --- a/prototypes/quic_demo/Cargo.toml +++ b/prototypes/quic_demo/Cargo.toml @@ -15,6 +15,7 @@ ctrlc = "3.2.1" futures-util = "0.3.9" hyper = { version = "0.14.4", features = ["http1", "server", "stream", "tcp"] } quinn = "0.7.2" +rand = "0.8.4" rcgen = "0.8.11" reqwest = "0.11.4" rmp-serde = "0.15.5" diff --git a/prototypes/quic_demo/src/bin/quic_demo_client.rs b/prototypes/quic_demo/src/bin/quic_demo_client.rs index 6208d31..a015744 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_client.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_client.rs @@ -1,5 +1,6 @@ use structopt::StructOpt; use tokio::{ + net::UdpSocket, sync::watch, }; @@ -98,7 +99,34 @@ impl P2Client { }) }; + if false { + let task_direc_connect = { + let connection = connection.clone (); + + tokio::spawn (async move { + let cookie = protocol::p2_direc_to_p4 ( + &connection, + "bogus_server", + ).await?; + + let sock = UdpSocket::bind ("0.0.0.0:0").await?; + + let mut interval = tokio::time::interval (Duration::from_millis (1000)); + interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Delay); + + loop { + interval.tick ().await; + sock.send_to(&cookie [..], "127.0.0.1:30379").await?; + debug! ("P2 sent cookie to P3 over plain UDP"); + } + + Ok::<_, anyhow::Error> (()) + }) + }; + } + task_tcp_server.await??; + //task_direc_connect.await??; Ok (()) } diff --git a/prototypes/quic_demo/src/bin/quic_demo_relay_server.rs b/prototypes/quic_demo/src/bin/quic_demo_relay_server.rs index e7be1a3..11c6ad8 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_relay_server.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_relay_server.rs @@ -10,7 +10,10 @@ use hyper::{ StatusCode, }; use structopt::StructOpt; -use tokio::sync::watch; +use tokio::{ + net::UdpSocket, + sync::watch, +}; use quic_demo::prelude::*; use protocol::PeerId; @@ -84,6 +87,35 @@ async fn main () -> anyhow::Result <()> { }) }; + let task_direc_server = { + let relay_state = Arc::clone (&relay_state); + + tokio::spawn (async move { + let sock = UdpSocket::bind("0.0.0.0:30379").await?; + let mut buf = [0; 2048]; + loop { + let (len, addr) = sock.recv_from (&mut buf).await?; + debug! ("{:?} bytes received from {:?}", len, addr); + + let packet = Vec::from_iter ((&buf [0..len]).into_iter ().map (|x| *x)); + + { + let mut direc_cookies = relay_state.direc_cookies.lock ().await; + + if let Some (direc_state) = direc_cookies.remove (&packet) { + debug! ("Got PTTH_DIREC cookie for {}", direc_state.p2_id); + direc_state.p2_addr.send (addr).ok (); + } + else { + debug! ("UDP packet didn't match any PTTH_DIREC cookie"); + } + } + } + + Ok::<_, anyhow::Error> (()) + }) + }; + let task_http_server = tokio::spawn (async move { http_server.serve (make_svc).await?; Ok::<_, anyhow::Error> (()) @@ -128,6 +160,7 @@ async fn main () -> anyhow::Result <()> { task_quic_server.await??; task_http_server.await??; task_tcp_server.await??; + task_direc_server.await??; Ok (()) } @@ -153,9 +186,16 @@ async fn handle_http (_req: Request , relay_state: Arc ) #[derive (Default)] struct RelayState { p4_server_proxies: Mutex >, + direc_cookies: Mutex , DirecState>>, stats: Stats, } +struct DirecState { + start_time: Instant, + p2_id: PeerId, + p2_addr: tokio::sync::oneshot::Sender , +} + #[derive (Default)] struct Stats { quic: ConnectEvents, @@ -320,7 +360,28 @@ async fn handle_p2_connection ( match protocol::p3_accept_p2_stream (&mut recv).await? { protocol::P2ToP3Stream::ConnectP2ToP4 { server_id, - } => handle_request_p2_to_p4 (relay_state, client_id, server_id, send, recv).await?, + } => { + handle_request_p2_to_p4 ( + relay_state, + client_id, + server_id, + send, + recv + ).await? + }, + protocol::P2ToP3Stream::DirecP2ToP4 { + server_id, + cookie, + } => { + handle_direc_p2_to_p4 ( + relay_state, + client_id, + server_id, + cookie, + send, + recv + ).await? + }, } debug! ("Request ended for P2"); @@ -364,6 +425,41 @@ async fn handle_request_p2_to_p4 ( Ok (()) } +async fn handle_direc_p2_to_p4 ( + relay_state: Arc , + client_id: String, + server_id: PeerId, + cookie: Vec , + mut client_send: quinn::SendStream, + client_recv: quinn::RecvStream, +) -> anyhow::Result <()> +{ + debug! ("P2 {} wants a P2P connection to P4 {}", client_id, server_id); + + // TODO: Check authorization + + protocol::p3_authorize_p2_to_p4_direc (&mut client_send).await?; + + let (tx, rx) = tokio::sync::oneshot::channel (); + + { + let mut direc_cookies = relay_state.direc_cookies.lock ().await; + direc_cookies.insert (cookie, DirecState { + start_time: Instant::now (), + p2_id: client_id.clone (), + p2_addr: tx, + }); + } + + debug! ("Waiting to learn P2's WAN address..."); + + let wan_addr = rx.await?; + + debug! ("And that WAN address is {}", wan_addr); + + Ok (()) +} + async fn handle_p4_connection ( relay_state: Arc , conn: quinn::NewConnection, diff --git a/prototypes/quic_demo/src/prelude.rs b/prototypes/quic_demo/src/prelude.rs index 7515241..d091c0d 100644 --- a/prototypes/quic_demo/src/prelude.rs +++ b/prototypes/quic_demo/src/prelude.rs @@ -1,6 +1,7 @@ pub use std::{ collections::*, ffi::OsString, + iter::FromIterator, net::SocketAddr, sync::{ Arc, @@ -9,7 +10,10 @@ pub use std::{ Ordering, }, }, - time::Duration, + time::{ + Duration, + Instant, + }, }; pub use anyhow::{ @@ -29,6 +33,10 @@ pub use tokio::{ }, task::JoinHandle, }; +pub use rand::{ + Rng, + RngCore, +}; pub use tracing::{ debug, error, diff --git a/prototypes/quic_demo/src/protocol.rs b/prototypes/quic_demo/src/protocol.rs index 56f05a8..9bc24e5 100644 --- a/prototypes/quic_demo/src/protocol.rs +++ b/prototypes/quic_demo/src/protocol.rs @@ -16,6 +16,9 @@ const MAX_ID_LENGTH: usize = 128; #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Command (pub u8); +// I can't remember how I picked the numbers. Just increment I guess, +// and then switch to a variable-length format around 200. + impl Command { pub const CONNECT_P2_TO_P3: Command = Command (2); pub const CONNECT_P4_TO_P3: Command = Command (4); @@ -23,6 +26,7 @@ impl Command { pub const CONNECT_P2_TO_P4_STEP_2: Command = Command (11); pub const CONNECT_P2_TO_P5: Command = Command (12); pub const OKAY: Command = Command (20); + pub const DIREC_P2_TO_P3: Command = Command (21); } pub async fn p2_connect_to_p3 ( @@ -79,6 +83,30 @@ pub async fn p2_connect_to_p5 ( Ok ((send, recv)) } +pub async fn p2_direc_to_p4 ( + connection: &quinn::Connection, + server_id: &str, +) -> Result > +{ + let (mut send, mut recv) = connection.open_bi ().await?; + + let cmd_type = Command::DIREC_P2_TO_P3.0; + + let mut cookie = vec! [0u8; 32]; + rand::thread_rng ().fill_bytes (&mut cookie [..]); + let cookie = cookie; + + send.write_all (&[cmd_type, 0, 0, 0]).await?; + send_lv_string (&mut send, server_id).await?; + send_lv_u16 (&mut send, &cookie).await?; + + debug! ("Waiting for OK response for DIREC"); + + expect_exact_response (&mut recv, [Command::OKAY.0, cmd_type, 0, 0]).await?; + + Ok (cookie) +} + pub enum P3Peer { P2ClientProxy (P2ClientProxy), P4ServerProxy (P4ServerProxy), @@ -163,6 +191,14 @@ pub enum P2ToP3Stream { ConnectP2ToP4 { server_id: PeerId, }, + DirecP2ToP4 { + /// P2 wants a P2P connection to this P4 + server_id: PeerId, + + /// P2 will send this cookie over plain UDP to P3 + /// P3 will learn P2's WAN address from that. + cookie: Vec , + }, } pub async fn p3_accept_p2_stream ( @@ -182,6 +218,15 @@ pub async fn p3_accept_p2_stream ( server_id, } }, + Command::DIREC_P2_TO_P3 => { + let server_id = recv_lv_string (recv, MAX_ID_LENGTH).await?; + let cookie = recv_lv_u16 (recv, 64).await?; + + P2ToP3Stream::DirecP2ToP4 { + server_id, + cookie, + } + }, _ => bail! ("Invalid command type while P3 was accepting a new bi stream from P2"), }) } @@ -194,6 +239,14 @@ pub async fn p3_authorize_p2_to_p4_connection ( Ok (()) } +pub async fn p3_authorize_p2_to_p4_direc ( + send: &mut SendStream, +) -> Result <()> +{ + send.write_all (&[Command::OKAY.0, Command::DIREC_P2_TO_P3.0, 0, 0]).await?; + Ok (()) +} + pub async fn p4_connect_to_p3 ( endpoint: &quinn::Endpoint, relay_addr: &std::net::SocketAddr,