use std::{ convert::TryFrom, }; use anyhow::Result; use quinn::{ SendStream, RecvStream, }; use crate::prelude::*; pub type PeerId = String; const MAX_ID_LENGTH: usize = 128; #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Command (pub u8); // I can't remember how I picked the numbers. Just increment I guess, // and then switch to a variable-length format around 200. impl Command { pub const CONNECT_P2_TO_P3: Command = Command (2); pub const CONNECT_P4_TO_P3: Command = Command (4); pub const CONNECT_P2_TO_P4: Command = Command (10); pub const CONNECT_P2_TO_P4_STEP_2: Command = Command (11); pub const CONNECT_P2_TO_P5: Command = Command (12); pub const OKAY: Command = Command (20); pub const DIREC_P2_TO_P3: Command = Command (21); } pub async fn p2_connect_to_p3 ( endpoint: &quinn::Endpoint, relay_addr: std::net::SocketAddr, client_id: &str, ) -> Result { if client_id.as_bytes ().len () > MAX_ID_LENGTH { bail! ("Client ID is longer than MAX_ID_LENGTH"); } let new_conn = endpoint.connect (relay_addr, "localhost")?.await?; let (mut send, mut recv) = new_conn.connection.open_bi ().await?; let cmd_type = Command::CONNECT_P2_TO_P3.0; send.write_all (&[cmd_type, 0, 0, 0]).await?; send_lv_string (&mut send, client_id).await?; expect_exact_response (&mut recv, [Command::OKAY.0, cmd_type, 0, 0]).await .context ("P2 didn't get OK response when connecting to P3")?; Ok (new_conn) } pub async fn p2_connect_to_p5 ( connection: &quinn::Connection, server_id: &str, server_port: u16, ) -> Result <(SendStream, RecvStream)> { let (mut send, mut recv) = connection.open_bi ().await?; // Ask P3 if we can connect to P4 let cmd_type = Command::CONNECT_P2_TO_P4.0; send.write_all (&[cmd_type, 0, 0, 0]).await?; send_lv_string (&mut send, server_id).await?; expect_exact_response (&mut recv, [Command::OKAY.0, cmd_type, 0, 0]).await .context ("P2 didn't get OK response when asking P3 to connect P2 to P4")?; // Ask P4 if we can connect to P5 let cmd_type = Command::CONNECT_P2_TO_P5.0; send.write_all (&[cmd_type, 0, 0, 0]).await?; send.write_all (&server_port.to_le_bytes ()).await?; expect_exact_response (&mut recv, [Command::OKAY.0, cmd_type, 0, 0]).await .context ("P2 didn't get OK response when asking P4 to connect P2 to P5")?; Ok ((send, recv)) } pub async fn p2_direc_to_p4 ( connection: &quinn::Connection, server_id: &str, ) -> Result > { let (mut send, mut recv) = connection.open_bi ().await?; let cmd_type = Command::DIREC_P2_TO_P3.0; let mut cookie = vec! [0u8; 32]; rand::thread_rng ().fill_bytes (&mut cookie [..]); let cookie = cookie; send.write_all (&[cmd_type, 0, 0, 0]).await?; send_lv_string (&mut send, server_id).await?; send_lv_u16 (&mut send, &cookie).await?; debug! ("Waiting for OK response for DIREC"); expect_exact_response (&mut recv, [Command::OKAY.0, cmd_type, 0, 0]).await?; Ok (cookie) } pub enum P3Peer { P2ClientProxy (P2ClientProxy), P4ServerProxy (P4ServerProxy), } pub struct P2ClientProxy { pub id: PeerId, } pub struct P4ServerProxy { pub id: PeerId, } pub async fn p3_accept_peer ( recv: &mut RecvStream, ) -> Result { let mut buf = [0, 0, 0, 0]; recv.read_exact (&mut buf).await?; let command = Command (buf [0]); Ok (match command { Command::CONNECT_P2_TO_P3 => { let id = recv_lv_string (recv, MAX_ID_LENGTH).await?; debug! ("Client-side proxy (P2) connected, ID {}", id); P3Peer::P2ClientProxy (P2ClientProxy { id, }) }, Command::CONNECT_P4_TO_P3 => { let id = recv_lv_string (recv, MAX_ID_LENGTH).await?; debug! ("Server-side proxy (P4) connected, ID {}", id); P3Peer::P4ServerProxy (P4ServerProxy { id, }) }, _ => bail! ("Unknown QUIC client type"), }) } pub async fn p3_authorize_p2_peer ( send: &mut SendStream, ) -> Result <()> { send.write_all (&[Command::OKAY.0, Command::CONNECT_P2_TO_P3.0, 0, 0]).await?; Ok (()) } pub async fn p3_authorize_p4_peer ( send: &mut SendStream, ) -> Result <()> { send.write_all (&[Command::OKAY.0, Command::CONNECT_P4_TO_P3.0, 0, 0]).await?; Ok (()) } pub async fn p3_connect_p2_to_p4 ( connection: &quinn::Connection, client_id: &str, ) -> Result <(SendStream, RecvStream)> { if client_id.as_bytes ().len () > MAX_ID_LENGTH { bail! ("Client ID is longer than MAX_ID_LENGTH"); } let (mut send, mut recv) = connection.open_bi ().await?; let cmd_type = Command::CONNECT_P2_TO_P4_STEP_2.0; let buf = [cmd_type, 0, 0, 0]; send.write_all (&buf).await?; send_lv_string (&mut send, client_id).await?; expect_exact_response (&mut recv, [Command::OKAY.0, cmd_type, 0, 0]).await .context ("P3 didn't get OK response when asking P4 to connect P2 to P4")?; Ok ((send, recv)) } pub enum P2ToP3Stream { ConnectP2ToP4 { server_id: PeerId, }, DirecP2ToP4 { /// P2 wants a P2P connection to this P4 server_id: PeerId, /// P2 will send this cookie over plain UDP to P3 /// P3 will learn P2's WAN address from that. cookie: Vec , }, } pub async fn p3_accept_p2_stream ( recv: &mut RecvStream, ) -> Result { let mut buf = [0, 0, 0, 0]; recv.read_exact (&mut buf).await?; let cmd_type = buf [0]; Ok (match Command (cmd_type) { Command::CONNECT_P2_TO_P4 => { let server_id = recv_lv_string (recv, MAX_ID_LENGTH).await?; P2ToP3Stream::ConnectP2ToP4 { server_id, } }, Command::DIREC_P2_TO_P3 => { let server_id = recv_lv_string (recv, MAX_ID_LENGTH).await?; let cookie = recv_lv_u16 (recv, 64).await?; P2ToP3Stream::DirecP2ToP4 { server_id, cookie, } }, _ => bail! ("Invalid command type while P3 was accepting a new bi stream from P2"), }) } pub async fn p3_authorize_p2_to_p4_connection ( send: &mut SendStream, ) -> Result <()> { send.write_all (&[Command::OKAY.0, Command::CONNECT_P2_TO_P4.0, 0, 0]).await?; Ok (()) } pub async fn p3_authorize_p2_to_p4_direc ( send: &mut SendStream, ) -> Result <()> { send.write_all (&[Command::OKAY.0, Command::DIREC_P2_TO_P3.0, 0, 0]).await?; Ok (()) } pub async fn p4_connect_to_p3 ( endpoint: &quinn::Endpoint, relay_addr: std::net::SocketAddr, server_id: &str, ) -> Result { if server_id.as_bytes ().len () > MAX_ID_LENGTH { bail! ("Server ID is longer than MAX_ID_LENGTH"); } let new_conn = endpoint.connect (relay_addr, "localhost")?.await?; let (mut send, mut recv) = new_conn.connection.open_bi ().await?; let cmd_type = Command::CONNECT_P4_TO_P3.0; send.write_all (&[cmd_type, 0, 0, 0]).await?; send_lv_string (&mut send, server_id).await?; expect_exact_response (&mut recv, [Command::OKAY.0, cmd_type, 0, 0]).await .context ("P4 didn't get OK response when connecting to P3")?; Ok (new_conn) } pub enum P3ToP4Stream { NewPtthConnection { client_id: PeerId, } } pub async fn p4_accept_p3_stream ( recv: &mut RecvStream, ) -> Result { let mut buf = [0, 0, 0, 0]; recv.read_exact (&mut buf).await?; let cmd_type = buf [0]; Ok (match Command (cmd_type) { Command::CONNECT_P2_TO_P4_STEP_2 => { let client_id = recv_lv_string (recv, MAX_ID_LENGTH).await?; P3ToP4Stream::NewPtthConnection { client_id, } }, _ => bail! ("Invalid command type while P4 was accepting a new bi stream from P3"), }) } pub async fn p4_authorize_p2_connection ( send: &mut SendStream, ) -> Result <()> { let buf = [ Command::OKAY.0, Command::CONNECT_P2_TO_P4_STEP_2.0, 0, 0, ]; send.write_all (&buf).await?; Ok (()) } pub async fn p4_authorize_p1_connection ( send: &mut SendStream, ) -> Result <()> { let buf = [ Command::OKAY.0, Command::CONNECT_P2_TO_P5.0, 0, 0, ]; send.write_all (&buf).await?; Ok (()) } pub struct P2ConnectToP5Request { pub port: u16, } pub async fn p4_expect_p5_request ( recv: &mut RecvStream, ) -> Result { let mut buf = [0, 0, 0, 0]; recv.read_exact (&mut buf).await?; let cmd_type = Command (buf [0]); if cmd_type != Command::CONNECT_P2_TO_P5 { bail! ("P4 expected CONNECT_P2_TO_P5 but P2 sent something different"); } let mut port_buf = [0, 0]; recv.read_exact (&mut port_buf).await?; let port = u16::from_le_bytes (port_buf); Ok (P2ConnectToP5Request { port, }) } async fn expect_exact_response ( recv: &mut RecvStream, expected: [u8; 4] ) -> Result <()> { let mut buf = [0, 0, 0, 0]; recv.read_exact (&mut buf).await?; if buf != expected { bail! ("Didn't receive exact response, got {:?}", buf); } Ok (()) } async fn send_lv_u16 (send: &mut SendStream, buf: &[u8]) -> Result <()> { if buf.len () >= 65_536 { bail! ("Buffer is too long to send with u16 length prefix"); } let len = u16::try_from (buf.len ())?; send.write_all (&len.to_le_bytes ()).await?; send.write_all (buf).await?; Ok (()) } async fn send_lv_string (send: &mut SendStream, s: &str) -> Result <()> { send_lv_u16 (send, s.as_bytes ()).await?; Ok (()) } async fn recv_lv_u16 (recv: &mut RecvStream, max_len: usize) -> Result > { let mut len_buf = [0, 0]; recv.read_exact (&mut len_buf).await?; let len = u16::from_le_bytes (len_buf); let len = usize::try_from (len)?; if len > max_len { bail! ("Buffer is longer than max_len"); } // We could skip this allocation but who cares let mut buf = vec! [0u8; len]; recv.read_exact (&mut buf).await?; Ok (buf) } async fn recv_lv_string (recv: &mut RecvStream, max_len: usize) -> Result { let buf = recv_lv_u16 (recv, max_len).await?; let s = String::from_utf8 (buf)?; Ok (s) }