🐛 bug: with some complication, I was able to make the GUI close the port properly

main
_ 2021-07-19 01:46:06 +00:00
parent 5163d51cbd
commit 34c9e5e7a1
1 changed files with 130 additions and 44 deletions

View File

@ -53,6 +53,7 @@ fn main () -> anyhow::Result <()> {
} }
y += h + margin; y += h + margin;
x = margin;
{ {
let w = 80; let w = 80;
@ -69,26 +70,27 @@ fn main () -> anyhow::Result <()> {
} }
y += h + margin; y += h + margin;
x = margin;
{ {
let w = 80; let w = 80;
let mut input_client_port = Input::new (10, y, w, h, ""); let mut input_client_port = Input::new (x, y, w, h, "");
x += w + margin; x += w + margin;
let w = 120; let w = 120;
input_server_id = Input::new (100, y, w, h, ""); input_server_id = Input::new (x, y, w, h, "");
x += w + margin; x += w + margin;
let w = 80; let w = 80;
let mut input_server_port = Input::new (230, y, w, h, ""); let mut input_server_port = Input::new (x, y, w, h, "");
x += w + margin; x += w + margin;
let w = 80; let w = 80;
but_open = Button::new (320, y, w, h, "Open"); but_open = Button::new (x, y, w, h, "Open");
x += w + margin; x += w + margin;
let w = 80; let w = 80;
but_close = Button::new (410, y, w, h, "Close"); but_close = Button::new (x, y, w, h, "Close");
// x += w + margin; // x += w + margin;
input_client_port.set_value ("5901"); input_client_port.set_value ("5901");
@ -133,49 +135,16 @@ fn main () -> anyhow::Result <()> {
Some (Message::OpenPort) => { Some (Message::OpenPort) => {
let connection_p2_p3 = connection_p2_p3.clone (); let connection_p2_p3 = connection_p2_p3.clone ();
let server_id = input_server_id.value ().to_string (); let server_id = input_server_id.value ().to_string ();
let (shutdown_flag, shutdown_flag_rx) = tokio::sync::watch::channel (true);
let task = rt.spawn (async move { let task = rt.spawn (async move {
let client_tcp_port = 30381; forward_port (connection_p2_p3, server_id, shutdown_flag_rx).await
let server_tcp_port = 30382;
let listener = TcpListener::bind (("127.0.0.1", client_tcp_port)).await?;
trace! ("Accepting local TCP connections from P1 on {}", client_tcp_port);
loop {
let (tcp_socket, _) = listener.accept ().await?;
let connection = connection_p2_p3.clone ();
let server_id = server_id.clone ();
tokio::spawn (async move {
let (local_recv, local_send) = tcp_socket.into_split ();
debug! ("Starting PTTH connection");
let (relay_send, relay_recv) = protocol::p2_connect_to_p5 (&connection, &server_id, server_tcp_port).await?;
trace! ("Relaying bytes...");
let ptth_conn = quic_demo::connection::NewConnection {
local_send,
local_recv,
relay_send,
relay_recv,
}.build ();
ptth_conn.wait_for_close ().await?;
debug! ("Ended PTTH connection");
Ok::<_, anyhow::Error> (())
});
}
Ok::<_, anyhow::Error> (())
}); });
forwarding_instance.replace (Some (ForwardingInstance { forwarding_instance.replace (ForwardingInstance {
task, task,
})); shutdown_flag,
});
set_active (&mut but_open, false); set_active (&mut but_open, false);
set_active (&mut but_close, true); set_active (&mut but_close, true);
@ -183,7 +152,13 @@ fn main () -> anyhow::Result <()> {
but_close.set (false); but_close.set (false);
}, },
Some (Message::ClosePort) => { Some (Message::ClosePort) => {
forwarding_instance.replace (None); if let Some (mut old_instance) = forwarding_instance.take () {
rt.block_on (async {
old_instance.shutdown_flag.send (false)?;
old_instance.task.await??;
Ok::<_, anyhow::Error> (())
})?;
}
set_active (&mut but_open, true); set_active (&mut but_open, true);
set_active (&mut but_close, false); set_active (&mut but_close, false);
@ -208,4 +183,115 @@ fn set_active <W: WidgetExt> (w: &mut W, b: bool) {
struct ForwardingInstance { struct ForwardingInstance {
task: tokio::task::JoinHandle <anyhow::Result <()>>, task: tokio::task::JoinHandle <anyhow::Result <()>>,
shutdown_flag: tokio::sync::watch::Sender <bool>,
}
async fn forward_port (
connection_p2_p3: quinn::Connection,
server_id: String,
shutdown_flag_rx: tokio::sync::watch::Receiver <bool>,
) -> anyhow::Result <()>
{
let client_tcp_port = 30381;
let server_tcp_port = 30382;
let listener = TcpListener::bind (("127.0.0.1", client_tcp_port)).await?;
trace! ("Accepting local TCP connections from P1 on {}", client_tcp_port);
while *shutdown_flag_rx.borrow () {
let mut shutdown_flag_rx_2 = shutdown_flag_rx.clone ();
tokio::select! {
x = listener.accept () => {
let (tcp_socket, _) = x?;
let connection = connection_p2_p3.clone ();
let server_id = server_id.clone ();
let shutdown_flag_rx = shutdown_flag_rx.clone ();
tokio::spawn (handle_p1 (connection, server_id, server_tcp_port, tcp_socket, shutdown_flag_rx));
},
_ = shutdown_flag_rx_2.changed () => (),
};
}
Ok::<_, anyhow::Error> (())
}
async fn handle_p1 (
connection: quinn::Connection,
server_id: String,
server_tcp_port: u16,
tcp_socket: tokio::net::TcpStream,
shutdown_flag_rx: tokio::sync::watch::Receiver <bool>,
) -> anyhow::Result <()>
{
let (mut local_recv, mut local_send) = tcp_socket.into_split ();
debug! ("Starting PTTH connection");
let (mut relay_send, mut relay_recv) = protocol::p2_connect_to_p5 (&connection, &server_id, server_tcp_port).await?;
trace! ("Relaying bytes...");
let task_blue = {
let mut shutdown_flag_rx = shutdown_flag_rx.clone ();
tokio::spawn (async move {
let mut buf = vec! [0u8; 65_536];
while *shutdown_flag_rx.borrow () {
trace! ("Blue reading from QUIC...");
tokio::select! {
x = relay_recv.read (&mut buf) => {
let bytes_read = match x? {
None => break,
Some (0) => break,
Some (x) => x,
};
let buf_slice = &buf [0..bytes_read];
trace! ("Uplink relaying {} bytes", bytes_read);
local_send.write_all (buf_slice).await?;
},
_ = shutdown_flag_rx.changed () => (),
};
}
debug! ("Blue QUIC --> TCP closed");
Ok::<_, anyhow::Error> (())
})
};
let task_green = {
let mut shutdown_flag_rx = shutdown_flag_rx.clone ();
tokio::spawn (async move {
let mut buf = vec! [0u8; 65_536];
while *shutdown_flag_rx.borrow () {
trace! ("Green reading from TCP...");
tokio::select! {
x = local_recv.read (&mut buf) => {
let bytes_read = match x? {
0 => break,
x => x,
};
let buf_slice = &buf [0..bytes_read];
trace! ("Downlink relaying {} bytes", bytes_read);
relay_send.write_all (buf_slice).await?;
},
_ = shutdown_flag_rx.changed () => (),
};
}
debug! ("Green TCP --> QUIC closed");
Ok::<_, anyhow::Error> (())
})
};
task_blue.await??;
task_green.await??;
debug! ("Ended PTTH connection");
Ok (())
} }