diff --git a/prototypes/quic_demo/src/bin/quic_demo_client.rs b/prototypes/quic_demo/src/bin/quic_demo_client.rs index 6dbb1f6..9407d1d 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_client.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_client.rs @@ -46,37 +46,45 @@ async fn main () -> anyhow::Result <()> { let server_tcp_port = opt.server_tcp_port.unwrap_or (30382); let listener = TcpListener::bind (("127.0.0.1", client_tcp_port)).await?; - debug! ("Accepting local TCP connections from P1"); - // End of per-port stuff // Beginning of per-connection stuff - loop { - let (tcp_socket, _) = listener.accept ().await?; - let connection = connection.clone (); - let server_id = server_id.clone (); + let task_tcp_server = tokio::spawn (async move { + loop { + let (tcp_socket, _) = listener.accept ().await?; + let connection = connection.clone (); + let server_id = server_id.clone (); + + tokio::spawn (async move { + let (local_recv, local_send) = tcp_socket.into_split (); + + debug! ("Starting PTTH connection"); + + let (relay_send, relay_recv) = protocol::p2_connect_to_p5 (&connection, &server_id, server_tcp_port).await?; + + trace! ("Relaying bytes..."); + + let ptth_conn = quic_demo::connection::NewConnection { + local_send, + local_recv, + relay_send, + relay_recv, + }.build (); + + ptth_conn.wait_for_close ().await?; + + debug! ("Ended PTTH connection"); + + Ok::<_, anyhow::Error> (()) + }); + } - tokio::spawn (async move { - let (local_recv, local_send) = tcp_socket.into_split (); - - debug! ("Starting PTTH connection"); - - let (relay_send, relay_recv) = protocol::p2_connect_to_p5 (&connection, &server_id, server_tcp_port).await?; - - trace! ("Relaying bytes..."); - - let ptth_conn = quic_demo::connection::NewConnection { - local_send, - local_recv, - relay_send, - relay_recv, - }.build (); - - ptth_conn.wait_for_close ().await?; - - debug! ("Ended PTTH connection"); - - Ok::<_, anyhow::Error> (()) - }); - } + Ok::<_, anyhow::Error> (()) + }); + + debug! ("Accepting local TCP connections from P1"); + + task_tcp_server.await??; + + Ok (()) } diff --git a/prototypes/quic_demo/src/bin/quic_demo_relay_server.rs b/prototypes/quic_demo/src/bin/quic_demo_relay_server.rs index f2e51c9..064c928 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_relay_server.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_relay_server.rs @@ -49,37 +49,87 @@ async fn main () -> anyhow::Result <()> { }; let http_addr = SocketAddr::from (([0, 0, 0, 0], 4004)); - let http_server = Server::bind (&http_addr); - tokio::spawn (async move { - http_server.serve (make_svc).await + let tcp_port = 30382; + let tcp_listener = TcpListener::bind (("127.0.0.1", tcp_port)).await?; + + let task_quic_server = { + let relay_state = Arc::clone (&relay_state); + tokio::spawn (async move { + while let Some (conn) = incoming.next ().await { + let relay_state = Arc::clone (&relay_state); + + // Each new peer QUIC connection gets its own task + tokio::spawn (async move { + let active = relay_state.stats.quic.connect (); + debug! ("QUIC connections: {}", active); + + match handle_quic_connection (Arc::clone (&relay_state), conn).await { + Ok (_) => (), + Err (e) => warn! ("handle_quic_connection {:?}", e), + } + + let active = relay_state.stats.quic.disconnect (); + debug! ("QUIC connections: {}", active); + }); + } + + Ok::<_, anyhow::Error> (()) + }) + }; + + let task_http_server = tokio::spawn (async move { + http_server.serve (make_svc).await?; + Ok::<_, anyhow::Error> (()) }); + let task_tcp_server = { + let relay_state = Arc::clone (&relay_state); + tokio::spawn (async move { + loop { + let (tcp_socket, _) = tcp_listener.accept ().await?; + + let server_id = "bogus_server".to_string (); + + let relay_state = Arc::clone (&relay_state); + tokio::spawn (async move { + let (client_recv, client_send) = tcp_socket.into_split (); + + debug! ("Accepted direct TCP connection P1 --> P3"); + + let p4_server_proxies = relay_state.p4_server_proxies.lock ().await; + let p4 = match p4_server_proxies.get ("bogus_server") { + Some (x) => x, + None => bail! ("That server isn't connected"), + }; + + unimplemented! (); + /* + p4.req_channel.send (RequestP2ToP4 { + client_send, + client_recv, + client_id: "bogus_client".to_string (), + }).await.map_err (|_| anyhow::anyhow! ("Can't send request to P4 server"))?; + */ + Ok (()) + }); + } + + Ok::<_, anyhow::Error> (()) + }) + }; + debug! ("Serving HTTP on {:?}", http_addr); - while let Some (conn) = incoming.next ().await { - let relay_state = Arc::clone (&relay_state); - - // Each new peer QUIC connection gets its own task - tokio::spawn (async move { - let active = relay_state.stats.quic.connect (); - debug! ("QUIC connections: {}", active); - - match handle_quic_connection (Arc::clone (&relay_state), conn).await { - Ok (_) => (), - Err (e) => warn! ("handle_quic_connection {:?}", e), - } - - let active = relay_state.stats.quic.disconnect (); - debug! ("QUIC connections: {}", active); - }); - } + task_quic_server.await??; + task_http_server.await??; + task_tcp_server.await??; Ok (()) } -async fn handle_http (req: Request
, relay_state: Arc