parent
							
								
									b3295d2542
								
							
						
					
					
						commit
						0fe8c64c4f
					
				| 
						 | 
					@ -1,25 +0,0 @@
 | 
				
			||||||
[package]
 | 
					 | 
				
			||||||
name = "ptth_forwarding"
 | 
					 | 
				
			||||||
version = "0.1.0"
 | 
					 | 
				
			||||||
authors = ["_"]
 | 
					 | 
				
			||||||
edition = "2018"
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
[dependencies]
 | 
					 | 
				
			||||||
anyhow = "1.0.38"
 | 
					 | 
				
			||||||
base64 = "0.13.0"
 | 
					 | 
				
			||||||
clap = "2.33.3"
 | 
					 | 
				
			||||||
futures = "0.3.7"
 | 
					 | 
				
			||||||
futures-util = "0.3.8"
 | 
					 | 
				
			||||||
reqwest = { version = "0.11.1", features = ["stream"] }
 | 
					 | 
				
			||||||
rmp-serde = "0.15.4"
 | 
					 | 
				
			||||||
serde = {version = "1.0.124", features = ["derive"]}
 | 
					 | 
				
			||||||
serde_json = "1.0.64"
 | 
					 | 
				
			||||||
structopt = "0.3.21"
 | 
					 | 
				
			||||||
thiserror = "1.0.24"
 | 
					 | 
				
			||||||
tokio = { version = "1.2.0", features = [] }
 | 
					 | 
				
			||||||
tokio-stream = "0.1.3"
 | 
					 | 
				
			||||||
tracing = "0.1.25"
 | 
					 | 
				
			||||||
tracing-futures = "0.2.5"
 | 
					 | 
				
			||||||
tracing-subscriber = "0.2.16"
 | 
					 | 
				
			||||||
| 
						 | 
					@ -1,93 +0,0 @@
 | 
				
			||||||
use clap::{App, SubCommand};
 | 
					 | 
				
			||||||
use futures_util::StreamExt;
 | 
					 | 
				
			||||||
use reqwest::Client;
 | 
					 | 
				
			||||||
use tokio::{
 | 
					 | 
				
			||||||
	io::{
 | 
					 | 
				
			||||||
		AsyncReadExt,
 | 
					 | 
				
			||||||
		AsyncWriteExt,
 | 
					 | 
				
			||||||
	},
 | 
					 | 
				
			||||||
	net::{
 | 
					 | 
				
			||||||
		TcpStream,
 | 
					 | 
				
			||||||
		TcpListener,
 | 
					 | 
				
			||||||
		tcp::OwnedReadHalf,
 | 
					 | 
				
			||||||
	},
 | 
					 | 
				
			||||||
	spawn,
 | 
					 | 
				
			||||||
	sync::mpsc,
 | 
					 | 
				
			||||||
};
 | 
					 | 
				
			||||||
use tokio_stream::wrappers::ReceiverStream;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
#[tokio::main]
 | 
					 | 
				
			||||||
