ptth/crates/ptth_forwarding/src/main.rs

93 lines
2.1 KiB
Rust
Raw Normal View History

use clap::{App, SubCommand};
use reqwest::Client;
use tokio::{
2021-01-12 01:20:36 +00:00
io::{
AsyncReadExt,
AsyncWriteExt,
},
net::{
TcpStream,
TcpListener,
2021-01-12 01:20:36 +00:00
tcp::OwnedReadHalf,
},
2021-01-12 01:20:36 +00:00
spawn,
stream::StreamExt,
2021-01-12 01:20:36 +00:00
sync::mpsc,
};
#[tokio::main]
async fn main () -> anyhow::Result <()> {
let matches = App::new ("ptth_forwarding")
.author ("Trish")
.about ("Terminator for PTTH port forwarding")
.subcommand (
SubCommand::with_name ("server")
.about ("Run this on the host with the TCP server")
)
.subcommand (
SubCommand::with_name ("client")
.about ("Run this on the host with the TCP client")
)
.get_matches ();
let client = Client::builder ()
.build ()?;
2021-01-12 01:20:36 +00:00
let tcp_stream = if let Some (_matches) = matches.subcommand_matches ("server") {
TcpStream::connect ("127.0.0.1:4010").await?
}
2021-01-12 01:20:36 +00:00
else if let Some (_matches) = matches.subcommand_matches ("client") {
let mut listener = TcpListener::bind ("127.0.0.1:4020").await?;
let (stream, _addr) = listener.accept ().await?;
stream
}
else {
panic! ("Must use server or client subcommand.");
};
2021-01-12 01:20:36 +00:00
let (tcp_upstream, mut writer) = tcp_stream.into_split ();
let (upstream_tx, upstream_rx) = mpsc::channel (1);
spawn (async move {
handle_upstream (tcp_upstream, upstream_tx).await
});
let upstream = client.post ("http://127.0.0.1:4003/").body (reqwest::Body::wrap_stream (upstream_rx));
spawn (async move {
upstream.send ().await
});
let resp = client.get ("http://127.0.0.1:4003/").send ().await?;
let mut downstream = resp.bytes_stream ();
while let Some (Ok (item)) = downstream.next ().await {
println! ("Chunk: {:?}", item);
2021-01-12 01:20:36 +00:00
writer.write_all (&item).await?;
}
Ok (())
}
async fn handle_upstream (
mut tcp_upstream: OwnedReadHalf,
mut upstream_tx: mpsc::Sender <Result <Vec <u8>, anyhow::Error>>
) -> anyhow::Result <()> {
loop {
let mut buffer = vec! [0u8; 65_536];
let bytes_read = tcp_upstream.read (&mut buffer).await?;
buffer.truncate (bytes_read);
println! ("Read {} bytes", bytes_read);
if bytes_read == 0 {
break;
}
upstream_tx.send (Ok (buffer)).await?;
}
Ok (())
}