diff --git a/prototypes/quic_demo/src/bin/quic_demo_client.rs b/prototypes/quic_demo/src/bin/quic_demo_client.rs index 03e2ec8..058f710 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_client.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_client.rs @@ -27,17 +27,24 @@ async fn main () -> anyhow::Result <()> { let mut resp_buf = [0u8, 0, 0, 0]; recv.read_exact (&mut resp_buf).await?; + debug! ("Waiting for local TCP client to connect to us"); + + let listener = TcpListener::bind ("127.0.0.1:30381").await?; + let (tcp_socket, _) = listener.accept ().await?; + let (mut local_recv, mut local_send) = tcp_socket.into_split (); + + debug! ("Connecting to end server"); + + let (mut send, mut recv) = connection.open_bi ().await?; + + let req_buf = [1, 43, 0, 0, 1, 0, 0, 0]; + send.write_all (&req_buf).await?; + + let mut resp_buf = [0; 8]; + recv.read_exact (&mut resp_buf).await?; + + /* if false { - debug! ("Waiting for local TCP client to connect to us"); - - let listener = TcpListener::bind ("127.0.0.1:30381").await?; - let (tcp_socket, _) = listener.accept ().await?; - let (mut local_recv, mut local_send) = tcp_socket.into_split (); - - debug! ("Connecting to end server"); - - let (mut relay_send, mut relay_recv) = connection.open_bi ().await?; - debug! ("Relaying bytes..."); let uplink_task = tokio::spawn (async move { @@ -72,6 +79,6 @@ async fn main () -> anyhow::Result <()> { uplink_task.await??; downlink_task.await??; } - + */ Ok (()) } diff --git a/prototypes/quic_demo/src/bin/quic_demo_end_server.rs b/prototypes/quic_demo/src/bin/quic_demo_end_server.rs index 4977c3b..d0d9c02 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_end_server.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_end_server.rs @@ -26,50 +26,48 @@ async fn main () -> anyhow::Result <()> { let mut resp_buf = [0u8, 0, 0, 0]; recv.read_exact (&mut resp_buf).await?; - if false { - debug! ("Waiting for relay server to forward a bi stream"); + debug! ("Waiting for relay server to forward a bi stream"); + + let (mut relay_send, mut relay_recv) = bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("Relay server didn't open a bi stream"))??; + + debug! ("Connecting to TCP end server"); + + let stream = TcpStream::connect ("127.0.0.1:30382").await?; + let (mut local_recv, mut local_send) = stream.into_split (); + + debug! ("Relaying bytes..."); + + let uplink_task = tokio::spawn (async move { + // Uplink - local client to relay server - let (mut relay_send, mut relay_recv) = bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("Relay server didn't open a bi stream"))??; + let mut buf = vec! [0u8; 65_536]; + loop { + let bytes_read = local_recv.read (&mut buf).await?; + let buf_slice = &buf [0..bytes_read]; + relay_send.write_all (buf_slice).await?; + } - debug! ("Connecting to TCP end server"); + debug! ("Uplink closed"); - let stream = TcpStream::connect ("127.0.0.1:30382").await?; - let (mut local_recv, mut local_send) = stream.into_split (); + Ok::<_, anyhow::Error> (()) + }); + + let downlink_task = tokio::spawn (async move { + // Downlink - Relay server to local client - debug! ("Relaying bytes..."); + let mut buf = vec! [0u8; 65_536]; + while let Some (bytes_read) = relay_recv.read (&mut buf).await? { + let buf_slice = &buf [0..bytes_read]; + local_send.write_all (buf_slice).await?; + } - let uplink_task = tokio::spawn (async move { - // Uplink - local client to relay server - - let mut buf = vec! [0u8; 65_536]; - loop { - let bytes_read = local_recv.read (&mut buf).await?; - let buf_slice = &buf [0..bytes_read]; - relay_send.write_all (buf_slice).await?; - } - - debug! ("Uplink closed"); - - Ok::<_, anyhow::Error> (()) - }); + debug! ("Downlink closed"); - let downlink_task = tokio::spawn (async move { - // Downlink - Relay server to local client - - let mut buf = vec! [0u8; 65_536]; - while let Some (bytes_read) = relay_recv.read (&mut buf).await? { - let buf_slice = &buf [0..bytes_read]; - local_send.write_all (buf_slice).await?; - } - - debug! ("Downlink closed"); - - Ok::<_, anyhow::Error> (()) - }); - - uplink_task.await??; - downlink_task.await??; - } + Ok::<_, anyhow::Error> (()) + }); + + uplink_task.await??; + downlink_task.await??; Ok (()) } diff --git a/prototypes/quic_demo/src/bin/quic_demo_relay_server.rs b/prototypes/quic_demo/src/bin/quic_demo_relay_server.rs index 13886cc..48f228a 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_relay_server.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_relay_server.rs @@ -44,15 +44,52 @@ async fn main () -> anyhow::Result <()> { send.write_all (&resp_buf).await?; match peer_type { - 2 => while let Some (bi_stream) = bi_streams.next ().await { - let (mut send, mut recv) = bi_stream?; + 2 => { + while let Some (bi_stream) = bi_streams.next ().await { + let (mut send, mut recv) = bi_stream?; + + tokio::spawn (async move { + let mut req_buf = [0u8; 4]; + recv.read_exact (&mut req_buf).await?; + + let cmd_type = req_buf [0]; + match cmd_type { + 1 => { + let server_id = req_buf [1]; + + debug! ("P2 {} wants to connect to P4 {}", peer_id, server_id); + }, + _ => bail! ("Unknown command type from P2"), + } + + Ok::<_, anyhow::Error> (()) + }); + } - // Each new + debug! ("P2 {} disconnected", peer_id); + }, + 4 => { + let (tx, mut rx) = mpsc::channel (2); + + let p4_state = P4State { + req_channel: tx, + }; + + { + let mut relay_state = relay_state.lock ().await; + relay_state.p4_server_proxies.insert (peer_id, p4_state); + } + + while let Some (req) = rx.recv ().await { + debug! ("P4 {} got a request", peer_id); + } + + debug! ("P4 {} disconnected", peer_id); }, - 4 => (), _ => bail! ("Unknown QUIC client type"), } + debug! ("Peer {} disconnected", peer_id); Ok::<_, anyhow::Error> (()) }); } @@ -107,13 +144,23 @@ async fn main () -> anyhow::Result <()> { #[derive (Default)] struct RelayState { - + p4_server_proxies: HashMap , +} + +struct P4State { + req_channel: mpsc::Sender , } impl RelayState { } +enum RequestP2ToP4 { + Connect { + + }, +} + struct PtthNewConnection { client_send: quinn::SendStream, client_recv: quinn::RecvStream, diff --git a/prototypes/quic_demo/src/prelude.rs b/prototypes/quic_demo/src/prelude.rs index 13562f4..4da3209 100644 --- a/prototypes/quic_demo/src/prelude.rs +++ b/prototypes/quic_demo/src/prelude.rs @@ -1,4 +1,5 @@ pub use std::{ + collections::*, sync::Arc, time::Duration, }; @@ -10,7 +11,10 @@ pub use tokio::{ AsyncReadExt, AsyncWriteExt, }, - sync::Mutex, + sync::{ + Mutex, + mpsc, + }, task::JoinHandle, }; pub use tracing::{