diff --git a/prototypes/quic_demo/src/bin/client_gui.rs b/prototypes/quic_demo/src/bin/client_gui.rs index 1273420..4cfdfda 100644 --- a/prototypes/quic_demo/src/bin/client_gui.rs +++ b/prototypes/quic_demo/src/bin/client_gui.rs @@ -10,10 +10,12 @@ use fltk::{ window::Window }; use structopt::StructOpt; -use tokio::net::TcpListener; -use quic_demo::prelude::*; -use protocol::PeerId; +use quic_demo::{ + client_proxy::*, + prelude::*, + protocol::PeerId, +}; #[derive (Debug, StructOpt)] struct Opt { @@ -122,19 +124,13 @@ fn main () -> anyhow::Result <()> { Some (Message::OpenPort (port_idx)) => { if let Ok (params) = gui_ports [port_idx].get_params () { let connection_p2_p3 = connection_p2_p3.clone (); - let (shutdown_flag, shutdown_flag_rx) = tokio::sync::watch::channel (true); - let task = rt.spawn (forward_port ( + let _guard = rt.enter (); + forwarding_instances [port_idx].replace (ForwardingInstance::new ( connection_p2_p3, params, - shutdown_flag_rx )); - forwarding_instances [port_idx].replace (ForwardingInstance { - task, - shutdown_flag, - }); - gui_ports [port_idx].set_forwarding (true); frame_status.set_label ("Forwarding 1 port"); @@ -142,11 +138,7 @@ fn main () -> anyhow::Result <()> { }, Some (Message::ClosePort (port_idx)) => { if let Some (old_instance) = forwarding_instances [port_idx].take () { - rt.block_on (async { - old_instance.shutdown_flag.send (false)?; - old_instance.task.await??; - Ok::<_, anyhow::Error> (()) - })?; + rt.block_on (old_instance.close ())?; } gui_ports [port_idx].set_forwarding (false); @@ -168,125 +160,6 @@ fn set_active (w: &mut W, b: bool) { } } -struct ForwardingInstance { - task: tokio::task::JoinHandle >, - shutdown_flag: tokio::sync::watch::Sender , -} - -async fn forward_port ( - connection_p2_p3: quinn::Connection, - params: ForwardingParams, - shutdown_flag_rx: tokio::sync::watch::Receiver , -) -> anyhow::Result <()> -{ - let ForwardingParams { - client_tcp_port, - server_id, - server_tcp_port, - } = params; - - let listener = TcpListener::bind (("127.0.0.1", client_tcp_port)).await?; - - trace! ("Accepting local TCP connections from P1 on {}", client_tcp_port); - - while *shutdown_flag_rx.borrow () { - let mut shutdown_flag_rx_2 = shutdown_flag_rx.clone (); - - tokio::select! { - x = listener.accept () => { - let (tcp_socket, _) = x?; - let connection = connection_p2_p3.clone (); - let server_id = server_id.clone (); - let shutdown_flag_rx = shutdown_flag_rx.clone (); - - tokio::spawn (handle_p1 (connection, server_id, server_tcp_port, tcp_socket, shutdown_flag_rx)); - }, - _ = shutdown_flag_rx_2.changed () => (), - }; - } - - Ok::<_, anyhow::Error> (()) -} - -async fn handle_p1 ( - connection: quinn::Connection, - server_id: String, - server_tcp_port: u16, - tcp_socket: tokio::net::TcpStream, - shutdown_flag_rx: tokio::sync::watch::Receiver , -) -> anyhow::Result <()> -{ - let (mut local_recv, mut local_send) = tcp_socket.into_split (); - - debug! ("Starting PTTH connection"); - - let (mut relay_send, mut relay_recv) = protocol::p2_connect_to_p5 (&connection, &server_id, server_tcp_port).await?; - - trace! ("Relaying bytes..."); - - let task_blue = { - let mut shutdown_flag_rx = shutdown_flag_rx.clone (); - - tokio::spawn (async move { - let mut buf = vec! [0u8; 65_536]; - while *shutdown_flag_rx.borrow () { - trace! ("Blue reading from QUIC..."); - tokio::select! { - x = relay_recv.read (&mut buf) => { - let bytes_read = match x? { - None => break, - Some (0) => break, - Some (x) => x, - }; - let buf_slice = &buf [0..bytes_read]; - trace! ("Uplink relaying {} bytes", bytes_read); - local_send.write_all (buf_slice).await?; - }, - _ = shutdown_flag_rx.changed () => (), - }; - } - - debug! ("Blue QUIC --> TCP closed"); - - Ok::<_, anyhow::Error> (()) - }) - }; - - let task_green = { - let mut shutdown_flag_rx = shutdown_flag_rx.clone (); - - tokio::spawn (async move { - let mut buf = vec! [0u8; 65_536]; - while *shutdown_flag_rx.borrow () { - trace! ("Green reading from TCP..."); - tokio::select! { - x = local_recv.read (&mut buf) => { - let bytes_read = match x? { - 0 => break, - x => x, - }; - let buf_slice = &buf [0..bytes_read]; - trace! ("Downlink relaying {} bytes", bytes_read); - relay_send.write_all (buf_slice).await?; - }, - _ = shutdown_flag_rx.changed () => (), - }; - } - - debug! ("Green TCP --> QUIC closed"); - - Ok::<_, anyhow::Error> (()) - }) - }; - - task_blue.await??; - task_green.await??; - - debug! ("Ended PTTH connection"); - - Ok (()) -} - struct GuiPort { input_client_port: Input, input_server_id: Input, @@ -295,12 +168,6 @@ struct GuiPort { but_close: Button, } -struct ForwardingParams { - client_tcp_port: u16, - server_id: String, - server_tcp_port: u16, -} - impl GuiPort { fn new (fltk_tx: fltk::app::Sender , x: &mut i32, y: i32, port_idx: usize) -> Self { let margin = 10; diff --git a/prototypes/quic_demo/src/client_proxy.rs b/prototypes/quic_demo/src/client_proxy.rs new file mode 100644 index 0000000..0701ffc --- /dev/null +++ b/prototypes/quic_demo/src/client_proxy.rs @@ -0,0 +1,159 @@ +use tokio::{ + net::TcpListener, + sync::watch, + task::JoinHandle, +}; + +use crate::prelude::*; + +pub struct ForwardingInstance { + task: JoinHandle >, + shutdown_flag: watch::Sender , +} + +impl ForwardingInstance { + pub fn new ( + connection_p2_p3: quinn::Connection, + params: ForwardingParams, + ) -> Self + { + let (shutdown_flag, shutdown_flag_rx) = tokio::sync::watch::channel (true); + + let task = tokio::spawn (forward_port ( + connection_p2_p3, + params, + shutdown_flag_rx + )); + + Self { + task, + shutdown_flag, + } + } + + pub async fn close (self) -> anyhow::Result <()> { + self.shutdown_flag.send (false)?; + self.task.await??; + Ok (()) + } +} + +pub struct ForwardingParams { + pub client_tcp_port: u16, + pub server_id: String, + pub server_tcp_port: u16, +} + +async fn forward_port ( + connection_p2_p3: quinn::Connection, + params: ForwardingParams, + shutdown_flag_rx: tokio::sync::watch::Receiver , +) -> anyhow::Result <()> +{ + let ForwardingParams { + client_tcp_port, + server_id, + server_tcp_port, + } = params; + + let listener = TcpListener::bind (("127.0.0.1", client_tcp_port)).await?; + + trace! ("Accepting local TCP connections from P1 on {}", client_tcp_port); + + while *shutdown_flag_rx.borrow () { + let mut shutdown_flag_rx_2 = shutdown_flag_rx.clone (); + + tokio::select! { + x = listener.accept () => { + let (tcp_socket, _) = x?; + let connection = connection_p2_p3.clone (); + let server_id = server_id.clone (); + let shutdown_flag_rx = shutdown_flag_rx.clone (); + + tokio::spawn (handle_p1 (connection, server_id, server_tcp_port, tcp_socket, shutdown_flag_rx)); + }, + _ = shutdown_flag_rx_2.changed () => (), + }; + } + + Ok::<_, anyhow::Error> (()) +} + +async fn handle_p1 ( + connection: quinn::Connection, + server_id: String, + server_tcp_port: u16, + tcp_socket: tokio::net::TcpStream, + shutdown_flag_rx: tokio::sync::watch::Receiver , +) -> anyhow::Result <()> +{ + let (mut local_recv, mut local_send) = tcp_socket.into_split (); + + debug! ("Starting PTTH connection"); + + let (mut relay_send, mut relay_recv) = protocol::p2_connect_to_p5 (&connection, &server_id, server_tcp_port).await?; + + trace! ("Relaying bytes..."); + + let task_blue = { + let mut shutdown_flag_rx = shutdown_flag_rx.clone (); + + tokio::spawn (async move { + let mut buf = vec! [0u8; 65_536]; + while *shutdown_flag_rx.borrow () { + trace! ("Blue reading from QUIC..."); + tokio::select! { + x = relay_recv.read (&mut buf) => { + let bytes_read = match x? { + None => break, + Some (0) => break, + Some (x) => x, + }; + let buf_slice = &buf [0..bytes_read]; + trace! ("Uplink relaying {} bytes", bytes_read); + local_send.write_all (buf_slice).await?; + }, + _ = shutdown_flag_rx.changed () => (), + }; + } + + debug! ("Blue QUIC --> TCP closed"); + + Ok::<_, anyhow::Error> (()) + }) + }; + + let task_green = { + let mut shutdown_flag_rx = shutdown_flag_rx.clone (); + + tokio::spawn (async move { + let mut buf = vec! [0u8; 65_536]; + while *shutdown_flag_rx.borrow () { + trace! ("Green reading from TCP..."); + tokio::select! { + x = local_recv.read (&mut buf) => { + let bytes_read = match x? { + 0 => break, + x => x, + }; + let buf_slice = &buf [0..bytes_read]; + trace! ("Downlink relaying {} bytes", bytes_read); + relay_send.write_all (buf_slice).await?; + }, + _ = shutdown_flag_rx.changed () => (), + }; + } + + debug! ("Green TCP --> QUIC closed"); + + Ok::<_, anyhow::Error> (()) + }) + }; + + task_blue.await??; + task_green.await??; + + debug! ("Ended PTTH connection"); + + Ok (()) +} diff --git a/prototypes/quic_demo/src/lib.rs b/prototypes/quic_demo/src/lib.rs index 83c96a2..dcbd50a 100644 --- a/prototypes/quic_demo/src/lib.rs +++ b/prototypes/quic_demo/src/lib.rs @@ -1,3 +1,4 @@ +pub mod client_proxy; pub mod connection; pub mod prelude; pub mod protocol;