diff --git a/prototypes/quic_demo/src/bin/quic_demo_client.rs b/prototypes/quic_demo/src/bin/quic_demo_client.rs index 058f710..511af4e 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_client.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_client.rs @@ -35,50 +35,51 @@ async fn main () -> anyhow::Result <()> { debug! ("Connecting to end server"); - let (mut send, mut recv) = connection.open_bi ().await?; + let (mut relay_send, mut relay_recv) = connection.open_bi ().await?; let req_buf = [1, 43, 0, 0, 1, 0, 0, 0]; - send.write_all (&req_buf).await?; + relay_send.write_all (&req_buf).await?; let mut resp_buf = [0; 8]; - recv.read_exact (&mut resp_buf).await?; + relay_recv.read_exact (&mut resp_buf).await?; - /* - if false { - debug! ("Relaying bytes..."); + debug! ("Relaying bytes..."); + + let uplink_task = tokio::spawn (async move { + // Uplink - local client to relay server - let uplink_task = tokio::spawn (async move { - // Uplink - local client to relay server - - let mut buf = vec! [0u8; 65_536]; - loop { - let bytes_read = local_recv.read (&mut buf).await?; - let buf_slice = &buf [0..bytes_read]; - relay_send.write_all (buf_slice).await?; + let mut buf = vec! [0u8; 65_536]; + loop { + let bytes_read = local_recv.read (&mut buf).await?; + if bytes_read == 0 { + break; } - - debug! ("Uplink closed"); - - Ok::<_, anyhow::Error> (()) - }); + let buf_slice = &buf [0..bytes_read]; + debug! ("Uplink relaying {} bytes", bytes_read); + relay_send.write_all (buf_slice).await?; + } - let downlink_task = tokio::spawn (async move { - // Downlink - Relay server to local client - - let mut buf = vec! [0u8; 65_536]; - while let Some (bytes_read) = relay_recv.read (&mut buf).await? { - let buf_slice = &buf [0..bytes_read]; - local_send.write_all (buf_slice).await?; - } - - debug! ("Downlink closed"); - - Ok::<_, anyhow::Error> (()) - }); + debug! ("Uplink closed"); - uplink_task.await??; - downlink_task.await??; - } - */ + Ok::<_, anyhow::Error> (()) + }); + + let downlink_task = tokio::spawn (async move { + // Downlink - Relay server to local client + + let mut buf = vec! [0u8; 65_536]; + while let Some (bytes_read) = relay_recv.read (&mut buf).await? { + let buf_slice = &buf [0..bytes_read]; + debug! ("Downlink relaying {} bytes", bytes_read); + local_send.write_all (buf_slice).await?; + } + + debug! ("Downlink closed"); + + Ok::<_, anyhow::Error> (()) + }); + + uplink_task.await??; + downlink_task.await??; Ok (()) } diff --git a/prototypes/quic_demo/src/bin/quic_demo_end_server.rs b/prototypes/quic_demo/src/bin/quic_demo_end_server.rs index d0d9c02..1ca5371 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_end_server.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_end_server.rs @@ -30,11 +30,20 @@ async fn main () -> anyhow::Result <()> { let (mut relay_send, mut relay_recv) = bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("Relay server didn't open a bi stream"))??; + let mut req_buf = [0, 0, 0, 0]; + relay_recv.read_exact (&mut req_buf).await?; + + // TODO: Auth stuff + debug! ("Connecting to TCP end server"); let stream = TcpStream::connect ("127.0.0.1:30382").await?; let (mut local_recv, mut local_send) = stream.into_split (); + let resp_buf = [20, 0, 0, 0]; + relay_send.write_all (&resp_buf).await?; + relay_send.write_all (&resp_buf).await?; + debug! ("Relaying bytes..."); let uplink_task = tokio::spawn (async move { @@ -43,7 +52,11 @@ async fn main () -> anyhow::Result <()> { let mut buf = vec! [0u8; 65_536]; loop { let bytes_read = local_recv.read (&mut buf).await?; + if bytes_read == 0 { + break; + } let buf_slice = &buf [0..bytes_read]; + debug! ("Uplink relaying {} bytes", bytes_read); relay_send.write_all (buf_slice).await?; } @@ -58,6 +71,7 @@ async fn main () -> anyhow::Result <()> { let mut buf = vec! [0u8; 65_536]; while let Some (bytes_read) = relay_recv.read (&mut buf).await? { let buf_slice = &buf [0..bytes_read]; + debug! ("Downlink relaying {} bytes", bytes_read); local_send.write_all (buf_slice).await?; } diff --git a/prototypes/quic_demo/src/bin/quic_demo_relay_server.rs b/prototypes/quic_demo/src/bin/quic_demo_relay_server.rs index 48f228a..a64c31e 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_relay_server.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_relay_server.rs @@ -45,12 +45,14 @@ async fn main () -> anyhow::Result <()> { match peer_type { 2 => { + let client_id = peer_id; while let Some (bi_stream) = bi_streams.next ().await { - let (mut send, mut recv) = bi_stream?; + let (mut client_send, mut client_recv) = bi_stream?; + let relay_state = Arc::clone (&relay_state); tokio::spawn (async move { let mut req_buf = [0u8; 4]; - recv.read_exact (&mut req_buf).await?; + client_recv.read_exact (&mut req_buf).await?; let cmd_type = req_buf [0]; match cmd_type { @@ -58,10 +60,31 @@ async fn main () -> anyhow::Result <()> { let server_id = req_buf [1]; debug! ("P2 {} wants to connect to P4 {}", peer_id, server_id); + + // TODO: Auth checks + + let resp_buf = [0, 0, 0, 0]; + client_send.write_all (&resp_buf).await?; + + { + let relay_state = relay_state.lock ().await; + match relay_state.p4_server_proxies.get (&server_id) { + Some (p4_state) => { + p4_state.req_channel.send (RequestP2ToP4 { + client_send, + client_recv, + client_id, + }).await.map_err (|_| anyhow::anyhow! ("Can't send request to P4 server"))?; + }, + None => warn! ("That server isn't connected"), + } + } }, _ => bail! ("Unknown command type from P2"), } + debug! ("Request ended for P2"); + Ok::<_, anyhow::Error> (()) }); } @@ -81,7 +104,45 @@ async fn main () -> anyhow::Result <()> { } while let Some (req) = rx.recv ().await { - debug! ("P4 {} got a request", peer_id); + let connection = connection.clone (); + + tokio::spawn (async move { + let RequestP2ToP4 { + client_send, + client_recv, + client_id, + } = req; + + debug! ("P4 {} got a request from P2 {}", peer_id, req.client_id); + + let (mut server_send, mut server_recv) = connection.open_bi ().await?; + + let req_buf = [2u8, client_id, 0, 0]; + server_send.write_all (&req_buf).await?; + + let mut resp_buf = [0u8, 0, 0, 0]; + server_recv.read_exact (&mut resp_buf).await?; + + let status_code = resp_buf [0]; + if status_code != 20 { + bail! ("P4 rejected request from {}", client_id); + } + + debug! ("Relaying bytes..."); + + let ptth_conn = PtthNewConnection { + client_send, + client_recv, + server_send, + server_recv, + }.build (); + + ptth_conn.uplink_task.await??; + ptth_conn.downlink_task.await??; + + debug! ("Request ended for P4"); + Ok::<_, anyhow::Error> (()) + }); } debug! ("P4 {} disconnected", peer_id); @@ -155,10 +216,10 @@ impl RelayState { } -enum RequestP2ToP4 { - Connect { - - }, +struct RequestP2ToP4 { + client_send: quinn::SendStream, + client_recv: quinn::RecvStream, + client_id: u8, } struct PtthNewConnection { @@ -187,7 +248,11 @@ impl PtthNewConnection { let mut buf = vec! [0u8; 65_536]; while let Some (bytes_read) = client_recv.read (&mut buf).await? { + if bytes_read == 0 { + break; + } let buf_slice = &buf [0..bytes_read]; + debug! ("Uplink relaying {} bytes", bytes_read); server_send.write_all (buf_slice).await?; } @@ -202,6 +267,7 @@ impl PtthNewConnection { let mut buf = vec! [0u8; 65_536]; while let Some (bytes_read) = server_recv.read (&mut buf).await? { let buf_slice = &buf [0..bytes_read]; + debug! ("Downlink relaying {} bytes", bytes_read); client_send.write_all (buf_slice).await?; }