🚧 P3 can now do multiple connections in series

And it doesn't matter if P2 or P4 connects first.
However, P2 and P4 are still limited to one connection each. Then they quit
gracefully.
main
_ 2021-07-17 07:43:21 +00:00
parent 7406587d30
commit 5417fbc77b
3 changed files with 124 additions and 43 deletions

View File

@ -35,50 +35,51 @@ async fn main () -> anyhow::Result <()> {
debug! ("Connecting to end server"); debug! ("Connecting to end server");
let (mut send, mut recv) = connection.open_bi ().await?; let (mut relay_send, mut relay_recv) = connection.open_bi ().await?;
let req_buf = [1, 43, 0, 0, 1, 0, 0, 0]; let req_buf = [1, 43, 0, 0, 1, 0, 0, 0];
send.write_all (&req_buf).await?; relay_send.write_all (&req_buf).await?;
let mut resp_buf = [0; 8]; let mut resp_buf = [0; 8];
recv.read_exact (&mut resp_buf).await?; relay_recv.read_exact (&mut resp_buf).await?;
/* debug! ("Relaying bytes...");
if false {
debug! ("Relaying bytes..."); let uplink_task = tokio::spawn (async move {
// Uplink - local client to relay server
let uplink_task = tokio::spawn (async move { let mut buf = vec! [0u8; 65_536];
// Uplink - local client to relay server loop {
let bytes_read = local_recv.read (&mut buf).await?;
let mut buf = vec! [0u8; 65_536]; if bytes_read == 0 {
loop { break;
let bytes_read = local_recv.read (&mut buf).await?;
let buf_slice = &buf [0..bytes_read];
relay_send.write_all (buf_slice).await?;
} }
let buf_slice = &buf [0..bytes_read];
debug! ("Uplink closed"); debug! ("Uplink relaying {} bytes", bytes_read);
relay_send.write_all (buf_slice).await?;
Ok::<_, anyhow::Error> (()) }
});
let downlink_task = tokio::spawn (async move { debug! ("Uplink closed");
// Downlink - Relay server to local client
let mut buf = vec! [0u8; 65_536];
while let Some (bytes_read) = relay_recv.read (&mut buf).await? {
let buf_slice = &buf [0..bytes_read];
local_send.write_all (buf_slice).await?;
}
debug! ("Downlink closed");
Ok::<_, anyhow::Error> (())
});
uplink_task.await??; Ok::<_, anyhow::Error> (())
downlink_task.await??; });
}
*/ let downlink_task = tokio::spawn (async move {
// Downlink - Relay server to local client
let mut buf = vec! [0u8; 65_536];
while let Some (bytes_read) = relay_recv.read (&mut buf).await? {
let buf_slice = &buf [0..bytes_read];
debug! ("Downlink relaying {} bytes", bytes_read);
local_send.write_all (buf_slice).await?;
}
debug! ("Downlink closed");
Ok::<_, anyhow::Error> (())
});
uplink_task.await??;
downlink_task.await??;
Ok (()) Ok (())
} }

View File

