From be03300f551e1b5500ef26513a8a7efd8979ba62 Mon Sep 17 00:00:00 2001 From: _ <_@_> Date: Sun, 9 Oct 2022 16:43:50 +0000 Subject: [PATCH] :recycle: refactor: break out most of the relay server into a module I want an end-to-end test that runs all 5 nodes in 1 process, like PTTH itself has. --- .../src/bin/ptth_quic_relay_server.rs | 515 +----------------- .../ptth_quic/src/executable_relay_server.rs | 514 +++++++++++++++++ crates/ptth_quic/src/lib.rs | 1 + 3 files changed, 527 insertions(+), 503 deletions(-) create mode 100644 crates/ptth_quic/src/executable_relay_server.rs diff --git a/crates/ptth_quic/src/bin/ptth_quic_relay_server.rs b/crates/ptth_quic/src/bin/ptth_quic_relay_server.rs index 4f2b420..85542fc 100644 --- a/crates/ptth_quic/src/bin/ptth_quic_relay_server.rs +++ b/crates/ptth_quic/src/bin/ptth_quic_relay_server.rs @@ -1,521 +1,30 @@ -use hyper::{ - Body, - Request, - Response, - Server, - service::{ - make_service_fn, - service_fn, - }, - StatusCode, -}; -use structopt::StructOpt; -use tokio::{ - net::UdpSocket, - sync::watch, -}; +use tokio::sync::watch; use ptth_quic::prelude::*; -use protocol::PeerId; - -#[derive (Debug, StructOpt)] -struct Opt { - #[structopt (long)] - listen_addr: Option , -} #[tokio::main] async fn main () -> anyhow::Result <()> { + use structopt::StructOpt; + tracing_subscriber::fmt::init (); - let opt = Opt::from_args (); + let opt = ptth_quic::executable_relay_server::Opt::from_args (); - let listen_addr = opt.listen_addr.unwrap_or_else (|| String::from ("0.0.0.0:30380")).parse ()?; - let (mut incoming, server_cert) = make_server_endpoint (listen_addr)?; - println! ("Base64 cert: {}", base64::encode (&server_cert)); + let (running_tx, mut running_rx) = watch::channel (true); - tokio::fs::create_dir_all ("ptth_quic_output").await?; - tokio::fs::write ("ptth_quic_output/quic_server.crt", &server_cert).await?; + ctrlc::set_handler (move || { + running_tx.send (false).expect ("Couldn't forward Ctrl+C signal"); + })?; + trace! ("Set Ctrl+C handler"); - let relay_state = Arc::new (RelayState::default ()); - - let make_svc = { - let relay_state = Arc::clone (&relay_state); - make_service_fn (move |_conn| { - let relay_state = Arc::clone (&relay_state); + tokio::select! { + val = ptth_quic::executable_relay_server::main (opt) => { - async move { - Ok::<_, String> (service_fn (move |req| { - let relay_state = Arc::clone (&relay_state); - - handle_http (req, relay_state) - })) - } - }) - }; - - let http_addr = SocketAddr::from (([0, 0, 0, 0], 4004)); - let http_server = Server::bind (&http_addr); - - let tcp_port = 30382; - let tcp_listener = TcpListener::bind (("127.0.0.1", tcp_port)).await?; - - let (_running_tx, running_rx) = watch::channel (true); - - let task_quic_server = { - let relay_state = Arc::clone (&relay_state); - tokio::spawn (async move { - while let Some (conn) = incoming.next ().await { - let relay_state = Arc::clone (&relay_state); - - // Each new peer QUIC connection gets its own task - tokio::spawn (async move { - let active = relay_state.stats.quic.connect (); - debug! ("QUIC connections: {}", active); - - match handle_quic_connection (Arc::clone (&relay_state), conn).await { - Ok (_) => (), - Err (e) => warn! ("handle_quic_connection `{:?}`", e), - } - - let active = relay_state.stats.quic.disconnect (); - debug! ("QUIC connections: {}", active); - }); - } - - Ok::<_, anyhow::Error> (()) - }) - }; - - let task_direc_server = { - let relay_state = Arc::clone (&relay_state); - - tokio::spawn (async move { - let sock = UdpSocket::bind("0.0.0.0:30379").await?; - let mut buf = [0; 2048]; - loop { - let (len, addr) = sock.recv_from (&mut buf).await?; - debug! ("{:?} bytes received from {:?}", len, addr); - - let packet = Vec::from_iter ((&buf [0..len]).into_iter ().map (|x| *x)); - - { - let mut direc_cookies = relay_state.direc_cookies.lock ().await; - - if let Some (direc_state) = direc_cookies.remove (&packet) { - debug! ("Got PTTH_DIREC cookie for {}", direc_state.p2_id); - direc_state.p2_addr.send (addr).ok (); - } - else { - debug! ("UDP packet didn't match any PTTH_DIREC cookie"); - } - } - } - - Ok::<_, anyhow::Error> (()) - }) - }; - - let task_http_server = tokio::spawn (async move { - http_server.serve (make_svc).await?; - Ok::<_, anyhow::Error> (()) - }); - - let task_tcp_server = { - let relay_state = Arc::clone (&relay_state); - tokio::spawn (async move { - while *running_rx.borrow () { - let (tcp_socket, _) = tcp_listener.accept ().await?; - - let relay_state = Arc::clone (&relay_state); - tokio::spawn (async move { - let (_client_recv, _client_send) = tcp_socket.into_split (); - - debug! ("Accepted direct TCP connection P1 --> P3"); - - let p4_server_proxies = relay_state.p4_server_proxies.lock ().await; - let _p4 = match p4_server_proxies.get ("bogus_server") { - Some (x) => x, - None => bail! ("That server isn't connected"), - }; - - // unimplemented! (); - /* - p4.req_channel.send (RequestP2ToP4 { - client_send, - client_recv, - client_id: "bogus_client".to_string (), - }).await.map_err (|_| anyhow::anyhow! ("Can't send request to P4 server"))?; - */ - Ok (()) - }); - } - - Ok::<_, anyhow::Error> (()) - }) - }; - - debug! ("Serving HTTP on {:?}", http_addr); - - task_quic_server.await??; - task_http_server.await??; - task_tcp_server.await??; - task_direc_server.await??; - - Ok (()) -} - -async fn handle_http (_req: Request , relay_state: Arc ) --> anyhow::Result > -{ - let debug_string; - { - let p4_server_proxies = relay_state.p4_server_proxies.lock ().await; - - debug_string = format! ("{:#?}\n", p4_server_proxies.keys ().collect::> ()); - } - - let resp = Response::builder () - .status (StatusCode::OK) - .header ("content-type", "text/plain") - .body (Body::from (debug_string))?; - - Ok (resp) -} - -#[derive (Default)] -struct RelayState { - p4_server_proxies: Mutex >, - direc_cookies: Mutex , DirecState>>, - stats: Stats, -} - -struct DirecState { - start_time: Instant, - p2_id: PeerId, - p2_addr: tokio::sync::oneshot::Sender , -} - -#[derive (Default)] -struct Stats { - quic: ConnectEvents, -} - -#[derive (Default)] -struct ConnectEvents { - connects: AtomicU64, - disconnects: AtomicU64, -} - -impl ConnectEvents { - fn connect (&self) -> u64 { - let connects = self.connects.fetch_add (1, Ordering::Relaxed) + 1; - let disconnects = self.disconnects.load (Ordering::Relaxed); - connects - disconnects - } - - fn disconnect (&self) -> u64 { - let disconnects = self.disconnects.fetch_add (1, Ordering::Relaxed) + 1; - let connects = self.connects.load (Ordering::Relaxed); - connects - disconnects - } - - fn _active (&self) -> u64 { - let connects = self.connects.load (Ordering::Relaxed); - let disconnects = self.disconnects.load (Ordering::Relaxed); - connects - disconnects - } -} - -struct P4State { - req_channel: mpsc::Sender , - -} - -impl RelayState { - -} - -struct RequestP2ToP4 { - client_send: quinn::SendStream, - client_recv: quinn::RecvStream, - client_id: String, -} - -struct PtthNewConnection { - client_send: quinn::SendStream, - client_recv: quinn::RecvStream, - server_send: quinn::SendStream, - server_recv: quinn::RecvStream, -} - -struct PtthConnection { - uplink_task: JoinHandle >, - downlink_task: JoinHandle >, -} - -impl PtthNewConnection { - fn build (self) -> PtthConnection { - let Self { - mut client_send, - mut client_recv, - mut server_send, - mut server_recv, - } = self; - - let uplink_task = tokio::spawn (async move { - // Uplink - Client to end server - - let mut buf = vec! [0u8; 65_536]; - while let Some (bytes_read) = client_recv.read (&mut buf).await? { - if bytes_read == 0 { - break; - } - let buf_slice = &buf [0..bytes_read]; - trace! ("Uplink relaying {} bytes", bytes_read); - server_send.write_all (buf_slice).await?; - } - - trace! ("Uplink closed"); - - Ok::<_, anyhow::Error> (()) - }); - - let downlink_task = tokio::spawn (async move { - // Downlink - End server to client - - let mut buf = vec! [0u8; 65_536]; - while let Some (bytes_read) = server_recv.read (&mut buf).await? { - let buf_slice = &buf [0..bytes_read]; - trace! ("Downlink relaying {} bytes", bytes_read); - client_send.write_all (buf_slice).await?; - } - - trace! ("Downlink closed"); - - Ok::<_, anyhow::Error> (()) - }); - - PtthConnection { - uplink_task, - downlink_task, - } - } -} - -async fn handle_quic_connection ( - relay_state: Arc , - conn: quinn::Connecting, -) -> anyhow::Result <()> -{ - let mut conn = conn.await?; - - // Everyone who connects must identify themselves with the first - // bi stream - // TODO: Timeout - - let (mut send, mut recv) = conn.bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("QUIC client didn't identify itself"))??; - - let peer = protocol::p3_accept_peer (&mut recv).await?; - - match peer { - protocol::P3Peer::P2ClientProxy (peer) => { - trace! ("Accepting connection from P2 client"); - // TODO: Check authorization for P2 peers - - protocol::p3_authorize_p2_peer (&mut send).await?; - handle_p2_connection (relay_state, conn, peer).await?; }, - protocol::P3Peer::P4ServerProxy (peer) => { - trace! ("Accepting connection from P4 end server"); - // TODO: Check authorization for P4 peers + val = running_rx.changed () => { - protocol::p3_authorize_p4_peer (&mut send).await?; - handle_p4_connection (relay_state, conn, peer).await?; }, } - Ok::<_, anyhow::Error> (()) -} - -async fn handle_p2_connection ( - relay_state: Arc , - conn: quinn::NewConnection, - peer: protocol::P2ClientProxy, -) -> anyhow::Result <()> -{ - let client_id = peer.id; - - let quinn::NewConnection { - mut bi_streams, - .. - } = conn; - - while let Some (bi_stream) = bi_streams.next ().await { - let (send, mut recv) = bi_stream?; - let relay_state = Arc::clone (&relay_state); - let client_id = client_id.clone (); - - tokio::spawn (async move { - debug! ("Request started for P2"); - - match protocol::p3_accept_p2_stream (&mut recv).await? { - protocol::P2ToP3Stream::ConnectP2ToP4 { - server_id, - } => { - handle_request_p2_to_p4 ( - relay_state, - client_id, - server_id, - send, - recv - ).await? - }, - protocol::P2ToP3Stream::DirecP2ToP4 { - server_id, - cookie, - } => { - handle_direc_p2_to_p4 ( - relay_state, - client_id, - server_id, - cookie, - send, - recv - ).await? - }, - } - - debug! ("Request ended for P2"); - - Ok::<_, anyhow::Error> (()) - }); - } - - debug! ("P2 {} disconnected", client_id); - Ok (()) -} - -async fn handle_request_p2_to_p4 ( - relay_state: Arc , - client_id: String, - server_id: PeerId, - mut client_send: quinn::SendStream, - client_recv: quinn::RecvStream, -) -> anyhow::Result <()> -{ - trace! ("P2 {} wants to connect to P4 {}", client_id, server_id); - - // TODO: Check authorization for P2 to connect to P4 - - protocol::p3_authorize_p2_to_p4_connection (&mut client_send).await?; - - { - let p4_server_proxies = relay_state.p4_server_proxies.lock ().await; - match p4_server_proxies.get (&server_id) { - Some (p4_state) => { - p4_state.req_channel.send (RequestP2ToP4 { - client_send, - client_recv, - client_id, - }).await.map_err (|_| anyhow::anyhow! ("Can't send request to P4 server"))?; - }, - None => warn! ("That server isn't connected"), - } - } - - Ok (()) -} - -async fn handle_direc_p2_to_p4 ( - relay_state: Arc , - client_id: String, - server_id: PeerId, - cookie: Vec , - mut client_send: quinn::SendStream, - client_recv: quinn::RecvStream, -) -> anyhow::Result <()> -{ - debug! ("P2 {} wants a P2P connection to P4 {}", client_id, server_id); - - // TODO: Check authorization - - protocol::p3_authorize_p2_to_p4_direc (&mut client_send).await?; - - let (tx, rx) = tokio::sync::oneshot::channel (); - - { - let mut direc_cookies = relay_state.direc_cookies.lock ().await; - direc_cookies.insert (cookie, DirecState { - start_time: Instant::now (), - p2_id: client_id.clone (), - p2_addr: tx, - }); - } - - debug! ("Waiting to learn P2's WAN address..."); - - let wan_addr = rx.await?; - - debug! ("And that WAN address is {}", wan_addr); - - Ok (()) -} - -async fn handle_p4_connection ( - relay_state: Arc , - conn: quinn::NewConnection, - peer: protocol::P4ServerProxy, -) -> anyhow::Result <()> -{ - let server_id = peer.id; - let quinn::NewConnection { - connection, - .. - } = conn; - let (tx, mut rx) = mpsc::channel (2); - - let p4_state = P4State { - req_channel: tx, - }; - - { - let mut p4_server_proxies = relay_state.p4_server_proxies.lock ().await; - p4_server_proxies.insert (server_id.clone (), p4_state); - } - - while let Some (req) = rx.recv ().await { - let connection = connection.clone (); - let server_id = server_id.clone (); - - tokio::spawn (async move { - let RequestP2ToP4 { - client_send, - client_recv, - client_id, - } = req; - - debug! ("P4 {} got a request from P2 {}", server_id, client_id); - - let (server_send, server_recv) = protocol::p3_connect_p2_to_p4 (&connection, &client_id).await?; - - trace! ("Relaying bytes..."); - - let ptth_conn = PtthNewConnection { - client_send, - client_recv, - server_send, - server_recv, - }.build (); - - ptth_conn.uplink_task.await??; - ptth_conn.downlink_task.await??; - - debug! ("Request ended for P4"); - Ok::<_, anyhow::Error> (()) - }); - } - - debug! ("P4 {} disconnected", server_id); Ok (()) } diff --git a/crates/ptth_quic/src/executable_relay_server.rs b/crates/ptth_quic/src/executable_relay_server.rs new file mode 100644 index 0000000..f12662a --- /dev/null +++ b/crates/ptth_quic/src/executable_relay_server.rs @@ -0,0 +1,514 @@ +use hyper::{ + Body, + Request, + Response, + Server, + service::{ + make_service_fn, + service_fn, + }, + StatusCode, +}; +use structopt::StructOpt; +use tokio::{ + net::UdpSocket, +}; + +use crate::prelude::*; +use protocol::PeerId; + +#[derive (Debug, StructOpt)] +pub struct Opt { + #[structopt (long)] + listen_addr: Option , +} + +pub async fn main (opt: Opt) -> anyhow::Result <()> +{ + let listen_addr = opt.listen_addr.unwrap_or_else (|| String::from ("0.0.0.0:30380")).parse ()?; + let (mut incoming, server_cert) = make_server_endpoint (listen_addr)?; + println! ("Base64 cert: {}", base64::encode (&server_cert)); + + tokio::fs::create_dir_all ("ptth_quic_output").await?; + tokio::fs::write ("ptth_quic_output/quic_server.crt", &server_cert).await?; + + let relay_state = Arc::new (RelayState::default ()); + + let make_svc = { + let relay_state = Arc::clone (&relay_state); + make_service_fn (move |_conn| { + let relay_state = Arc::clone (&relay_state); + + async move { + Ok::<_, String> (service_fn (move |req| { + let relay_state = Arc::clone (&relay_state); + + handle_http (req, relay_state) + })) + } + }) + }; + + let http_addr = SocketAddr::from (([0, 0, 0, 0], 4004)); + let http_server = Server::bind (&http_addr); + + let tcp_port = 30382; + let tcp_listener = TcpListener::bind (("127.0.0.1", tcp_port)).await?; + + let task_quic_server = { + let relay_state = Arc::clone (&relay_state); + tokio::spawn (async move { + while let Some (conn) = incoming.next ().await { + let relay_state = Arc::clone (&relay_state); + + // Each new peer QUIC connection gets its own task + tokio::spawn (async move { + let active = relay_state.stats.quic.connect (); + debug! ("QUIC connections: {}", active); + + match handle_quic_connection (Arc::clone (&relay_state), conn).await { + Ok (_) => (), + Err (e) => warn! ("handle_quic_connection `{:?}`", e), + } + + let active = relay_state.stats.quic.disconnect (); + debug! ("QUIC connections: {}", active); + }); + } + + Ok::<_, anyhow::Error> (()) + }) + }; + + let task_direc_server = { + let relay_state = Arc::clone (&relay_state); + + tokio::spawn (async move { + let sock = UdpSocket::bind("0.0.0.0:30379").await?; + let mut buf = [0; 2048]; + loop { + let (len, addr) = sock.recv_from (&mut buf).await?; + debug! ("{:?} bytes received from {:?}", len, addr); + + let packet = Vec::from_iter ((&buf [0..len]).into_iter ().map (|x| *x)); + + { + let mut direc_cookies = relay_state.direc_cookies.lock ().await; + + if let Some (direc_state) = direc_cookies.remove (&packet) { + debug! ("Got PTTH_DIREC cookie for {}", direc_state.p2_id); + direc_state.p2_addr.send (addr).ok (); + } + else { + debug! ("UDP packet didn't match any PTTH_DIREC cookie"); + } + } + } + + Ok::<_, anyhow::Error> (()) + }) + }; + + let task_http_server = tokio::spawn (async move { + http_server.serve (make_svc).await?; + Ok::<_, anyhow::Error> (()) + }); + + let task_tcp_server = { + let relay_state = Arc::clone (&relay_state); + tokio::spawn (async move { + loop { + let (tcp_socket, _) = tcp_listener.accept ().await?; + + let relay_state = Arc::clone (&relay_state); + tokio::spawn (async move { + let (_client_recv, _client_send) = tcp_socket.into_split (); + + debug! ("Accepted direct TCP connection P1 --> P3"); + + let p4_server_proxies = relay_state.p4_server_proxies.lock ().await; + let _p4 = match p4_server_proxies.get ("bogus_server") { + Some (x) => x, + None => bail! ("That server isn't connected"), + }; + + // unimplemented! (); + /* + p4.req_channel.send (RequestP2ToP4 { + client_send, + client_recv, + client_id: "bogus_client".to_string (), + }).await.map_err (|_| anyhow::anyhow! ("Can't send request to P4 server"))?; + */ + Ok (()) + }); + } + + Ok::<_, anyhow::Error> (()) + }) + }; + + debug! ("Serving HTTP on {:?}", http_addr); + + task_quic_server.await??; + task_http_server.await??; + task_tcp_server.await??; + task_direc_server.await??; + + Ok (()) +} + +async fn handle_http (_req: Request , relay_state: Arc ) +-> anyhow::Result > +{ + let debug_string; + { + let p4_server_proxies = relay_state.p4_server_proxies.lock ().await; + + debug_string = format! ("{:#?}\n", p4_server_proxies.keys ().collect::> ()); + } + + let resp = Response::builder () + .status (StatusCode::OK) + .header ("content-type", "text/plain") + .body (Body::from (debug_string))?; + + Ok (resp) +} + +#[derive (Default)] +struct RelayState { + p4_server_proxies: Mutex >, + direc_cookies: Mutex , DirecState>>, + stats: Stats, +} + +struct DirecState { + start_time: Instant, + p2_id: PeerId, + p2_addr: tokio::sync::oneshot::Sender , +} + +#[derive (Default)] +struct Stats { + quic: ConnectEvents, +} + +#[derive (Default)] +struct ConnectEvents { + connects: AtomicU64, + disconnects: AtomicU64, +} + +impl ConnectEvents { + fn connect (&self) -> u64 { + let connects = self.connects.fetch_add (1, Ordering::Relaxed) + 1; + let disconnects = self.disconnects.load (Ordering::Relaxed); + connects - disconnects + } + + fn disconnect (&self) -> u64 { + let disconnects = self.disconnects.fetch_add (1, Ordering::Relaxed) + 1; + let connects = self.connects.load (Ordering::Relaxed); + connects - disconnects + } + + fn _active (&self) -> u64 { + let connects = self.connects.load (Ordering::Relaxed); + let disconnects = self.disconnects.load (Ordering::Relaxed); + connects - disconnects + } +} + +struct P4State { + req_channel: mpsc::Sender , + +} + +impl RelayState { + +} + +struct RequestP2ToP4 { + client_send: quinn::SendStream, + client_recv: quinn::RecvStream, + client_id: String, +} + +struct PtthNewConnection { + client_send: quinn::SendStream, + client_recv: quinn::RecvStream, + server_send: quinn::SendStream, + server_recv: quinn::RecvStream, +} + +struct PtthConnection { + uplink_task: JoinHandle >, + downlink_task: JoinHandle >, +} + +impl PtthNewConnection { + fn build (self) -> PtthConnection { + let Self { + mut client_send, + mut client_recv, + mut server_send, + mut server_recv, + } = self; + + let uplink_task = tokio::spawn (async move { + // Uplink - Client to end server + + let mut buf = vec! [0u8; 65_536]; + while let Some (bytes_read) = client_recv.read (&mut buf).await? { + if bytes_read == 0 { + break; + } + let buf_slice = &buf [0..bytes_read]; + trace! ("Uplink relaying {} bytes", bytes_read); + server_send.write_all (buf_slice).await?; + } + + trace! ("Uplink closed"); + + Ok::<_, anyhow::Error> (()) + }); + + let downlink_task = tokio::spawn (async move { + // Downlink - End server to client + + let mut buf = vec! [0u8; 65_536]; + while let Some (bytes_read) = server_recv.read (&mut buf).await? { + let buf_slice = &buf [0..bytes_read]; + trace! ("Downlink relaying {} bytes", bytes_read); + client_send.write_all (buf_slice).await?; + } + + trace! ("Downlink closed"); + + Ok::<_, anyhow::Error> (()) + }); + + PtthConnection { + uplink_task, + downlink_task, + } + } +} + +async fn handle_quic_connection ( + relay_state: Arc , + conn: quinn::Connecting, +) -> anyhow::Result <()> +{ + let mut conn = conn.await?; + + // Everyone who connects must identify themselves with the first + // bi stream + // TODO: Timeout + + let (mut send, mut recv) = conn.bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("QUIC client didn't identify itself"))??; + + let peer = protocol::p3_accept_peer (&mut recv).await?; + + match peer { + protocol::P3Peer::P2ClientProxy (peer) => { + trace! ("Accepting connection from P2 client"); + // TODO: Check authorization for P2 peers + + protocol::p3_authorize_p2_peer (&mut send).await?; + handle_p2_connection (relay_state, conn, peer).await?; + }, + protocol::P3Peer::P4ServerProxy (peer) => { + trace! ("Accepting connection from P4 end server"); + // TODO: Check authorization for P4 peers + + protocol::p3_authorize_p4_peer (&mut send).await?; + handle_p4_connection (relay_state, conn, peer).await?; + }, + } + + Ok::<_, anyhow::Error> (()) +} + +async fn handle_p2_connection ( + relay_state: Arc , + conn: quinn::NewConnection, + peer: protocol::P2ClientProxy, +) -> anyhow::Result <()> +{ + let client_id = peer.id; + + let quinn::NewConnection { + mut bi_streams, + .. + } = conn; + + while let Some (bi_stream) = bi_streams.next ().await { + let (send, mut recv) = bi_stream?; + let relay_state = Arc::clone (&relay_state); + let client_id = client_id.clone (); + + tokio::spawn (async move { + debug! ("Request started for P2"); + + match protocol::p3_accept_p2_stream (&mut recv).await? { + protocol::P2ToP3Stream::ConnectP2ToP4 { + server_id, + } => { + handle_request_p2_to_p4 ( + relay_state, + client_id, + server_id, + send, + recv + ).await? + }, + protocol::P2ToP3Stream::DirecP2ToP4 { + server_id, + cookie, + } => { + handle_direc_p2_to_p4 ( + relay_state, + client_id, + server_id, + cookie, + send, + recv + ).await? + }, + } + + debug! ("Request ended for P2"); + + Ok::<_, anyhow::Error> (()) + }); + } + + debug! ("P2 {} disconnected", client_id); + Ok (()) +} + +async fn handle_request_p2_to_p4 ( + relay_state: Arc , + client_id: String, + server_id: PeerId, + mut client_send: quinn::SendStream, + client_recv: quinn::RecvStream, +) -> anyhow::Result <()> +{ + trace! ("P2 {} wants to connect to P4 {}", client_id, server_id); + + // TODO: Check authorization for P2 to connect to P4 + + protocol::p3_authorize_p2_to_p4_connection (&mut client_send).await?; + + { + let p4_server_proxies = relay_state.p4_server_proxies.lock ().await; + match p4_server_proxies.get (&server_id) { + Some (p4_state) => { + p4_state.req_channel.send (RequestP2ToP4 { + client_send, + client_recv, + client_id, + }).await.map_err (|_| anyhow::anyhow! ("Can't send request to P4 server"))?; + }, + None => warn! ("That server isn't connected"), + } + } + + Ok (()) +} + +async fn handle_direc_p2_to_p4 ( + relay_state: Arc , + client_id: String, + server_id: PeerId, + cookie: Vec , + mut client_send: quinn::SendStream, + client_recv: quinn::RecvStream, +) -> anyhow::Result <()> +{ + debug! ("P2 {} wants a P2P connection to P4 {}", client_id, server_id); + + // TODO: Check authorization + + protocol::p3_authorize_p2_to_p4_direc (&mut client_send).await?; + + let (tx, rx) = tokio::sync::oneshot::channel (); + + { + let mut direc_cookies = relay_state.direc_cookies.lock ().await; + direc_cookies.insert (cookie, DirecState { + start_time: Instant::now (), + p2_id: client_id.clone (), + p2_addr: tx, + }); + } + + debug! ("Waiting to learn P2's WAN address..."); + + let wan_addr = rx.await?; + + debug! ("And that WAN address is {}", wan_addr); + + Ok (()) +} + +async fn handle_p4_connection ( + relay_state: Arc , + conn: quinn::NewConnection, + peer: protocol::P4ServerProxy, +) -> anyhow::Result <()> +{ + let server_id = peer.id; + let quinn::NewConnection { + connection, + .. + } = conn; + let (tx, mut rx) = mpsc::channel (2); + + let p4_state = P4State { + req_channel: tx, + }; + + { + let mut p4_server_proxies = relay_state.p4_server_proxies.lock ().await; + p4_server_proxies.insert (server_id.clone (), p4_state); + } + + while let Some (req) = rx.recv ().await { + let connection = connection.clone (); + let server_id = server_id.clone (); + + tokio::spawn (async move { + let RequestP2ToP4 { + client_send, + client_recv, + client_id, + } = req; + + debug! ("P4 {} got a request from P2 {}", server_id, client_id); + + let (server_send, server_recv) = protocol::p3_connect_p2_to_p4 (&connection, &client_id).await?; + + trace! ("Relaying bytes..."); + + let ptth_conn = PtthNewConnection { + client_send, + client_recv, + server_send, + server_recv, + }.build (); + + ptth_conn.uplink_task.await??; + ptth_conn.downlink_task.await??; + + debug! ("Request ended for P4"); + Ok::<_, anyhow::Error> (()) + }); + } + + debug! ("P4 {} disconnected", server_id); + Ok (()) +} diff --git a/crates/ptth_quic/src/lib.rs b/crates/ptth_quic/src/lib.rs index d5f0aa0..5cf334c 100644 --- a/crates/ptth_quic/src/lib.rs +++ b/crates/ptth_quic/src/lib.rs @@ -1,6 +1,7 @@ pub mod client_proxy; pub mod connection; pub mod executable_end_server; +pub mod executable_relay_server; pub mod prelude; pub mod protocol; pub mod quinn_utils;