🚧 wip: further POC

main
_ 2021-01-12 01:20:36 +00:00
parent 8d62b29319
commit f94b40b6b8
2 changed files with 64 additions and 4 deletions

View File

@ -1,12 +1,18 @@
use clap::{App, SubCommand}; use clap::{App, SubCommand};
use reqwest::Client; use reqwest::Client;
use tokio::{ use tokio::{
io::AsyncWriteExt, io::{
AsyncReadExt,
AsyncWriteExt,
},
net::{ net::{
TcpStream, TcpStream,
TcpListener, TcpListener,
tcp::OwnedReadHalf,
}, },
spawn,
stream::StreamExt, stream::StreamExt,
sync::mpsc,
}; };
#[tokio::main] #[tokio::main]
@ -27,10 +33,10 @@ async fn main () -> anyhow::Result <()> {
let client = Client::builder () let client = Client::builder ()
.build ()?; .build ()?;
let mut tcp_stream = if let Some (matches) = matches.subcommand_matches ("server") { let tcp_stream = if let Some (_matches) = matches.subcommand_matches ("server") {
TcpStream::connect ("127.0.0.1:4010").await? TcpStream::connect ("127.0.0.1:4010").await?
} }
else if let Some (matches) = matches.subcommand_matches ("client") { else if let Some (_matches) = matches.subcommand_matches ("client") {
let mut listener = TcpListener::bind ("127.0.0.1:4020").await?; let mut listener = TcpListener::bind ("127.0.0.1:4020").await?;
let (stream, _addr) = listener.accept ().await?; let (stream, _addr) = listener.accept ().await?;
stream stream
@ -39,12 +45,47 @@ async fn main () -> anyhow::Result <()> {
panic! ("Must use server or client subcommand."); panic! ("Must use server or client subcommand.");
}; };
let (tcp_upstream, mut writer) = tcp_stream.into_split ();
let (upstream_tx, upstream_rx) = mpsc::channel (1);
spawn (async move {
handle_upstream (tcp_upstream, upstream_tx).await
});
let upstream = client.post ("http://127.0.0.1:4003/").body (reqwest::Body::wrap_stream (upstream_rx));
spawn (async move {
upstream.send ().await
});
let resp = client.get ("http://127.0.0.1:4003/").send ().await?; let resp = client.get ("http://127.0.0.1:4003/").send ().await?;
let mut downstream = resp.bytes_stream (); let mut downstream = resp.bytes_stream ();
while let Some (Ok (item)) = downstream.next ().await { while let Some (Ok (item)) = downstream.next ().await {
println! ("Chunk: {:?}", item); println! ("Chunk: {:?}", item);
tcp_stream.write_all (&item).await?; writer.write_all (&item).await?;
}
Ok (())
}
async fn handle_upstream (
mut tcp_upstream: OwnedReadHalf,
mut upstream_tx: mpsc::Sender <Result <Vec <u8>, anyhow::Error>>
) -> anyhow::Result <()> {
loop {
let mut buffer = vec! [0u8; 65_536];
let bytes_read = tcp_upstream.read (&mut buffer).await?;
buffer.truncate (bytes_read);
println! ("Read {} bytes", bytes_read);
if bytes_read == 0 {
break;
}
upstream_tx.send (Ok (buffer)).await?;
} }
Ok (()) Ok (())

View File

@ -12,6 +12,7 @@ use hyper::{
}; };
use tokio::{ use tokio::{
spawn, spawn,
stream::StreamExt,
sync::mpsc, sync::mpsc,
time::interval, time::interval,
}; };
@ -78,6 +79,9 @@ impl HttpService {
if req.method () == Method::GET { if req.method () == Method::GET {
return Self::handle_gets (req, &*state).await; return Self::handle_gets (req, &*state).await;
} }
if req.method () == Method::POST {
return Self::handle_posts (req, &*state).await;
}
Ok::<_, anyhow::Error> (Response::builder () Ok::<_, anyhow::Error> (Response::builder ()
.body (Body::from ("hello\n"))?) .body (Body::from ("hello\n"))?)
@ -99,6 +103,21 @@ impl HttpService {
.body (Body::wrap_stream (rx))?) .body (Body::wrap_stream (rx))?)
} }
async fn handle_posts (req: Request <Body>, state: &RelayState)
-> Result <Response <Body>, anyhow::Error>
{
let id = Ulid::new ().to_string ();
trace! ("Upstream {} started", id);
let mut body = req.into_body ();
while let Some (Ok (item)) = body.next ().await {
println! ("Chunk: {:?}", item);
}
trace! ("Upstream {} ended", id);
Ok::<_, anyhow::Error> (Response::builder ()
.body (Body::from ("hello\n"))?)
}
async fn handle_downstream (mut tx: mpsc::Sender <anyhow::Result <String>>) -> Result <(), anyhow::Error> { async fn handle_downstream (mut tx: mpsc::Sender <anyhow::Result <String>>) -> Result <(), anyhow::Error> {
let mut int = interval (Duration::from_secs (1)); let mut int = interval (Duration::from_secs (1));
let mut counter = 0u64; let mut counter = 0u64;