@ -30,11 +30,20 @@ async fn main () -> anyhow::Result <()> {
let (mut relay_send, mut relay_recv) = bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("Relay server didn't open a bi stream"))??; let (mut relay_send, mut relay_recv) = bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("Relay server didn't open a bi stream"))??;
let mut req_buf = [0, 0, 0, 0];
relay_recv.read_exact (&mut req_buf).await?;
// TODO: Auth stuff
debug! ("Connecting to TCP end server"); debug! ("Connecting to TCP end server");
let stream = TcpStream::connect ("127.0.0.1:30382").await?; let stream = TcpStream::connect ("127.0.0.1:30382").await?;
let (mut local_recv, mut local_send) = stream.into_split (); let (mut local_recv, mut local_send) = stream.into_split ();
let resp_buf = [20, 0, 0, 0];
relay_send.write_all (&resp_buf).await?;
relay_send.write_all (&resp_buf).await?;
debug! ("Relaying bytes..."); debug! ("Relaying bytes...");
let uplink_task = tokio::spawn (async move { let uplink_task = tokio::spawn (async move {
@ -43,7 +52,11 @@ async fn main () -> anyhow::Result <()> {
let mut buf = vec! [0u8; 65_536]; let mut buf = vec! [0u8; 65_536];
loop { loop {
let bytes_read = local_recv.read (&mut buf).await?; let bytes_read = local_recv.read (&mut buf).await?;
if bytes_read == 0 {
break;
}
let buf_slice = &buf [0..bytes_read]; let buf_slice = &buf [0..bytes_read];
debug! ("Uplink relaying {} bytes", bytes_read);
relay_send.write_all (buf_slice).await?; relay_send.write_all (buf_slice).await?;
} }
@ -58,6 +71,7 @@ async fn main () -> anyhow::Result <()> {
let mut buf = vec! [0u8; 65_536]; let mut buf = vec! [0u8; 65_536];
while let Some (bytes_read) = relay_recv.read (&mut buf).await? { while let Some (bytes_read) = relay_recv.read (&mut buf).await? {
let buf_slice = &buf [0..bytes_read]; let buf_slice = &buf [0..bytes_read];
debug! ("Downlink relaying {} bytes", bytes_read);
local_send.write_all (buf_slice).await?; local_send.write_all (buf_slice).await?;
} }

View File

@ -45,12 +45,14 @@ async fn main () -> anyhow::Result <()> {
match peer_type { match peer_type {
2 => { 2 => {
let client_id = peer_id;
while let Some (bi_stream) = bi_streams.next ().await { while let Some (bi_stream) = bi_streams.next ().await {
let (mut send, mut recv) = bi_stream?; let (mut client_send, mut client_recv) = bi_stream?;
let relay_state = Arc::clone (&relay_state);
tokio::spawn (async move { tokio::spawn (async move {
let mut req_buf = [0u8; 4]; let mut req_buf = [0u8; 4];
recv.read_exact (&mut req_buf).await?; client_recv.read_exact (&mut req_buf).await?;
let cmd_type = req_buf [0]; let cmd_type = req_buf [0];
match cmd_type { match cmd_type {
@ -58,10 +60,31 @@ async fn main () -> anyhow::Result <()> {
let server_id = req_buf [1]; let server_id = req_buf [1];
debug! ("P2 {} wants to connect to P4 {}", peer_id, server_id); debug! ("P2 {} wants to connect to P4 {}", peer_id, server_id);
// TODO: Auth checks
let resp_buf = [0, 0, 0, 0];
client_send.write_all (&resp_buf).await?;
{
let relay_state = relay_state.lock ().await;
match relay_state.p4_server_proxies.get (&server_id) {
Some (p4_state) => {
p4_state.req_channel.send (RequestP2ToP4 {
client_send,
client_recv,
client_id,
}).await.map_err (|_| anyhow::anyhow! ("Can't send request to P4 server"))?;
},
None => warn! ("That server isn't connected"),
}
}
}, },
_ => bail! ("Unknown command type from P2"), _ => bail! ("Unknown command type from P2"),
} }
debug! ("Request ended for P2");
Ok::<_, anyhow::Error> (()) Ok::<_, anyhow::Error> (())
}); });
} }
@ -81,7 +104,45 @@ async fn main () -> anyhow::Result <()> {
} }
while let Some (req) = rx.recv ().await { while let Some (req) = rx.recv ().await {
debug! ("P4 {} got a request", peer_id); let connection = connection.clone ();
tokio::spawn (async move {
let RequestP2ToP4 {
client_send,
client_recv,
client_id,
} = req;
debug! ("P4 {} got a request from P2 {}", peer_id, req.client_id);
let (mut server_send, mut server_recv) = connection.open_bi ().await?;
let req_buf = [2u8, client_id, 0, 0];
server_send.write_all (&req_buf).await?;
let mut resp_buf = [0u8, 0, 0, 0];
server_recv.read_exact (&mut resp_buf).await?;
let status_code = resp_buf [0];
if status_code != 20 {
bail! ("P4 rejected request from {}", client_id);
}
debug! ("Relaying bytes...");
let ptth_conn = PtthNewConnection {
client_send,
client_recv,
server_send,
server_recv,
}.build ();
ptth_conn.uplink_task.await??;
ptth_conn.downlink_task.await??;
debug! ("Request ended for P4");
Ok::<_, anyhow::Error> (())
});
} }
debug! ("P4 {} disconnected", peer_id); debug! ("P4 {} disconnected", peer_id);
@ -155,10 +216,10 @@ impl RelayState {
} }
enum RequestP2ToP4 { struct RequestP2ToP4 {
Connect { client_send: quinn::SendStream,
client_recv: quinn::RecvStream,
}, client_id: u8,
} }
struct PtthNewConnection { struct PtthNewConnection {
@ -187,7 +248,11 @@ impl PtthNewConnection {
let mut buf = vec! [0u8; 65_536]; let mut buf = vec! [0u8; 65_536];
while let Some (bytes_read) = client_recv.read (&mut buf).await? { while let Some (bytes_read) = client_recv.read (&mut buf).await? {
if bytes_read == 0 {
break;
}
let buf_slice = &buf [0..bytes_read]; let buf_slice = &buf [0..bytes_read];
debug! ("Uplink relaying {} bytes", bytes_read);
server_send.write_all (buf_slice).await?; server_send.write_all (buf_slice).await?;
} }
@ -202,6 +267,7 @@ impl PtthNewConnection {
let mut buf = vec! [0u8; 65_536]; let mut buf = vec! [0u8; 65_536];
while let Some (bytes_read) = server_recv.read (&mut buf).await? { while let Some (bytes_read) = server_recv.read (&mut buf).await? {
let buf_slice = &buf [0..bytes_read]; let buf_slice = &buf [0..bytes_read];
debug! ("Downlink relaying {} bytes", bytes_read);
client_send.write_all (buf_slice).await?; client_send.write_all (buf_slice).await?;
} }