From f94b40b6b8c86f880c159e3794572c146daa3d55 Mon Sep 17 00:00:00 2001 From: _ <> Date: Tue, 12 Jan 2021 01:20:36 +0000 Subject: [PATCH] :construction: wip: further POC --- crates/ptth_forwarding/src/main.rs | 49 ++++++++++++++++++++++-- crates/ptth_forwarding_relay/src/main.rs | 19 +++++++++ 2 files changed, 64 insertions(+), 4 deletions(-) diff --git a/crates/ptth_forwarding/src/main.rs b/crates/ptth_forwarding/src/main.rs index 917b5be..1297f71 100644 --- a/crates/ptth_forwarding/src/main.rs +++ b/crates/ptth_forwarding/src/main.rs @@ -1,12 +1,18 @@ use clap::{App, SubCommand}; use reqwest::Client; use tokio::{ - io::AsyncWriteExt, + io::{ + AsyncReadExt, + AsyncWriteExt, + }, net::{ TcpStream, TcpListener, + tcp::OwnedReadHalf, }, + spawn, stream::StreamExt, + sync::mpsc, }; #[tokio::main] @@ -27,10 +33,10 @@ async fn main () -> anyhow::Result <()> { let client = Client::builder () .build ()?; - let mut tcp_stream = if let Some (matches) = matches.subcommand_matches ("server") { + let tcp_stream = if let Some (_matches) = matches.subcommand_matches ("server") { TcpStream::connect ("127.0.0.1:4010").await? } - else if let Some (matches) = matches.subcommand_matches ("client") { + else if let Some (_matches) = matches.subcommand_matches ("client") { let mut listener = TcpListener::bind ("127.0.0.1:4020").await?; let (stream, _addr) = listener.accept ().await?; stream @@ -39,12 +45,47 @@ async fn main () -> anyhow::Result <()> { panic! ("Must use server or client subcommand."); }; + let (tcp_upstream, mut writer) = tcp_stream.into_split (); + + let (upstream_tx, upstream_rx) = mpsc::channel (1); + + spawn (async move { + handle_upstream (tcp_upstream, upstream_tx).await + }); + + let upstream = client.post ("http://127.0.0.1:4003/").body (reqwest::Body::wrap_stream (upstream_rx)); + + spawn (async move { + upstream.send ().await + }); + let resp = client.get ("http://127.0.0.1:4003/").send ().await?; let mut downstream = resp.bytes_stream (); while let Some (Ok (item)) = downstream.next ().await { println! ("Chunk: {:?}", item); - tcp_stream.write_all (&item).await?; + writer.write_all (&item).await?; + } + + Ok (()) +} + +async fn handle_upstream ( + mut tcp_upstream: OwnedReadHalf, + mut upstream_tx: mpsc::Sender , anyhow::Error>> +) -> anyhow::Result <()> { + loop { + let mut buffer = vec! [0u8; 65_536]; + + let bytes_read = tcp_upstream.read (&mut buffer).await?; + buffer.truncate (bytes_read); + + println! ("Read {} bytes", bytes_read); + if bytes_read == 0 { + break; + } + + upstream_tx.send (Ok (buffer)).await?; } Ok (()) diff --git a/crates/ptth_forwarding_relay/src/main.rs b/crates/ptth_forwarding_relay/src/main.rs index c16d5bd..27ffb87 100644 --- a/crates/ptth_forwarding_relay/src/main.rs +++ b/crates/ptth_forwarding_relay/src/main.rs @@ -12,6 +12,7 @@ use hyper::{ }; use tokio::{ spawn, + stream::StreamExt, sync::mpsc, time::interval, }; @@ -78,6 +79,9 @@ impl HttpService { if req.method () == Method::GET { return Self::handle_gets (req, &*state).await; } + if req.method () == Method::POST { + return Self::handle_posts (req, &*state).await; + } Ok::<_, anyhow::Error> (Response::builder () .body (Body::from ("hello\n"))?) @@ -99,6 +103,21 @@ impl HttpService { .body (Body::wrap_stream (rx))?) } + async fn handle_posts (req: Request , state: &RelayState) + -> Result , anyhow::Error> + { + let id = Ulid::new ().to_string (); + trace! ("Upstream {} started", id); + let mut body = req.into_body (); + while let Some (Ok (item)) = body.next ().await { + println! ("Chunk: {:?}", item); + } + trace! ("Upstream {} ended", id); + + Ok::<_, anyhow::Error> (Response::builder () + .body (Body::from ("hello\n"))?) + } + async fn handle_downstream (mut tx: mpsc::Sender >) -> Result <(), anyhow::Error> { let mut int = interval (Duration::from_secs (1)); let mut counter = 0u64;