♻️ refactor: extract ForwardingInstance from `client_gui`

main
_ 2021-07-21 23:28:42 +00:00
parent 320355ca28
commit 67e74169be
3 changed files with 168 additions and 141 deletions

View File

@ -10,10 +10,12 @@ use fltk::{
window::Window window::Window
}; };
use structopt::StructOpt; use structopt::StructOpt;
use tokio::net::TcpListener;
use quic_demo::prelude::*; use quic_demo::{
use protocol::PeerId; client_proxy::*,
prelude::*,
protocol::PeerId,
};
#[derive (Debug, StructOpt)] #[derive (Debug, StructOpt)]
struct Opt { struct Opt {
@ -122,19 +124,13 @@ fn main () -> anyhow::Result <()> {
Some (Message::OpenPort (port_idx)) => { Some (Message::OpenPort (port_idx)) => {
if let Ok (params) = gui_ports [port_idx].get_params () { if let Ok (params) = gui_ports [port_idx].get_params () {
let connection_p2_p3 = connection_p2_p3.clone (); let connection_p2_p3 = connection_p2_p3.clone ();
let (shutdown_flag, shutdown_flag_rx) = tokio::sync::watch::channel (true);
let task = rt.spawn (forward_port ( let _guard = rt.enter ();
forwarding_instances [port_idx].replace (ForwardingInstance::new (
connection_p2_p3, connection_p2_p3,
params, params,
shutdown_flag_rx
)); ));
forwarding_instances [port_idx].replace (ForwardingInstance {
task,
shutdown_flag,
});
gui_ports [port_idx].set_forwarding (true); gui_ports [port_idx].set_forwarding (true);
frame_status.set_label ("Forwarding 1 port"); frame_status.set_label ("Forwarding 1 port");
@ -142,11 +138,7 @@ fn main () -> anyhow::Result <()> {
}, },
Some (Message::ClosePort (port_idx)) => { Some (Message::ClosePort (port_idx)) => {
if let Some (old_instance) = forwarding_instances [port_idx].take () { if let Some (old_instance) = forwarding_instances [port_idx].take () {
rt.block_on (async { rt.block_on (old_instance.close ())?;
old_instance.shutdown_flag.send (false)?;
old_instance.task.await??;
Ok::<_, anyhow::Error> (())
})?;
} }
gui_ports [port_idx].set_forwarding (false); gui_ports [port_idx].set_forwarding (false);
@ -168,125 +160,6 @@ fn set_active <W: WidgetExt> (w: &mut W, b: bool) {
} }
} }
struct ForwardingInstance {
task: tokio::task::JoinHandle <anyhow::Result <()>>,
shutdown_flag: tokio::sync::watch::Sender <bool>,
}
async fn forward_port (
connection_p2_p3: quinn::Connection,
params: ForwardingParams,
shutdown_flag_rx: tokio::sync::watch::Receiver <bool>,
) -> anyhow::Result <()>
{
let ForwardingParams {
client_tcp_port,
server_id,
server_tcp_port,
} = params;
let listener = TcpListener::bind (("127.0.0.1", client_tcp_port)).await?;
trace! ("Accepting local TCP connections from P1 on {}", client_tcp_port);
while *shutdown_flag_rx.borrow () {
let mut shutdown_flag_rx_2 = shutdown_flag_rx.clone ();
tokio::select! {
x = listener.accept () => {
let (tcp_socket, _) = x?;
let connection = connection_p2_p3.clone ();
let server_id = server_id.clone ();
let shutdown_flag_rx = shutdown_flag_rx.clone ();
tokio::spawn (handle_p1 (connection, server_id, server_tcp_port, tcp_socket, shutdown_flag_rx));
},
_ = shutdown_flag_rx_2.changed () => (),
};
}
Ok::<_, anyhow::Error> (())
}
async fn handle_p1 (
connection: quinn::Connection,
server_id: String,
server_tcp_port: u16,
tcp_socket: tokio::net::TcpStream,
shutdown_flag_rx: tokio::sync::watch::Receiver <bool>,
) -> anyhow::Result <()>
{
let (mut local_recv, mut local_send) = tcp_socket.into_split ();
debug! ("Starting PTTH connection");
let (mut relay_send, mut relay_recv) = protocol::p2_connect_to_p5 (&connection, &server_id, server_tcp_port).await?;
trace! ("Relaying bytes...");
let task_blue = {
let mut shutdown_flag_rx = shutdown_flag_rx.clone ();
tokio::spawn (async move {
let mut buf = vec! [0u8; 65_536];
while *shutdown_flag_rx.borrow () {
trace! ("Blue reading from QUIC...");
tokio::select! {
x = relay_recv.read (&mut buf) => {
let bytes_read = match x? {
None => break,
Some (0) => break,
Some (x) => x,
};
let buf_slice = &buf [0..bytes_read];
trace! ("Uplink relaying {} bytes", bytes_read);
local_send.write_all (buf_slice).await?;
},
_ = shutdown_flag_rx.changed () => (),
};
}
debug! ("Blue QUIC --> TCP closed");
Ok::<_, anyhow::Error> (())
})
};
let task_green = {
let mut shutdown_flag_rx = shutdown_flag_rx.clone ();
tokio::spawn (async move {
let mut buf = vec! [0u8; 65_536];
while *shutdown_flag_rx.borrow () {
trace! ("Green reading from TCP...");
tokio::select! {
x = local_recv.read (&mut buf) => {
let bytes_read = match x? {
0 => break,
x => x,
};
let buf_slice = &buf [0..bytes_read];
trace! ("Downlink relaying {} bytes", bytes_read);
relay_send.write_all (buf_slice).await?;
},
_ = shutdown_flag_rx.changed () => (),
};
}
debug! ("Green TCP --> QUIC closed");
Ok::<_, anyhow::Error> (())
})
};
task_blue.await??;
task_green.await??;
debug! ("Ended PTTH connection");
Ok (())
}
struct GuiPort { struct GuiPort {
input_client_port: Input, input_client_port: Input,
input_server_id: Input, input_server_id: Input,
@ -295,12 +168,6 @@ struct GuiPort {
but_close: Button, but_close: Button,
} }
struct ForwardingParams {
client_tcp_port: u16,
server_id: String,
server_tcp_port: u16,
}
impl GuiPort { impl GuiPort {
fn new (fltk_tx: fltk::app::Sender <Message>, x: &mut i32, y: i32, port_idx: usize) -> Self { fn new (fltk_tx: fltk::app::Sender <Message>, x: &mut i32, y: i32, port_idx: usize) -> Self {
let margin = 10; let margin = 10;

View File

@ -0,0 +1,159 @@
use tokio::{
net::TcpListener,
sync::watch,
task::JoinHandle,
};
use crate::prelude::*;
pub struct ForwardingInstance {
task: JoinHandle <anyhow::Result <()>>,
shutdown_flag: watch::Sender <bool>,
}
impl ForwardingInstance {
pub fn new (
connection_p2_p3: quinn::Connection,
params: ForwardingParams,
) -> Self
{
let (shutdown_flag, shutdown_flag_rx) = tokio::sync::watch::channel (true);
let task = tokio::spawn (forward_port (
connection_p2_p3,
params,
shutdown_flag_rx
));
Self {
task,
shutdown_flag,
}
}
pub async fn close (self) -> anyhow::Result <()> {
self.shutdown_flag.send (false)?;
self.task.await??;
Ok (())
}
}
pub struct ForwardingParams {
pub client_tcp_port: u16,
pub server_id: String,
pub server_tcp_port: u16,
}
async fn forward_port (
connection_p2_p3: quinn::Connection,
params: ForwardingParams,
shutdown_flag_rx: tokio::sync::watch::Receiver <bool>,
) -> anyhow::Result <()>
{
let ForwardingParams {
client_tcp_port,
server_id,
server_tcp_port,
} = params;
let listener = TcpListener::bind (("127.0.0.1", client_tcp_port)).await?;
trace! ("Accepting local TCP connections from P1 on {}", client_tcp_port);
while *shutdown_flag_rx.borrow () {
let mut shutdown_flag_rx_2 = shutdown_flag_rx.clone ();
tokio::select! {
x = listener.accept () => {
let (tcp_socket, _) = x?;
let connection = connection_p2_p3.clone ();
let server_id = server_id.clone ();
let shutdown_flag_rx = shutdown_flag_rx.clone ();
tokio::spawn (handle_p1 (connection, server_id, server_tcp_port, tcp_socket, shutdown_flag_rx));
},
_ = shutdown_flag_rx_2.changed () => (),
};
}
Ok::<_, anyhow::Error> (())
}
async fn handle_p1 (
connection: quinn::Connection,
server_id: String,
server_tcp_port: u16,
tcp_socket: tokio::net::TcpStream,
shutdown_flag_rx: tokio::sync::watch::Receiver <bool>,
) -> anyhow::Result <()>
{
let (mut local_recv, mut local_send) = tcp_socket.into_split ();
debug! ("Starting PTTH connection");
let (mut relay_send, mut relay_recv) = protocol::p2_connect_to_p5 (&connection, &server_id, server_tcp_port).await?;
trace! ("Relaying bytes...");
let task_blue = {
let mut shutdown_flag_rx = shutdown_flag_rx.clone ();
tokio::spawn (async move {
let mut buf = vec! [0u8; 65_536];
while *shutdown_flag_rx.borrow () {
trace! ("Blue reading from QUIC...");
tokio::select! {
x = relay_recv.read (&mut buf) => {
let bytes_read = match x? {
None => break,
Some (0) => break,
Some (x) => x,
};
let buf_slice = &buf [0..bytes_read];
trace! ("Uplink relaying {} bytes", bytes_read);
local_send.write_all (buf_slice).await?;
},
_ = shutdown_flag_rx.changed () => (),
};
}
debug! ("Blue QUIC --> TCP closed");
Ok::<_, anyhow::Error> (())
})
};
let task_green = {
let mut shutdown_flag_rx = shutdown_flag_rx.clone ();
tokio::spawn (async move {
let mut buf = vec! [0u8; 65_536];
while *shutdown_flag_rx.borrow () {
trace! ("Green reading from TCP...");
tokio::select! {
x = local_recv.read (&mut buf) => {
let bytes_read = match x? {
0 => break,
x => x,
};
let buf_slice = &buf [0..bytes_read];
trace! ("Downlink relaying {} bytes", bytes_read);
relay_send.write_all (buf_slice).await?;
},
_ = shutdown_flag_rx.changed () => (),
};
}
debug! ("Green TCP --> QUIC closed");
Ok::<_, anyhow::Error> (())
})
};
task_blue.await??;
task_green.await??;
debug! ("Ended PTTH connection");
Ok (())
}

View File

@ -1,3 +1,4 @@
pub mod client_proxy;
pub mod connection; pub mod connection;
pub mod prelude; pub mod prelude;
pub mod protocol; pub mod protocol;