From 0cb24695d009487dd441c026c6c838d5c0f4c6bf Mon Sep 17 00:00:00 2001 From: _ <> Date: Tue, 12 Jan 2021 00:22:55 +0000 Subject: [PATCH] :construction: wip: idea for tunneling TCP over HTTP There's a lot of missing pieces, but the big picture is like this: - Use 2 completely separate HTTP streams, and try to keep them alive as long as possible, each in basically half-duplex mode - Each stream has a long-running PUT and GET, sort of like station307 - Each end has to be terminated by a native app that either connects to a local TCP server, or acts as a local TCP server - No clue how it would work for multiple connections on the same port. Poorly, I guess? - It's probably gonna run like garbage because we're splitting TCP into 2 TCP streams, and although backpressure might work, the ACKs will be less efficient. And the congestion control might get confused My only goal is to tunnel Tracy over it, so that I can have that remotely. --- Cargo.lock | 34 ++++++ crates/ptth_forwarding/Cargo.toml | 22 ++++ crates/ptth_forwarding/src/main.rs | 24 ++++ crates/ptth_forwarding_relay/Cargo.toml | 18 +++ crates/ptth_forwarding_relay/src/main.rs | 138 +++++++++++++++++++++++ 5 files changed, 236 insertions(+) create mode 100644 crates/ptth_forwarding/Cargo.toml create mode 100644 crates/ptth_forwarding/src/main.rs create mode 100644 crates/ptth_forwarding_relay/Cargo.toml create mode 100644 crates/ptth_forwarding_relay/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 4e83088..18c0263 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1656,6 +1656,40 @@ dependencies = [ "uom", ] +[[package]] +name = "ptth_forwarding" +version = "0.1.0" +dependencies = [ + "anyhow", + "base64 0.12.3", + "futures", + "reqwest", + "rmp-serde", + "serde", + "serde_json", + "structopt", + "thiserror", + "tokio", + "tracing", + "tracing-futures", + "tracing-subscriber", +] + +[[package]] +name = "ptth_forwarding_relay" +version = "0.1.0" +dependencies = [ + "anyhow", + "futures", + "hyper", + "thiserror", + "tokio", + "tracing", + "tracing-futures", + "tracing-subscriber", + "ulid", +] + [[package]] name = "ptth_kv" version = "0.1.0" diff --git a/crates/ptth_forwarding/Cargo.toml b/crates/ptth_forwarding/Cargo.toml new file mode 100644 index 0000000..fc9c862 --- /dev/null +++ b/crates/ptth_forwarding/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "ptth_forwarding" +version = "0.1.0" +authors = ["_"] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0.34" +base64 = "0.12.3" +futures = "0.3.7" +reqwest = { version = "0.10.8", features = ["stream"] } +rmp-serde = "0.14.4" +serde = {version = "1.0.117", features = ["derive"]} +serde_json = "1.0.60" +structopt = "0.3.20" +thiserror = "1.0.22" +tokio = { version = "0.2.22", features = ["full"] } +tracing = "0.1.21" +tracing-futures = "0.2.4" +tracing-subscriber = "0.2.15" diff --git a/crates/ptth_forwarding/src/main.rs b/crates/ptth_forwarding/src/main.rs new file mode 100644 index 0000000..57ccd0e --- /dev/null +++ b/crates/ptth_forwarding/src/main.rs @@ -0,0 +1,24 @@ +use reqwest::Client; +use tokio::{ + io::AsyncWriteExt, + net::TcpStream, + stream::StreamExt, +}; + +#[tokio::main] +async fn main () -> anyhow::Result <()> { + let client = Client::builder () + .build ()?; + + let mut tcp_stream = TcpStream::connect ("127.0.0.1:4010").await?; + + let resp = client.get ("http://127.0.0.1:4003/").send ().await?; + let mut downstream = resp.bytes_stream (); + + while let Some (Ok (item)) = downstream.next ().await { + println! ("Chunk: {:?}", item); + tcp_stream.write_all (&item).await?; + } + + Ok (()) +} diff --git a/crates/ptth_forwarding_relay/Cargo.toml b/crates/ptth_forwarding_relay/Cargo.toml new file mode 100644 index 0000000..50d6a60 --- /dev/null +++ b/crates/ptth_forwarding_relay/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "ptth_forwarding_relay" +version = "0.1.0" +authors = ["_"] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0.34" +futures = "0.3.7" +hyper = "0.13.8" +thiserror = "1.0.22" +tokio = { version = "0.2.22", features = ["full"] } +tracing = "0.1.21" +tracing-futures = "0.2.4" +tracing-subscriber = "0.2.15" +ulid = "0.4.1" diff --git a/crates/ptth_forwarding_relay/src/main.rs b/crates/ptth_forwarding_relay/src/main.rs new file mode 100644 index 0000000..c16d5bd --- /dev/null +++ b/crates/ptth_forwarding_relay/src/main.rs @@ -0,0 +1,138 @@ +use std::{ + sync::Arc, + time::Duration, +}; + +use hyper::{ + Body, + Method, + Request, + Response, + StatusCode, +}; +use tokio::{ + spawn, + sync::mpsc, + time::interval, +}; +use tracing::{ + info, trace, +}; +use tracing_subscriber::{ + fmt, + fmt::format::FmtSpan, + EnvFilter, +}; +use ulid::Ulid; + +pub struct RelayState { + +} + +pub struct HttpService { + state: Arc +} + +impl HttpService { + pub fn new () -> Self { + Self { + state: Arc::new (RelayState {}), + } + } + + pub async fn serve (&self, port: u16) -> Result <(), hyper::Error> { + use std::net::SocketAddr; + + use hyper::{ + server::Server, + service::{ + make_service_fn, + service_fn, + }, + }; + + let make_svc = make_service_fn (|_conn| { + let state = self.state.clone (); + + async { + Ok::<_, String> (service_fn (move |req| { + let state = state.clone (); + + Self::handle_all (req, state) + })) + } + }); + + let addr = SocketAddr::from(([127, 0, 0, 1], port)); + + let server = Server::bind (&addr) + .serve (make_svc) + ; + + server.await + } + + async fn handle_all (req: Request , state: Arc ) + -> Result , anyhow::Error> + { + if req.method () == Method::GET { + return Self::handle_gets (req, &*state).await; + } + + Ok::<_, anyhow::Error> (Response::builder () + .body (Body::from ("hello\n"))?) + } + + async fn handle_gets (req: Request , state: &RelayState) + -> Result , anyhow::Error> + { + let (mut tx, rx) = mpsc::channel (1); + + spawn (async move { + let id = Ulid::new ().to_string (); + trace! ("Downstream {} started", id); + Self::handle_downstream (tx).await.ok (); + trace! ("Downstream {} ended", id); + }); + + Ok::<_, anyhow::Error> (Response::builder () + .body (Body::wrap_stream (rx))?) + } + + async fn handle_downstream (mut tx: mpsc::Sender >) -> Result <(), anyhow::Error> { + let mut int = interval (Duration::from_secs (1)); + let mut counter = 0u64; + + loop { + int.tick ().await; + + tx.send (Ok::<_, anyhow::Error> (format! ("Counter: {}\n", counter))).await?; + counter += 1; + } + + Ok (()) + } +} + + + +#[tokio::main] +async fn main () -> Result <(), anyhow::Error> { + use std::time::Duration; + + use tokio::{ + spawn, + time::interval, + }; + + fmt () + .with_env_filter (EnvFilter::from_default_env ()) + .with_span_events (FmtSpan::CLOSE) + .init () + ; + + let service = HttpService::new (); + + info! ("Starting relay"); + Ok (service.serve (4003).await?) +}