diff --git a/Cargo.lock b/Cargo.lock index 5dd14f0..ae75527 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1209,6 +1209,7 @@ dependencies = [ "fltk", "quic_demo", "quinn", + "reqwest", "structopt", "tokio", "tracing", @@ -1328,8 +1329,10 @@ dependencies = [ "anyhow", "base64", "futures-util", + "hyper", "quinn", "rcgen", + "reqwest", "rmp-serde", "structopt", "tokio", @@ -1499,9 +1502,9 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.11.2" +version = "0.11.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf12057f289428dbf5c591c74bf10392e4a8003f993405a902f20117019022d4" +checksum = "246e9f61b9bb77df069a947682be06e31ac43ea37862e244a69f177694ea6d22" dependencies = [ "base64", "bytes", diff --git a/build_ptth_server.bash b/build_ptth_server.bash index 6334552..71d612a 100755 --- a/build_ptth_server.bash +++ b/build_ptth_server.bash @@ -15,15 +15,19 @@ export LC_ALL="C" TEMP_GIBBERISH="ptth_server_build_BIHWLQXQ" DEST="$TEMP_GIBBERISH/ptth" +rm -rf "$TEMP_GIBBERISH" + mkdir "$TEMP_GIBBERISH" mkdir "$DEST" cargo build --release -p ptth_server +cargo build --release -p quic_demo --bin quic_demo_end_server mkdir -p "$DEST/handlebars/server" rsync -r handlebars/server/ "$DEST/handlebars/server/" -cp target/release/ptth_server "$DEST/ptth_server" +cp target/release/ptth_server "$DEST/" +cp target/release/quic_demo_end_server "$DEST/" ( cd "$TEMP_GIBBERISH" || exit diff --git a/docs/how-to/test-ptth-quic.md b/docs/how-to/test-ptth-quic.md new file mode 100644 index 0000000..ff92c1f --- /dev/null +++ b/docs/how-to/test-ptth-quic.md @@ -0,0 +1,24 @@ +# How-to: Test PTTH_QUIC + +## Initial setup + +- Open 3 terminals in `prototypes/quic_demo` +- Use `export RUST_LOG=quic_demo_relay_server=debug` to enable debug logging +for the terminal that will run the relay server (P3) +- Use `export RUST_LOG=quic_demo_end_server=debug` for the terminal that +will run the end server (P4) +- Use `export RUST_LOG=quic_demo_client=debug` for the terminal that +will run the client (P2) + +When the relay server is running, use curl to get the list of connected +end servers: `curl 127.0.0.1:4004` + +## Test loop - Happy path + +- Start a relay `cargo run --bin quic_demo_relay_server` +- Verify that the relay has no end servers connected +- Start an end server `cargo run --bin quic_demo_end_server -- --debug-echo` +- Verify that the end server connected +- Start a client `cargo run --bin quic_demo_client` +- Connect to the client and verify that the debug echo server is running +`nc 127.0.0.1 30381` diff --git a/prototypes/ptth_quic_client_gui/Cargo.toml b/prototypes/ptth_quic_client_gui/Cargo.toml index 7281233..995a913 100644 --- a/prototypes/ptth_quic_client_gui/Cargo.toml +++ b/prototypes/ptth_quic_client_gui/Cargo.toml @@ -12,6 +12,7 @@ anyhow = "1.0.38" fltk = "1.1.1" quic_demo = { path = "../quic_demo" } quinn = "0.7.2" +reqwest = "0.11.4" structopt = "0.3.20" tokio = { version = "1.8.1", features = ["full"] } tracing-subscriber = "0.2.16" diff --git a/prototypes/ptth_quic_client_gui/src/main.rs b/prototypes/ptth_quic_client_gui/src/main.rs index 4cfdfda..e11bfbe 100644 --- a/prototypes/ptth_quic_client_gui/src/main.rs +++ b/prototypes/ptth_quic_client_gui/src/main.rs @@ -25,6 +25,8 @@ struct Opt { relay_addr: Option , #[structopt (long)] client_id: Option , + #[structopt (long)] + cert_url: Option , } #[derive (Clone, Copy)] @@ -103,8 +105,15 @@ fn main () -> anyhow::Result <()> { wind.show (); let connection_p2_p3 = rt.block_on (async move { - let server_cert = tokio::fs::read ("quic_server.crt").await?; - let relay_addr = opt.relay_addr.unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?; + let server_cert = match opt.cert_url.as_ref () { + Some (url) => reqwest::get (url).await?.bytes ().await?, + None => tokio::fs::read ("quic_server.crt").await?.into (), + }; + + let relay_addr = opt.relay_addr + .unwrap_or_else (|| String::from ("127.0.0.1:30380")) + .parse () + .context ("relay_addr should be like 127.0.0.1:30380")?; let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&server_cert])?; trace! ("Connecting to relay server"); @@ -114,7 +123,8 @@ fn main () -> anyhow::Result <()> { let quinn::NewConnection { connection, .. - } = protocol::p2_connect_to_p3 (&endpoint, &relay_addr, &client_id).await?; + } = protocol::p2_connect_to_p3 (&endpoint, &relay_addr, &client_id).await + .context ("P2 can't connect to P3")?; Ok::<_, anyhow::Error> (connection) })?; diff --git a/prototypes/quic_demo/Cargo.toml b/prototypes/quic_demo/Cargo.toml index 2aa1f36..d10125b 100644 --- a/prototypes/quic_demo/Cargo.toml +++ b/prototypes/quic_demo/Cargo.toml @@ -10,9 +10,12 @@ license = "AGPL-3.0" [dependencies] anyhow = "1.0.38" base64 = "0.13.0" +# fltk = "1.1.1" futures-util = "0.3.9" +hyper = { version = "0.14.4", features = ["http1", "server", "stream", "tcp"] } quinn = "0.7.2" rcgen = "0.8.11" +reqwest = "0.11.4" rmp-serde = "0.15.5" structopt = "0.3.20" tokio = { version = "1.8.1", features = ["full"] } diff --git a/prototypes/quic_demo/src/bin/quic_demo_client.rs b/prototypes/quic_demo/src/bin/quic_demo_client.rs index 6dbb1f6..9407d1d 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_client.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_client.rs @@ -46,37 +46,45 @@ async fn main () -> anyhow::Result <()> { let server_tcp_port = opt.server_tcp_port.unwrap_or (30382); let listener = TcpListener::bind (("127.0.0.1", client_tcp_port)).await?; - debug! ("Accepting local TCP connections from P1"); - // End of per-port stuff // Beginning of per-connection stuff - loop { - let (tcp_socket, _) = listener.accept ().await?; - let connection = connection.clone (); - let server_id = server_id.clone (); + let task_tcp_server = tokio::spawn (async move { + loop { + let (tcp_socket, _) = listener.accept ().await?; + let connection = connection.clone (); + let server_id = server_id.clone (); + + tokio::spawn (async move { + let (local_recv, local_send) = tcp_socket.into_split (); + + debug! ("Starting PTTH connection"); + + let (relay_send, relay_recv) = protocol::p2_connect_to_p5 (&connection, &server_id, server_tcp_port).await?; + + trace! ("Relaying bytes..."); + + let ptth_conn = quic_demo::connection::NewConnection { + local_send, + local_recv, + relay_send, + relay_recv, + }.build (); + + ptth_conn.wait_for_close ().await?; + + debug! ("Ended PTTH connection"); + + Ok::<_, anyhow::Error> (()) + }); + } - tokio::spawn (async move { - let (local_recv, local_send) = tcp_socket.into_split (); - - debug! ("Starting PTTH connection"); - - let (relay_send, relay_recv) = protocol::p2_connect_to_p5 (&connection, &server_id, server_tcp_port).await?; - - trace! ("Relaying bytes..."); - - let ptth_conn = quic_demo::connection::NewConnection { - local_send, - local_recv, - relay_send, - relay_recv, - }.build (); - - ptth_conn.wait_for_close ().await?; - - debug! ("Ended PTTH connection"); - - Ok::<_, anyhow::Error> (()) - }); - } + Ok::<_, anyhow::Error> (()) + }); + + debug! ("Accepting local TCP connections from P1"); + + task_tcp_server.await??; + + Ok (()) } diff --git a/prototypes/quic_demo/src/bin/quic_demo_end_server.rs b/prototypes/quic_demo/src/bin/quic_demo_end_server.rs index 19ba94d..5f28433 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_end_server.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_end_server.rs @@ -10,37 +10,46 @@ struct Opt { relay_addr: Option , #[structopt (long)] server_id: Option , + #[structopt (long)] + debug_echo: bool, + #[structopt (long)] + cert_url: Option , } #[tokio::main] async fn main () -> anyhow::Result <()> { tracing_subscriber::fmt::init (); - let opt = Opt::from_args (); + let opt = Arc::new (Opt::from_args ()); - let server_cert = tokio::fs::read ("quic_server.crt").await?; - let relay_addr = opt.relay_addr.unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?; + let server_cert = match opt.cert_url.as_ref () { + Some (url) => reqwest::get (url).await?.bytes ().await?, + None => tokio::fs::read ("quic_server.crt").await?.into (), + }; + let relay_addr = opt.relay_addr.clone ().unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?; let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&server_cert])?; trace! ("Connecting to relay server"); - let server_id = opt.server_id.unwrap_or_else (|| "bogus_server".to_string ()); + let server_id = opt.server_id.clone ().unwrap_or_else (|| "bogus_server".to_string ()); let quinn::NewConnection { mut bi_streams, .. } = protocol::p4_connect_to_p3 (&endpoint, &relay_addr, &server_id).await?; + debug! ("Connected to relay server"); trace! ("Accepting bi streams from P3"); loop { let (relay_send, relay_recv) = bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("Relay server didn't open a bi stream"))??; - tokio::spawn (handle_bi_stream (relay_send, relay_recv)); + tokio::spawn (handle_bi_stream (Arc::clone (&opt), relay_send, relay_recv)); } } async fn handle_bi_stream ( + opt: Arc , relay_send: quinn::SendStream, mut relay_recv: quinn::RecvStream, ) -> anyhow::Result <()> @@ -49,13 +58,14 @@ async fn handle_bi_stream ( protocol::P3ToP4Stream::NewPtthConnection { client_id, .. - } => handle_new_ptth_connection (relay_send, relay_recv, client_id).await?, + } => handle_new_ptth_connection (opt, relay_send, relay_recv, client_id).await?, } Ok (()) } async fn handle_new_ptth_connection ( + opt: Arc , mut relay_send: quinn::SendStream, mut relay_recv: quinn::RecvStream, _client_id: String, @@ -72,19 +82,26 @@ async fn handle_new_ptth_connection ( debug! ("Started PTTH connection"); - let stream = TcpStream::connect (("127.0.0.1", p4_to_p5_req.port)).await?; - let (local_recv, local_send) = stream.into_split (); - - trace! ("Relaying bytes..."); - - let ptth_conn = quic_demo::connection::NewConnection { - local_send, - local_recv, - relay_send, - relay_recv, - }.build (); - - ptth_conn.wait_for_close ().await?; + if opt.debug_echo { + relay_send.write (b"Connected to P4=P5 debug echo server\n").await?; + debug! ("Relaying bytes using internal debug echo server (P4=P5)"); + tokio::io::copy (&mut relay_recv, &mut relay_send).await?; + } + else { + let stream = TcpStream::connect (("127.0.0.1", p4_to_p5_req.port)).await?; + let (local_recv, local_send) = stream.into_split (); + + trace! ("Relaying bytes..."); + + let ptth_conn = quic_demo::connection::NewConnection { + local_send, + local_recv, + relay_send, + relay_recv, + }.build (); + + ptth_conn.wait_for_close ().await?; + } Ok (()) } diff --git a/prototypes/quic_demo/src/bin/quic_demo_relay_server.rs b/prototypes/quic_demo/src/bin/quic_demo_relay_server.rs index 171fc7c..dcb5d5e 100644 --- a/prototypes/quic_demo/src/bin/quic_demo_relay_server.rs +++ b/prototypes/quic_demo/src/bin/quic_demo_relay_server.rs @@ -1,3 +1,14 @@ +use hyper::{ + Body, + Request, + Response, + Server, + service::{ + make_service_fn, + service_fn, + }, + StatusCode, +}; use structopt::StructOpt; use quic_demo::prelude::*; @@ -18,32 +29,126 @@ async fn main () -> anyhow::Result <()> { let listen_addr = opt.listen_addr.unwrap_or_else (|| String::from ("0.0.0.0:30380")).parse ()?; let (mut incoming, server_cert) = make_server_endpoint (listen_addr)?; println! ("Base64 cert: {}", base64::encode (&server_cert)); - tokio::fs::write ("quic_server.crt", &server_cert).await?; - let relay_state = RelayState::default (); - let relay_state = Arc::new (relay_state); + tokio::fs::create_dir_all ("ptth_quic_output").await?; + tokio::fs::write ("ptth_quic_output/quic_server.crt", &server_cert).await?; - while let Some (conn) = incoming.next ().await { + let relay_state = Arc::new (RelayState::default ()); + + let make_svc = { let relay_state = Arc::clone (&relay_state); - - // Each new peer QUIC connection gets its own task - tokio::spawn (async move { - let active = relay_state.stats.quic.connect (); - debug! ("QUIC connections: {}", active); + make_service_fn (move |_conn| { + let relay_state = Arc::clone (&relay_state); - match handle_quic_connection (Arc::clone (&relay_state), conn).await { - Ok (_) => (), - Err (e) => warn! ("handle_quic_connection {:?}", e), + async move { + Ok::<_, String> (service_fn (move |req| { + let relay_state = Arc::clone (&relay_state); + + handle_http (req, relay_state) + })) + } + }) + }; + + let http_addr = SocketAddr::from (([0, 0, 0, 0], 4004)); + let http_server = Server::bind (&http_addr); + + let tcp_port = 30382; + let tcp_listener = TcpListener::bind (("127.0.0.1", tcp_port)).await?; + + let task_quic_server = { + let relay_state = Arc::clone (&relay_state); + tokio::spawn (async move { + while let Some (conn) = incoming.next ().await { + let relay_state = Arc::clone (&relay_state); + + // Each new peer QUIC connection gets its own task + tokio::spawn (async move { + let active = relay_state.stats.quic.connect (); + debug! ("QUIC connections: {}", active); + + match handle_quic_connection (Arc::clone (&relay_state), conn).await { + Ok (_) => (), + Err (e) => warn! ("handle_quic_connection {:?}", e), + } + + let active = relay_state.stats.quic.disconnect (); + debug! ("QUIC connections: {}", active); + }); } - let active = relay_state.stats.quic.disconnect (); - debug! ("QUIC connections: {}", active); - }); - } + Ok::<_, anyhow::Error> (()) + }) + }; + + let task_http_server = tokio::spawn (async move { + http_server.serve (make_svc).await?; + Ok::<_, anyhow::Error> (()) + }); + + let task_tcp_server = { + let relay_state = Arc::clone (&relay_state); + tokio::spawn (async move { + loop { + let (tcp_socket, _) = tcp_listener.accept ().await?; + + let server_id = "bogus_server".to_string (); + + let relay_state = Arc::clone (&relay_state); + tokio::spawn (async move { + let (client_recv, client_send) = tcp_socket.into_split (); + + debug! ("Accepted direct TCP connection P1 --> P3"); + + let p4_server_proxies = relay_state.p4_server_proxies.lock ().await; + let p4 = match p4_server_proxies.get ("bogus_server") { + Some (x) => x, + None => bail! ("That server isn't connected"), + }; + + unimplemented! (); + /* + p4.req_channel.send (RequestP2ToP4 { + client_send, + client_recv, + client_id: "bogus_client".to_string (), + }).await.map_err (|_| anyhow::anyhow! ("Can't send request to P4 server"))?; + */ + Ok (()) + }); + } + + Ok::<_, anyhow::Error> (()) + }) + }; + + debug! ("Serving HTTP on {:?}", http_addr); + + task_quic_server.await??; + task_http_server.await??; + task_tcp_server.await??; Ok (()) } +async fn handle_http (_req: Request , relay_state: Arc ) +-> anyhow::Result > +{ + let debug_string; + { + let p4_server_proxies = relay_state.p4_server_proxies.lock ().await; + + debug_string = format! ("{:#?}\n", p4_server_proxies.keys ().collect::> ()); + } + + let resp = Response::builder () + .status (StatusCode::OK) + .header ("content-type", "text/plain") + .body (Body::from (debug_string))?; + + Ok (resp) +} + #[derive (Default)] struct RelayState { p4_server_proxies: Mutex >, diff --git a/prototypes/quic_demo/src/prelude.rs b/prototypes/quic_demo/src/prelude.rs index 0315eda..03fc349 100644 --- a/prototypes/quic_demo/src/prelude.rs +++ b/prototypes/quic_demo/src/prelude.rs @@ -1,5 +1,6 @@ pub use std::{ collections::*, + net::SocketAddr, sync::{ Arc, atomic::{ @@ -20,6 +21,7 @@ pub use tokio::{ AsyncReadExt, AsyncWriteExt, }, + net::TcpListener, sync::{ Mutex, mpsc, diff --git a/readme_draft.md b/readme_draft.md index 5321a76..95fc654 100644 --- a/readme_draft.md +++ b/readme_draft.md @@ -1,5 +1,7 @@ ![The PTTH logo, a green box sitting on a black conveyor belt. The box has an arrow pointing left, and the text "PTTH", in white. The conveyor belt has an arrow pointing right, in white.](assets/logo-128-pixel.png) +TODO: "Splitting a server in half" diagram + # PTTH PTTH is a web server.