async fn main () -> anyhow::Result <()> {
 | 
					 | 
				
			||||||
	let matches = App::new ("ptth_forwarding")
 | 
					 | 
				
			||||||
		.author ("Trish")
 | 
					 | 
				
			||||||
		.about ("Terminator for PTTH port forwarding")
 | 
					 | 
				
			||||||
		.subcommand (
 | 
					 | 
				
			||||||
			SubCommand::with_name ("server")
 | 
					 | 
				
			||||||
			.about ("Run this on the host with the TCP server")
 | 
					 | 
				
			||||||
		)
 | 
					 | 
				
			||||||
		.subcommand (
 | 
					 | 
				
			||||||
			SubCommand::with_name ("client")
 | 
					 | 
				
			||||||
			.about ("Run this on the host with the TCP client")
 | 
					 | 
				
			||||||
		)
 | 
					 | 
				
			||||||
	.get_matches ();
 | 
					 | 
				
			||||||
	
 | 
					 | 
				
			||||||
	let client = Client::builder ()
 | 
					 | 
				
			||||||
	.build ()?;
 | 
					 | 
				
			||||||
	
 | 
					 | 
				
			||||||
	let tcp_stream = if let Some (_matches) = matches.subcommand_matches ("server") { 
 | 
					 | 
				
			||||||
		TcpStream::connect ("127.0.0.1:4010").await?
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	else if let Some (_matches) = matches.subcommand_matches ("client") {
 | 
					 | 
				
			||||||
		let listener = TcpListener::bind ("127.0.0.1:4020").await?;
 | 
					 | 
				
			||||||
		let (stream, _addr) = listener.accept ().await?;
 | 
					 | 
				
			||||||
		stream
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	else {
 | 
					 | 
				
			||||||
		panic! ("Must use server or client subcommand.");
 | 
					 | 
				
			||||||
	};
 | 
					 | 
				
			||||||
	
 | 
					 | 
				
			||||||
	let (tcp_upstream, mut writer) = tcp_stream.into_split ();
 | 
					 | 
				
			||||||
	
 | 
					 | 
				
			||||||
	let (upstream_tx, upstream_rx) = mpsc::channel (1);
 | 
					 | 
				
			||||||
	
 | 
					 | 
				
			||||||
	spawn (async move {
 | 
					 | 
				
			||||||
		handle_upstream (tcp_upstream, upstream_tx).await
 | 
					 | 
				
			||||||
	});
 | 
					 | 
				
			||||||
	
 | 
					 | 
				
			||||||
	let upstream = client.post ("http://127.0.0.1:4003/").body (reqwest::Body::wrap_stream (ReceiverStream::new (upstream_rx)));
 | 
					 | 
				
			||||||
	
 | 
					 | 
				
			||||||
	spawn (async move {
 | 
					 | 
				
			||||||
		upstream.send ().await
 | 
					 | 
				
			||||||
	});
 | 
					 | 
				
			||||||
	
 | 
					 | 
				
			||||||
	let resp = client.get ("http://127.0.0.1:4003/").send ().await?;
 | 
					 | 
				
			||||||
	let mut downstream = resp.bytes_stream ();
 | 
					 | 
				
			||||||
	
 | 
					 | 
				
			||||||
	while let Some (Ok (item)) = downstream.next ().await {
 | 
					 | 
				
			||||||
		println! ("Chunk: {:?}", item);
 | 
					 | 
				
			||||||
		writer.write_all (&item).await?;
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	
 | 
					 | 
				
			||||||
	Ok (())
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
async fn handle_upstream (
 | 
					 | 
				
			||||||
	mut tcp_upstream: OwnedReadHalf,
 | 
					 | 
				
			||||||
	upstream_tx: mpsc::Sender <Result <Vec <u8>, anyhow::Error>>
 | 
					 | 
				
			||||||
) -> anyhow::Result <()> {
 | 
					 | 
				
			||||||
	loop {
 | 
					 | 
				
			||||||
		let mut buffer = vec! [0u8; 65_536];
 | 
					 | 
				
			||||||
		
 | 
					 | 
				
			||||||
		let bytes_read = tcp_upstream.read (&mut buffer).await?;
 | 
					 | 
				
			||||||
		buffer.truncate (bytes_read);
 | 
					 | 
				
			||||||
		
 | 
					 | 
				
			||||||
		println! ("Read {} bytes", bytes_read);
 | 
					 | 
				
			||||||
		if bytes_read == 0 {
 | 
					 | 
				
			||||||
			break;
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		
 | 
					 | 
				
			||||||
		upstream_tx.send (Ok (buffer)).await?;
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	
 | 
					 | 
				
			||||||
	Ok (())
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
| 
						 | 
					@ -1,20 +0,0 @@
 | 
				
			||||||
[package]
 | 
					 | 
				
			||||||
name = "ptth_forwarding_relay"
 | 
					 | 
				
			||||||
version = "0.1.0"
 | 
					 | 
				
			||||||
authors = ["_"]
 | 
					 | 
				
			||||||
edition = "2018"
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
[dependencies]
 | 
					 | 
				
			||||||
anyhow = "1.0.38"
 | 
					 | 
				
			||||||
futures = "0.3.7"
 | 
					 | 
				
			||||||
futures-util = "0.3.8"
 | 
					 | 
				
			||||||
hyper = "0.14.4"
 | 
					 | 
				
			||||||
rusty_ulid = "0.10.1"
 | 
					 | 
				
			||||||
thiserror = "1.0.24"
 | 
					 | 
				
			||||||
tokio = { version = "1.2.0", features = [] }
 | 
					 | 
				
			||||||
tokio-stream = "0.1.3"
 | 
					 | 
				
			||||||
tracing = "0.1.25"
 | 
					 | 
				
			||||||
tracing-futures = "0.2.5"
 | 
					 | 
				
			||||||
tracing-subscriber = "0.2.16"
 | 
					 | 
				
			||||||
| 
						 | 
					@ -1,208 +0,0 @@
 | 
				
			||||||
use std::{
 | 
					 | 
				
			||||||
	collections::HashMap,
 | 
					 | 
				
			||||||
	sync::Arc,
 | 
					 | 
				
			||||||
	time::Duration,
 | 
					 | 
				
			||||||
};
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
use futures_util::StreamExt;
 | 
					 | 
				
			||||||
use hyper::{
 | 
					 | 
				
			||||||
	Body,
 | 
					 | 
				
			||||||
	Method,
 | 
					 | 
				
			||||||
	Request,
 | 
					 | 
				
			||||||
	Response,
 | 
					 | 
				
			||||||
	StatusCode,
 | 
					 | 
				
			||||||
};
 | 
					 | 
				
			||||||
use tokio::{
 | 
					 | 
				
			||||||
	spawn,
 | 
					 | 
				
			||||||
	sync::{
 | 
					 | 
				
			||||||
		RwLock,
 | 
					 | 
				
			||||||
		mpsc,
 | 
					 | 
				
			||||||
	},
 | 
					 | 
				
			||||||
	time::interval,
 | 
					 | 
				
			||||||
};
 | 
					 | 
				
			||||||
use tokio_stream::wrappers::ReceiverStream;
 | 
					 | 
				
			||||||
use tracing::{
 | 
					 | 
				
			||||||
	info, trace,
 | 
					 | 
				
			||||||
};
 | 
					 | 
				
			||||||
use tracing_subscriber::{
 | 
					 | 
				
			||||||
	fmt, 
 | 
					 | 
				
			||||||
	fmt::format::FmtSpan,
 | 
					 | 
				
			||||||
	EnvFilter,
 | 
					 | 
				
			||||||
};
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
#[derive (Default)]
 | 
					 | 
				
			||||||
struct RelayState {
 | 
					 | 
				
			||||||
	connections: HashMap <String, ConnectionState>,
 | 
					 | 
				
			||||||
	client_opaques: HashMap <String, String>,
 | 
					 | 
				
			||||||
	server_opaques: HashMap <String, String>,
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
/*
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
HTTP has 2 good pause points:
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
- Client has uploaded request body, server has said nothing
 | 
					 | 
				
			||||||
- Server has sent status code + response headers
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
Because we want to stream everything, there is no point in a single HTTP
 | 
					 | 
				
			||||||
req-resp pair
 | 
					 | 
				
			||||||
having both a streaming request body and a streaming response body.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
To move the state machine, the first request from client and server must not
 | 
					 | 
				
			||||||
be streaming.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
With all that in mind, the r
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
*/
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
enum ConnectionState {
 | 
					 | 
				
			||||||
	// We got 1 connection from the client. We need a 2nd to form the upstream.
 | 
					 | 
				
			||||||
	WaitForUpstream (String, String),
 | 
					 | 
				
			||||||
	// We got 2 connections from the client. We need the server to accept
 | 
					 | 
				
			||||||
	// by sending its downstream.
 | 
					 | 
				
			||||||
	WaitForAccept (String, String, String),
 | 
					 | 
				
			||||||
	Connected (String, String, String, String),
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// An established connection has 4 individual HTTP streams
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
struct EstablishedConnection {
 | 
					 | 
				
			||||||
	// Request body of 'upstream' call
 | 
					 | 
				
			||||||
	client_up: String,
 | 
					 | 
				
			||||||
	
 | 
					 | 
				
			||||||
	// Response body of 'connect' call
 | 
					 | 
				
			||||||
	client_down: String,
 | 
					 | 
				
			||||||
	
 | 
					 | 
				
			||||||
	// Response body of 'listen' call
 | 
					 | 
				
			||||||
	server_up: String,
 | 
					 | 
				
			||||||
	
 | 
					 | 
				
			||||||
	// Request body of 'accept' call
 | 
					 | 
				
			||||||
	server_down: String,
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
#[derive (Default)]
 | 
					 | 
				
			||||||
pub struct HttpService {
 | 
					 | 
				
			||||||
	state: Arc <RelayState>
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
impl HttpService {
 | 
					 | 
				
			||||||
	pub async fn serve (&self, port: u16) -> Result <(), hyper::Error> {
 | 
					 | 
				
			||||||
		use std::net::SocketAddr;
 | 
					 | 
				
			||||||
		
 | 
					 | 
				
			||||||
		use hyper::{
 | 
					 | 
				
			||||||
			server::Server,
 | 
					 | 
				
			||||||
			service::{
 | 
					 | 
				
			||||||
				make_service_fn,
 | 
					 | 
				
			||||||
				service_fn,
 | 
					 | 
				
			||||||
			},
 | 
					 | 
				
			||||||
		};
 | 
					 | 
				
			||||||
		
 | 
					 | 
				
			||||||
		let make_svc = make_service_fn (|_conn| {
 | 
					 | 
				
			||||||
			let state = self.state.clone ();
 | 
					 | 
				
			||||||
			
 | 
					 | 
				
			||||||
			async {
 | 
					 | 
				
			||||||
				Ok::<_, String> (service_fn (move |req| {
 | 
					 | 
				
			||||||
					let state = state.clone ();
 | 
					 | 
				
			||||||
					
 | 
					 | 
				
			||||||
					Self::handle_all (req, state)
 | 
					 | 
				
			||||||
				}))
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		});
 | 
					 | 
				
			||||||
		
 | 
					 | 
				
			||||||
		let addr = SocketAddr::from(([127, 0, 0, 1], port));
 | 
					 | 
				
			||||||
		
 | 
					 | 
				
			||||||
		let server = Server::bind (&addr)
 | 
					 | 
				
			||||||
		.serve (make_svc)
 | 
					 | 
				
			||||||
		;
 | 
					 | 
				
			||||||
		
 | 
					 | 
				
			||||||
		server.await
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	
 | 
					 | 
				
			||||||
	async fn handle_all (req: Request <Body>, state: Arc <RelayState>) 
 | 
					 | 
				
			||||||
	-> Result <Response <Body>, anyhow::Error>
 | 
					 | 
				
			||||||
	{
 | 
					 | 
				
			||||||
		if req.method () == Method::GET {
 | 
					 | 
				
			||||||
			return Self::handle_gets (req, &*state).await;
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		if req.method () == Method::POST {
 | 
					 | 
				
			||||||
			return Self::handle_posts (req, &*state).await;
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		
 | 
					 | 
				
			||||||
		Ok::<_, anyhow::Error> (Response::builder ()
 | 
					 | 
				
			||||||
		.body (Body::from ("hello\n"))?)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	
 | 
					 | 
				
			||||||
	async fn handle_gets (req: Request <Body>, state: &RelayState)
 | 
					 | 
				
			||||||
	-> Result <Response <Body>, anyhow::Error>
 | 
					 | 
				
			||||||
	{
 | 
					 | 
				
			||||||
		let (tx, rx) = mpsc::channel (1);
 | 
					 | 
				
			||||||
		
 | 
					 | 
				
			||||||
		spawn (async move {
 | 
					 | 
				
			||||||
			let id = rusty_ulid::generate_ulid_string ();
 | 
					 | 
				
			||||||
			trace! ("Downstream {} started", id);
 | 
					 | 
				
			||||||
			Self::handle_downstream (tx).await.ok ();
 | 
					 | 
				
			||||||
			trace! ("Downstream {} ended", id);
 | 
					 | 
				
			||||||
		});
 | 
					 | 
				
			||||||
		
 | 
					 | 
				
			||||||
		Ok::<_, anyhow::Error> (Response::builder ()
 | 
					 | 
				
			||||||
		.body (Body::wrap_stream (ReceiverStream::new (rx)))?)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	
 | 
					 | 
				
			||||||
	async fn handle_posts (req: Request <Body>, state: &RelayState)
 | 
					 | 
				
			||||||
	-> Result <Response <Body>, anyhow::Error>
 | 
					 | 
				
			||||||
	{
 | 
					 | 
				
			||||||
		let id = rusty_ulid::generate_ulid_string ();
 | 
					 | 
				
			||||||
		trace! ("Upstream {} started", id);
 | 
					 | 
				
			||||||
		let mut body = req.into_body ();
 | 
					 | 
				
			||||||
		while let Some (Ok (item)) = body.next ().await {
 | 
					 | 
				
			||||||
			println! ("Chunk: {:?}", item);
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		trace! ("Upstream {} ended", id);
 | 
					 | 
				
			||||||
		
 | 
					 | 
				
			||||||
		Ok::<_, anyhow::Error> (Response::builder ()
 | 
					 | 
				
			||||||
		.body (Body::from ("hello\n"))?)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	
 | 
					 | 
				
			||||||
	async fn handle_downstream (tx: mpsc::Sender <anyhow::Result <String>>) -> Result <(), anyhow::Error> {
 | 
					 | 
				
			||||||
		let mut int = interval (Duration::from_secs (1));
 | 
					 | 
				
			||||||
		let mut counter = 0u64;
 | 
					 | 
				
			||||||
		
 | 
					 | 
				
			||||||
		loop {
 | 
					 | 
				
			||||||
			int.tick ().await;
 | 
					 | 
				
			||||||
			
 | 
					 | 
				
			||||||
			tx.send (Ok::<_, anyhow::Error> (format! ("Counter: {}\n", counter))).await?;
 | 
					 | 
				
			||||||
			counter += 1;
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
#[tokio::main]
 | 
					 | 
				
			||||||
async fn main () -> Result <(), anyhow::Error> {
 | 
					 | 
				
			||||||
	use std::time::Duration;
 | 
					 | 
				
			||||||
	
 | 
					 | 
				
			||||||
	use tokio::{
 | 
					 | 
				
			||||||
		spawn,
 | 
					 | 
				
			||||||
		time::interval,
 | 
					 | 
				
			||||||
	};
 | 
					 | 
				
			||||||
	
 | 
					 | 
				
			||||||
	fmt ()
 | 
					 | 
				
			||||||
		.with_env_filter (EnvFilter::from_default_env ())
 | 
					 | 
				
			||||||
		.with_span_events (FmtSpan::CLOSE)
 | 
					 | 
				
			||||||
		.init ()
 | 
					 | 
				
			||||||
	;
 | 
					 | 
				
			||||||
	
 | 
					 | 
				
			||||||
	let service = HttpService::default ();
 | 
					 | 
				
			||||||
	
 | 
					 | 
				
			||||||
	info! ("Starting relay");
 | 
					 | 
				
			||||||
	Ok (service.serve (4003).await?)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
#[cfg (test)]
 | 
					 | 
				
			||||||
mod tests {
 | 
					 | 
				
			||||||
	use super::*;
 | 
					 | 
				
			||||||
	
 | 
					 | 
				
			||||||
	#[test]
 | 
					 | 
				
			||||||
	fn state_machine () {
 | 
					 | 
				
			||||||
		// assert! (false);
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
		Loading…
	
		Reference in New Issue