diff --git a/Cargo.lock b/Cargo.lock index 4e83088..18c0263 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1656,6 +1656,40 @@ dependencies = [ "uom", ] +[[package]] +name = "ptth_forwarding" +version = "0.1.0" +dependencies = [ + "anyhow", + "base64 0.12.3", + "futures", + "reqwest", + "rmp-serde", + "serde", + "serde_json", + "structopt", + "thiserror", + "tokio", + "tracing", + "tracing-futures", + "tracing-subscriber", +] + +[[package]] +name = "ptth_forwarding_relay" +version = "0.1.0" +dependencies = [ + "anyhow", + "futures", + "hyper", + "thiserror", + "tokio", + "tracing", + "tracing-futures", + "tracing-subscriber", + "ulid", +] + [[package]] name = "ptth_kv" version = "0.1.0" diff --git a/crates/ptth_forwarding/Cargo.toml b/crates/ptth_forwarding/Cargo.toml new file mode 100644 index 0000000..fc9c862 --- /dev/null +++ b/crates/ptth_forwarding/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "ptth_forwarding" +version = "0.1.0" +authors = ["_"] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0.34" +base64 = "0.12.3" +futures = "0.3.7" +reqwest = { version = "0.10.8", features = ["stream"] } +rmp-serde = "0.14.4" +serde = {version = "1.0.117", features = ["derive"]} +serde_json = "1.0.60" +structopt = "0.3.20" +thiserror = "1.0.22" +tokio = { version = "0.2.22", features = ["full"] } +tracing = "0.1.21" +tracing-futures = "0.2.4" +tracing-subscriber = "0.2.15" diff --git a/crates/ptth_forwarding/src/main.rs b/crates/ptth_forwarding/src/main.rs new file mode 100644 index 0000000..57ccd0e --- /dev/null +++ b/crates/ptth_forwarding/src/main.rs @@ -0,0 +1,24 @@ +use reqwest::Client; +use tokio::{ + io::AsyncWriteExt, + net::TcpStream, + stream::StreamExt, +}; + +#[tokio::main] +async fn main () -> anyhow::Result <()> { + let client = Client::builder () + .build ()?; + + let mut tcp_stream = TcpStream::connect ("127.0.0.1:4010").await?; + + let resp = client.get ("http://127.0.0.1:4003/").send ().await?; + let mut downstream = resp.bytes_stream (); + + while let Some (Ok (item)) = downstream.next ().await { + println! ("Chunk: {:?}", item); + tcp_stream.write_all (&item).await?; + } + + Ok (()) +} diff --git a/crates/ptth_forwarding_relay/Cargo.toml b/crates/ptth_forwarding_relay/Cargo.toml new file mode 100644 index 0000000..50d6a60 --- /dev/null +++ b/crates/ptth_forwarding_relay/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "ptth_forwarding_relay" +version = "0.1.0" +authors = ["_"] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0.34" +futures = "0.3.7" +hyper = "0.13.8" +thiserror = "1.0.22" +tokio = { version = "0.2.22", features = ["full"] } +tracing = "0.1.21" +tracing-futures = "0.2.4" +tracing-subscriber = "0.2.15" +ulid = "0.4.1" diff --git a/crates/ptth_forwarding_relay/src/main.rs b/crates/ptth_forwarding_relay/src/main.rs new file mode 100644 index 0000000..c16d5bd --- /dev/null +++ b/crates/ptth_forwarding_relay/src/main.rs @@ -0,0 +1,138 @@ +use std::{ + sync::Arc, + time::Duration, +}; + +use hyper::{ + Body, + Method, + Request, + Response, + StatusCode, +}; +use tokio::{ + spawn, + sync::mpsc, + time::interval, +}; +use tracing::{ + info, trace, +}; +use tracing_subscriber::{ + fmt, + fmt::format::FmtSpan, + EnvFilter, +}; +use ulid::Ulid; + +pub struct RelayState { + +} + +pub struct HttpService { + state: Arc +} + +impl HttpService { + pub fn new () -> Self { + Self { + state: Arc::new (RelayState {}), + } + } + + pub async fn serve (&self, port: u16) -> Result <(), hyper::Error> { + use std::net::SocketAddr; + + use hyper::{ + server::Server, + service::{ + make_service_fn, + service_fn, + }, + }; + + let make_svc = make_service_fn (|_conn| { + let state = self.state.clone (); + + async { + Ok::<_, String> (service_fn (move |req| { + let state = state.clone (); + + Self::handle_all (req, state) + })) + } + }); + + let addr = SocketAddr::from(([127, 0, 0, 1], port)); + + let server = Server::bind (&addr) + .serve (make_svc) + ; + + server.await + } + + async fn handle_all (req: Request , state: Arc ) + -> Result , anyhow::Error> + { + if req.method () == Method::GET { + return Self::handle_gets (req, &*state).await; + } + + Ok::<_, anyhow::Error> (Response::builder () + .body (Body::from ("hello\n"))?) + } + + async fn handle_gets (req: Request , state: &RelayState) + -> Result , anyhow::Error> + { + let (mut tx, rx) = mpsc::channel (1); + + spawn (async move { + let id = Ulid::new ().to_string (); + trace! ("Downstream {} started", id); + Self::handle_downstream (tx).await.ok (); + trace! ("Downstream {} ended", id); + }); + + Ok::<_, anyhow::Error> (Response::builder () + .body (Body::wrap_stream (rx))?) + } + + async fn handle_downstream (mut tx: mpsc::Sender >) -> Result <(), anyhow::Error> { + let mut int = interval (Duration::from_secs (1)); + let mut counter = 0u64; + + loop { + int.tick ().await; + + tx.send (Ok::<_, anyhow::Error> (format! ("Counter: {}\n", counter))).await?; + counter += 1; + } + + Ok (()) + } +} + + + +#[tokio::main] +async fn main () -> Result <(), anyhow::Error> { + use std::time::Duration; + + use tokio::{ + spawn, + time::interval, + }; + + fmt () + .with_env_filter (EnvFilter::from_default_env ()) + .with_span_events (FmtSpan::CLOSE) + .init () + ; + + let service = HttpService::new (); + + info! ("Starting relay"); + Ok (service.serve (4003).await?) +}