diff --git a/crates/ptth_forwarding/Cargo.toml b/crates/ptth_forwarding/Cargo.toml deleted file mode 100644 index f6a780d..0000000 --- a/crates/ptth_forwarding/Cargo.toml +++ /dev/null @@ -1,25 +0,0 @@ -[package] -name = "ptth_forwarding" -version = "0.1.0" -authors = ["_"] -edition = "2018" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -anyhow = "1.0.38" -base64 = "0.13.0" -clap = "2.33.3" -futures = "0.3.7" -futures-util = "0.3.8" -reqwest = { version = "0.11.1", features = ["stream"] } -rmp-serde = "0.15.4" -serde = {version = "1.0.124", features = ["derive"]} -serde_json = "1.0.64" -structopt = "0.3.21" -thiserror = "1.0.24" -tokio = { version = "1.2.0", features = [] } -tokio-stream = "0.1.3" -tracing = "0.1.25" -tracing-futures = "0.2.5" -tracing-subscriber = "0.2.16" diff --git a/crates/ptth_forwarding/src/main.rs b/crates/ptth_forwarding/src/main.rs deleted file mode 100644 index 8c73c4f..0000000 --- a/crates/ptth_forwarding/src/main.rs +++ /dev/null @@ -1,93 +0,0 @@ -use clap::{App, SubCommand}; -use futures_util::StreamExt; -use reqwest::Client; -use tokio::{ - io::{ - AsyncReadExt, - AsyncWriteExt, - }, - net::{ - TcpStream, - TcpListener, - tcp::OwnedReadHalf, - }, - spawn, - sync::mpsc, -}; -use tokio_stream::wrappers::ReceiverStream; - -#[tokio::main] -async fn main () -> anyhow::Result <()> { - let matches = App::new ("ptth_forwarding") - .author ("Trish") - .about ("Terminator for PTTH port forwarding") - .subcommand ( - SubCommand::with_name ("server") - .about ("Run this on the host with the TCP server") - ) - .subcommand ( - SubCommand::with_name ("client") - .about ("Run this on the host with the TCP client") - ) - .get_matches (); - - let client = Client::builder () - .build ()?; - - let tcp_stream = if let Some (_matches) = matches.subcommand_matches ("server") { - TcpStream::connect ("127.0.0.1:4010").await? - } - else if let Some (_matches) = matches.subcommand_matches ("client") { - let listener = TcpListener::bind ("127.0.0.1:4020").await?; - let (stream, _addr) = listener.accept ().await?; - stream - } - else { - panic! ("Must use server or client subcommand."); - }; - - let (tcp_upstream, mut writer) = tcp_stream.into_split (); - - let (upstream_tx, upstream_rx) = mpsc::channel (1); - - spawn (async move { - handle_upstream (tcp_upstream, upstream_tx).await - }); - - let upstream = client.post ("http://127.0.0.1:4003/").body (reqwest::Body::wrap_stream (ReceiverStream::new (upstream_rx))); - - spawn (async move { - upstream.send ().await - }); - - let resp = client.get ("http://127.0.0.1:4003/").send ().await?; - let mut downstream = resp.bytes_stream (); - - while let Some (Ok (item)) = downstream.next ().await { - println! ("Chunk: {:?}", item); - writer.write_all (&item).await?; - } - - Ok (()) -} - -async fn handle_upstream ( - mut tcp_upstream: OwnedReadHalf, - upstream_tx: mpsc::Sender , anyhow::Error>> -) -> anyhow::Result <()> { - loop { - let mut buffer = vec! [0u8; 65_536]; - - let bytes_read = tcp_upstream.read (&mut buffer).await?; - buffer.truncate (bytes_read); - - println! ("Read {} bytes", bytes_read); - if bytes_read == 0 { - break; - } - - upstream_tx.send (Ok (buffer)).await?; - } - - Ok (()) -} diff --git a/crates/ptth_forwarding_relay/Cargo.toml b/crates/ptth_forwarding_relay/Cargo.toml deleted file mode 100644 index 91b9bab..0000000 --- a/crates/ptth_forwarding_relay/Cargo.toml +++ /dev/null @@ -1,20 +0,0 @@ -[package] -name = "ptth_forwarding_relay" -version = "0.1.0" -authors = ["_"] -edition = "2018" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -anyhow = "1.0.38" -futures = "0.3.7" -futures-util = "0.3.8" -hyper = "0.14.4" -rusty_ulid = "0.10.1" -thiserror = "1.0.24" -tokio = { version = "1.2.0", features = [] } -tokio-stream = "0.1.3" -tracing = "0.1.25" -tracing-futures = "0.2.5" -tracing-subscriber = "0.2.16" diff --git a/crates/ptth_forwarding_relay/src/main.rs b/crates/ptth_forwarding_relay/src/main.rs deleted file mode 100644 index 0271eb4..0000000 --- a/crates/ptth_forwarding_relay/src/main.rs +++ /dev/null @@ -1,208 +0,0 @@ -use std::{ - collections::HashMap, - sync::Arc, - time::Duration, -}; - -use futures_util::StreamExt; -use hyper::{ - Body, - Method, - Request, - Response, - StatusCode, -}; -use tokio::{ - spawn, - sync::{ - RwLock, - mpsc, - }, - time::interval, -}; -use tokio_stream::wrappers::ReceiverStream; -use tracing::{ - info, trace, -}; -use tracing_subscriber::{ - fmt, - fmt::format::FmtSpan, - EnvFilter, -}; - -#[derive (Default)] -struct RelayState { - connections: HashMap , - client_opaques: HashMap , - server_opaques: HashMap , -} - -/* - -HTTP has 2 good pause points: - -- Client has uploaded request body, server has said nothing -- Server has sent status code + response headers - -Because we want to stream everything, there is no point in a single HTTP -req-resp pair -having both a streaming request body and a streaming response body. - -To move the state machine, the first request from client and server must not -be streaming. - -With all that in mind, the r - -*/ - -enum ConnectionState { - // We got 1 connection from the client. We need a 2nd to form the upstream. - WaitForUpstream (String, String), - // We got 2 connections from the client. We need the server to accept - // by sending its downstream. - WaitForAccept (String, String, String), - Connected (String, String, String, String), -} - -// An established connection has 4 individual HTTP streams - -struct EstablishedConnection { - // Request body of 'upstream' call - client_up: String, - - // Response body of 'connect' call - client_down: String, - - // Response body of 'listen' call - server_up: String, - - // Request body of 'accept' call - server_down: String, -} - -#[derive (Default)] -pub struct HttpService { - state: Arc -} - -impl HttpService { - pub async fn serve (&self, port: u16) -> Result <(), hyper::Error> { - use std::net::SocketAddr; - - use hyper::{ - server::Server, - service::{ - make_service_fn, - service_fn, - }, - }; - - let make_svc = make_service_fn (|_conn| { - let state = self.state.clone (); - - async { - Ok::<_, String> (service_fn (move |req| { - let state = state.clone (); - - Self::handle_all (req, state) - })) - } - }); - - let addr = SocketAddr::from(([127, 0, 0, 1], port)); - - let server = Server::bind (&addr) - .serve (make_svc) - ; - - server.await - } - - async fn handle_all (req: Request , state: Arc ) - -> Result , anyhow::Error> - { - if req.method () == Method::GET { - return Self::handle_gets (req, &*state).await; - } - if req.method () == Method::POST { - return Self::handle_posts (req, &*state).await; - } - - Ok::<_, anyhow::Error> (Response::builder () - .body (Body::from ("hello\n"))?) - } - - async fn handle_gets (req: Request , state: &RelayState) - -> Result , anyhow::Error> - { - let (tx, rx) = mpsc::channel (1); - - spawn (async move { - let id = rusty_ulid::generate_ulid_string (); - trace! ("Downstream {} started", id); - Self::handle_downstream (tx).await.ok (); - trace! ("Downstream {} ended", id); - }); - - Ok::<_, anyhow::Error> (Response::builder () - .body (Body::wrap_stream (ReceiverStream::new (rx)))?) - } - - async fn handle_posts (req: Request , state: &RelayState) - -> Result , anyhow::Error> - { - let id = rusty_ulid::generate_ulid_string (); - trace! ("Upstream {} started", id); - let mut body = req.into_body (); - while let Some (Ok (item)) = body.next ().await { - println! ("Chunk: {:?}", item); - } - trace! ("Upstream {} ended", id); - - Ok::<_, anyhow::Error> (Response::builder () - .body (Body::from ("hello\n"))?) - } - - async fn handle_downstream (tx: mpsc::Sender >) -> Result <(), anyhow::Error> { - let mut int = interval (Duration::from_secs (1)); - let mut counter = 0u64; - - loop { - int.tick ().await; - - tx.send (Ok::<_, anyhow::Error> (format! ("Counter: {}\n", counter))).await?; - counter += 1; - } - } -} - -#[tokio::main] -async fn main () -> Result <(), anyhow::Error> { - use std::time::Duration; - - use tokio::{ - spawn, - time::interval, - }; - - fmt () - .with_env_filter (EnvFilter::from_default_env ()) - .with_span_events (FmtSpan::CLOSE) - .init () - ; - - let service = HttpService::default (); - - info! ("Starting relay"); - Ok (service.serve (4003).await?) -} - -#[cfg (test)] -mod tests { - use super::*; - - #[test] - fn state_machine () { - // assert! (false); - } -}