Compare commits
	
		
			No commits in common. "dfc6885b8c4d097b6fa4306441dbda6e6d204f70" and "2a909307230475597ffdf9cff3d4e670572a83db" have entirely different histories. 
		
	
	
		
			dfc6885b8c
			...
			2a90930723
		
	
		|  | @ -9,10 +9,6 @@ | |||
| /scope/untracked | ||||
| /scraper-secret.txt | ||||
| /target | ||||
| /untracked | ||||
| 
 | ||||
| # TLS certs used for QUIC experiments | ||||
| *.crt | ||||
| 
 | ||||
| # Kate editor temp file | ||||
| *.kate-swp | ||||
|  |  | |||
										
											
												File diff suppressed because it is too large
												Load Diff
											
										
									
								
							|  | @ -1,8 +1,8 @@ | |||
| # https://whitfin.io/speeding-up-rust-docker-builds/ | ||||
| # TODO: https://stackoverflow.com/questions/57389547/how-to-define-the-context-for-a-docker-build-as-a-specific-commit-on-one-of-the | ||||
| 
 | ||||
| # docker pull rust:1.66-slim-buster | ||||
| FROM rust@sha256:98c9b1fca0c9a6183369daf9efadb57c634340ae877bb027aeadf72afdd086a3 as build | ||||
| # rust:1.60-slim-buster | ||||
| FROM rust@sha256:c0f26a0b299a8a74cd87be0b4bd291d55aa292198bab1bafd906edd8665edb82 as build | ||||
| 
 | ||||
| WORKDIR / | ||||
| ENV USER root | ||||
|  | @ -20,8 +20,7 @@ cargo new --bin crates/ptth_server && \ | |||
| cargo new --bin crates/ptth_file_server_bin && \ | ||||
| cargo new --bin tools/ptth_tail && \ | ||||
| cargo new --bin crates/debug_proxy && \ | ||||
| cargo new --bin crates/ptth_quic && \ | ||||
| cargo new --lib crates/udp_over_tcp | ||||
| cargo new --bin crates/ptth_quic | ||||
| 
 | ||||
| # copy over your manifests | ||||
| COPY ./Cargo.lock                               ./ | ||||
|  | @ -30,7 +29,6 @@ COPY ./crates/always_equal/Cargo.toml           ./crates/always_equal/ | |||
| COPY ./crates/ptth_core/Cargo.toml              ./crates/ptth_core/ | ||||
| COPY ./crates/ptth_relay/Cargo.toml             ./crates/ptth_relay/ | ||||
| COPY ./crates/ptth_quic/Cargo.toml              ./crates/ptth_quic/ | ||||
| COPY ./crates/udp_over_tcp/Cargo.toml           ./crates/udp_over_tcp/ | ||||
| 
 | ||||
| # this build step will cache your dependencies | ||||
| RUN cargo build --release -p ptth_relay | ||||
|  |  | |||
|  | @ -10,28 +10,23 @@ repository = "https://six-five-six-four.com/git/reactor/ptth" | |||
| # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | ||||
| 
 | ||||
| [dependencies] | ||||
| anyhow = "1.0.66" | ||||
| arc-swap = "1.5.1" | ||||
| base64 = "0.20.0" | ||||
| ctrlc = "3.2.4" | ||||
| futures-util = "0.3.25" | ||||
| hyper = { version = "0.14.23", features = ["http1", "server", "stream", "tcp"] } | ||||
| quinn = "0.9.3" | ||||
| rand = "0.8.5" | ||||
| rcgen = "0.10.0" | ||||
| ring = "0.16.20" | ||||
| rmp-serde = "1.1.1" | ||||
| rustls = "0.20.7" | ||||
| rusty_ulid = "1.0.0" | ||||
| serde = "1.0.151" | ||||
| serde_json = "1.0.89" | ||||
| structopt = "0.3.26" | ||||
| tokio = { version = "1.23.0", features = ["full"] } | ||||
| tracing-subscriber = "0.3.16" | ||||
| tracing = "0.1.37" | ||||
| udp_over_tcp = { path = "../udp_over_tcp" } | ||||
| anyhow = "1.0.38" | ||||
| base64 = "0.13.0" | ||||
| ctrlc = "3.2.1" | ||||
| # fltk = "1.1.1" | ||||
| futures-util = "0.3.9" | ||||
| hyper = { version = "0.14.4", features = ["http1", "server", "stream", "tcp"] } | ||||
| quinn = "0.8.5" | ||||
| rand = "0.8.4" | ||||
| rcgen = "0.8.11" | ||||
| rmp-serde = "0.15.5" | ||||
| rustls = "0.20.4" | ||||
| structopt = "0.3.20" | ||||
| tokio = { version = "1.8.1", features = ["full"] } | ||||
| tracing-subscriber = "0.2.16" | ||||
| tracing = "0.1.25" | ||||
| 
 | ||||
| [dependencies.reqwest] | ||||
| version = "0.11.13"  | ||||
| version = "0.11.10"  | ||||
| default-features = false | ||||
| features = ["stream", "rustls-tls", "hyper-rustls"]  | ||||
|  |  | |||
|  | @ -1,8 +1,8 @@ | |||
| # https://whitfin.io/speeding-up-rust-docker-builds/ | ||||
| # TODO: https://stackoverflow.com/questions/57389547/how-to-define-the-context-for-a-docker-build-as-a-specific-commit-on-one-of-the | ||||
| 
 | ||||
| # docker pull rust:1.66-slim-buster | ||||
| FROM rust@sha256:98c9b1fca0c9a6183369daf9efadb57c634340ae877bb027aeadf72afdd086a3 as build | ||||
| # rust:1.64-slim-buster | ||||
| FROM rust@sha256:7da4fbd2dc7176746e8e5c371aeb0bbe742598c4906fa48cb2d87a4b89d50357 as build | ||||
| 
 | ||||
| WORKDIR / | ||||
| ENV USER root | ||||
|  | @ -20,8 +20,7 @@ cargo new --bin crates/ptth_server && \ | |||
| cargo new --bin crates/ptth_file_server_bin && \ | ||||
| cargo new --bin tools/ptth_tail && \ | ||||
| cargo new --bin crates/debug_proxy && \ | ||||
| cargo new --bin crates/ptth_quic && \ | ||||
| cargo new --lib crates/udp_over_tcp | ||||
| cargo new --bin crates/ptth_quic | ||||
| 
 | ||||
| # copy over your manifests | ||||
| COPY ./Cargo.lock                               ./ | ||||
|  | @ -30,7 +29,6 @@ COPY ./crates/always_equal/Cargo.toml           ./crates/always_equal/ | |||
| COPY ./crates/ptth_core/Cargo.toml              ./crates/ptth_core/ | ||||
| COPY ./crates/ptth_relay/Cargo.toml             ./crates/ptth_relay/ | ||||
| COPY ./crates/ptth_quic/Cargo.toml              ./crates/ptth_quic/ | ||||
| COPY ./crates/udp_over_tcp/Cargo.toml           ./crates/udp_over_tcp/ | ||||
| 
 | ||||
| # this build step will cache your dependencies | ||||
| RUN cargo build --release -p ptth_quic | ||||
|  | @ -41,8 +39,7 @@ src/*.rs \ | |||
| crates/always_equal/src/*.rs \ | ||||
| crates/ptth_core/src/*.rs \ | ||||
| crates/ptth_relay/src/*.rs \ | ||||
| crates/ptth_quic/src/*.rs \ | ||||
| crates/udp_over_tcp/src/*.rs | ||||
| crates/ptth_quic/src/*.rs | ||||
| 
 | ||||
| # Copy source tree | ||||
| # Yes, I tried a few variations on the syntax. Dockerfiles are just rough. | ||||
|  | @ -53,7 +50,6 @@ COPY ./crates/ptth_core     ./crates/ptth_core | |||
| COPY ./crates/ptth_relay    ./crates/ptth_relay | ||||
| COPY ./handlebars/          ./handlebars | ||||
| COPY ./crates/ptth_quic     ./crates/ptth_quic | ||||
| COPY ./crates/udp_over_tcp  ./crates/udp_over_tcp | ||||
| 
 | ||||
| # Bug in cargo's incremental build logic, triggered by | ||||
| # Docker doing something funny with mtimes? Maybe? | ||||
|  | @ -62,7 +58,6 @@ RUN touch crates/ptth_core/src/lib.rs | |||
| # build for release | ||||
| # gate only on ptth_relay tests for now | ||||
| RUN \ | ||||
| find . && \ | ||||
| cargo build --release -p ptth_quic --bin ptth_quic_relay_server && \ | ||||
| cargo test --release -p ptth_quic --bin ptth_quic_relay_server | ||||
| 
 | ||||
|  |  | |||
|  | @ -61,7 +61,10 @@ impl P2Client { | |||
| 		
 | ||||
| 		let conf = Arc::clone (&self.conf); | ||||
| 		
 | ||||
| 		let connection = protocol::p2_connect_to_p3 (&self.endpoint, conf.relay_addr, &conf.client_id).await?; | ||||
| 		let quinn::NewConnection { | ||||
| 			connection, | ||||
| 			.. | ||||
| 		} = protocol::p2_connect_to_p3 (&self.endpoint, conf.relay_addr, &conf.client_id).await?; | ||||
| 		
 | ||||
| 		let client_tcp_port = conf.client_tcp_port; | ||||
| 		
 | ||||
|  |  | |||
|  | @ -1,13 +1,16 @@ | |||
| use std::{ | ||||
| 	iter::FromIterator, | ||||
| }; | ||||
| 
 | ||||
| use tokio::sync::watch; | ||||
| 
 | ||||
| use ptth_quic::prelude::*; | ||||
| use ptth_quic::executable_end_server as server; | ||||
| 
 | ||||
| #[tokio::main] | ||||
| async fn main () -> anyhow::Result <()> { | ||||
| 	tracing_subscriber::fmt::init (); | ||||
| 	
 | ||||
| 	let args: Vec <_> = std::env::args_os ().collect (); | ||||
| 	let args = Vec::from_iter (std::env::args_os ()); | ||||
| 	
 | ||||
| 	let (shutdown_tx, shutdown_rx) = watch::channel (false); | ||||
| 	
 | ||||
|  | @ -16,5 +19,5 @@ async fn main () -> anyhow::Result <()> { | |||
| 	})?; | ||||
| 	trace! ("Set Ctrl+C handler"); | ||||
| 	
 | ||||
| 	server::main (&args, Some (shutdown_rx)).await | ||||
| 	ptth_quic::executable_end_server::main (&args, Some (shutdown_rx)).await | ||||
| } | ||||
|  |  | |||
|  | @ -1,7 +1,6 @@ | |||
| use tokio::sync::watch; | ||||
| 
 | ||||
| use ptth_quic::prelude::*; | ||||
| use ptth_quic::executable_relay_server as relay; | ||||
| 
 | ||||
| #[tokio::main] | ||||
| async fn main () -> anyhow::Result <()> { | ||||
|  | @ -9,7 +8,7 @@ async fn main () -> anyhow::Result <()> { | |||
| 	
 | ||||
| 	tracing_subscriber::fmt::init (); | ||||
| 	
 | ||||
| 	let opt = relay::Opt::from_args (); | ||||
| 	let opt = ptth_quic::executable_relay_server::Opt::from_args (); | ||||
| 	
 | ||||
| 	let (running_tx, mut running_rx) = watch::channel (true); | ||||
| 	
 | ||||
|  | @ -18,15 +17,8 @@ async fn main () -> anyhow::Result <()> { | |||
| 	})?; | ||||
| 	trace! ("Set Ctrl+C handler"); | ||||
| 	
 | ||||
| 	let app = relay::App::new (opt).await?; | ||||
| 	println! ("Base64 cert: {}", base64::encode (app.server_cert ())); | ||||
| 	println! ("Listening on {}", app.listen_addr ()); | ||||
| 	
 | ||||
| 	tokio::fs::create_dir_all ("ptth_quic_output").await?; | ||||
| 	tokio::fs::write ("ptth_quic_output/quic_server.crt", app.server_cert ()).await?; | ||||
| 	
 | ||||
| 	tokio::select! { | ||||
| 		val = app.run () => { | ||||
| 		val = ptth_quic::executable_relay_server::main (opt) => { | ||||
| 			
 | ||||
| 		}, | ||||
| 		val = running_rx.changed () => { | ||||
|  |  | |||
|  | @ -1,32 +0,0 @@ | |||
| 
 | ||||
| #[cfg (test)] | ||||
| mod test { | ||||
| 	#[test] | ||||
| 	fn signing () -> anyhow::Result <()> { | ||||
| 		use std::fs; | ||||
| 		use ring::{ | ||||
| 			signature::{ | ||||
| 				self, | ||||
| 				Ed25519KeyPair, | ||||
| 				KeyPair, | ||||
| 			}, | ||||
| 		}; | ||||
| 		
 | ||||
| 		fs::create_dir_all ("untracked")?; | ||||
| 		
 | ||||
| 		let rng = ring::rand::SystemRandom::new (); | ||||
| 		let pkcs8_bytes = Ed25519KeyPair::generate_pkcs8 (&rng).map_err (|_| anyhow::anyhow! ("generate_pkcs8"))?; | ||||
| 		
 | ||||
| 		let key_pair = Ed25519KeyPair::from_pkcs8 (pkcs8_bytes.as_ref ()).map_err (|_| anyhow::anyhow! ("from_pkcs8"))?; | ||||
| 		
 | ||||
| 		const MESSAGE: &[u8] = b":V"; | ||||
| 		let sig = key_pair.sign (MESSAGE); | ||||
| 		
 | ||||
| 		let peer_public_key_bytes = key_pair.public_key ().as_ref (); | ||||
| 		let peer_public_key = signature::UnparsedPublicKey::new (&signature::ED25519, peer_public_key_bytes); | ||||
| 		
 | ||||
| 		peer_public_key.verify (MESSAGE, sig.as_ref ()).map_err (|_| anyhow::anyhow! ("verify"))?; | ||||
| 		
 | ||||
| 		Ok (()) | ||||
| 	} | ||||
| } | ||||
|  | @ -10,7 +10,7 @@ use protocol::PeerId; | |||
| /// A partially-filled-out config that structopt can deal with
 | ||||
| /// Try to turn this into a Config as soon as possible.
 | ||||
| #[derive (Debug, StructOpt)] | ||||
| pub struct Opt { | ||||
| struct Opt { | ||||
| 	#[structopt (long)] | ||||
| 	relay_addr: Option <String>, | ||||
| 	#[structopt (long)] | ||||
|  | @ -19,18 +19,6 @@ pub struct Opt { | |||
| 	debug_echo: bool, | ||||
| 	#[structopt (long)] | ||||
| 	cert_url: Option <String>, | ||||
| 	#[structopt (long)] | ||||
| 	use_udp_over_tcp: Option <bool>, | ||||
| } | ||||
| 
 | ||||
| /// A filled-out config for constructing an end server
 | ||||
| #[derive (Clone)] | ||||
| pub (crate) struct Config { | ||||
| 	pub debug_echo: bool, | ||||
| 	pub id: String, | ||||
| 	pub relay_addr: SocketAddr, | ||||
| 	pub relay_cert: Vec <u8>, | ||||
| 	pub use_udp_over_tcp: bool, | ||||
| } | ||||
| 
 | ||||
| pub async fn main (args: &[OsString], shutdown_rx: Option <watch::Receiver <bool>>) -> anyhow::Result <()> { | ||||
|  | @ -38,9 +26,10 @@ pub async fn main (args: &[OsString], shutdown_rx: Option <watch::Receiver <bool | |||
| 	let opt = Opt::from_iter (args); | ||||
| 	let conf = opt.into_config ().await?; | ||||
| 	
 | ||||
| 	let (end_server, shutdown_tx) = P4EndServer::connect (conf).await?; | ||||
| 	let end_server = Arc::new (P4EndServer::connect (conf)?); | ||||
| 	
 | ||||
| 	let run_task = { | ||||
| 		let end_server = Arc::clone (&end_server); | ||||
| 		tokio::spawn (async move { | ||||
| 			end_server.run ().await?; | ||||
| 			Ok::<_, anyhow::Error> (()) | ||||
|  | @ -51,8 +40,7 @@ pub async fn main (args: &[OsString], shutdown_rx: Option <watch::Receiver <bool | |||
| 		while ! *shutdown_rx.borrow () { | ||||
| 			shutdown_rx.changed ().await?; | ||||
| 		} | ||||
| 		trace! ("P4 end server shutting down..."); | ||||
| 		shutdown_tx.send (true)? | ||||
| 		end_server.shut_down ()?; | ||||
| 	} | ||||
| 	
 | ||||
| 	run_task.await??; | ||||
|  | @ -62,12 +50,21 @@ pub async fn main (args: &[OsString], shutdown_rx: Option <watch::Receiver <bool | |||
| 	Ok (()) | ||||
| } | ||||
| 
 | ||||
| /// A filled-out config for constructing an end server
 | ||||
| #[derive (Clone)] | ||||
| pub struct Config { | ||||
| 	pub debug_echo: bool, | ||||
| 	pub id: String, | ||||
| 	pub relay_addr: SocketAddr, | ||||
| 	pub relay_cert: Vec <u8>, | ||||
| } | ||||
| 
 | ||||
| impl Opt { | ||||
| 	/// Converts self into a Config that the server can use.
 | ||||
| 	/// Performs I/O to load the relay cert from disk or from HTTP.
 | ||||
| 	/// Fails if arguments can't be parsed or if I/O fails.
 | ||||
| 	
 | ||||
| 	pub (crate) async fn into_config (self) -> anyhow::Result <Config> { | ||||
| 	pub async fn into_config (self) -> anyhow::Result <Config> { | ||||
| 		let id = self.server_id.clone ().unwrap_or_else (|| "bogus_server".to_string ()); | ||||
| 		
 | ||||
| 		let relay_addr: SocketAddr = self.relay_addr.clone ().unwrap_or_else (|| String::from ("127.0.0.1:30380")).parse ()?; | ||||
|  | @ -86,74 +83,53 @@ impl Opt { | |||
| 			id, | ||||
| 			relay_addr, | ||||
| 			relay_cert, | ||||
| 			use_udp_over_tcp: self.use_udp_over_tcp.unwrap_or (false), | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| pub struct P4EndServer { | ||||
| 	conf: Config, | ||||
| 	conn: quinn::Connection, | ||||
| 	endpoint: quinn::Endpoint, | ||||
| 	conf: Arc <Config>, | ||||
| 	shutdown_tx: watch::Sender <bool>, | ||||
| 	shutdown_rx: watch::Receiver <bool>, | ||||
| } | ||||
| 
 | ||||
| impl P4EndServer { | ||||
| 	pub (crate) async fn connect (conf: Config) -> anyhow::Result <(Self, watch::Sender <bool>)> { | ||||
| 		debug! ("P4 end server making its QUIC endpoint"); | ||||
| 	pub fn connect (conf: Config) -> anyhow::Result <Self> { | ||||
| 		trace! ("P4 end server making its QUIC endpoint"); | ||||
| 		let endpoint = make_client_endpoint ("0.0.0.0:0".parse ()?, &[&conf.relay_cert])?; | ||||
| 		
 | ||||
| 		let conf = if conf.use_udp_over_tcp { | ||||
| 			let udp_sock = UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, 0)).await?; | ||||
| 			udp_sock.connect ((Ipv4Addr::LOCALHOST, endpoint.local_addr ()?.port ())).await?; | ||||
| 			
 | ||||
| 			let udp_local_server_port = udp_sock.local_addr ()?.port (); | ||||
| 			
 | ||||
| 			let tcp_sock = TcpSocket::new_v4 ()?; | ||||
| 			let tcp_conn = tcp_sock.connect (conf.relay_addr).await?; | ||||
| 			
 | ||||
| 			tokio::spawn (async move { | ||||
| 				udp_over_tcp::client::main_with_sockets (udp_sock, tcp_conn).await | ||||
| 			}); | ||||
| 			
 | ||||
| 			Config { | ||||
| 				debug_echo: conf.debug_echo, | ||||
| 				id: conf.id, | ||||
| 				relay_addr: SocketAddr::V4 (SocketAddrV4::new (Ipv4Addr::LOCALHOST, udp_local_server_port)), | ||||
| 				relay_cert: conf.relay_cert, | ||||
| 				use_udp_over_tcp: true, | ||||
| 			} | ||||
| 		} | ||||
| 		else { | ||||
| 			conf | ||||
| 		}; | ||||
| 		let (shutdown_tx, shutdown_rx) = watch::channel (false); | ||||
| 		
 | ||||
| 		debug! ("P4 end server connecting to P3 relay server"); | ||||
| 		let conn = protocol::p4_connect_to_p3 ( | ||||
| 			&endpoint, 
 | ||||
| 			conf.relay_addr, 
 | ||||
| 			&conf.id | ||||
| 		Ok (P4EndServer { | ||||
| 			conf: Arc::new (conf), | ||||
| 			endpoint, | ||||
| 			shutdown_tx, | ||||
| 			shutdown_rx, | ||||
| 		}) | ||||
| 	} | ||||
| 	
 | ||||
| 	pub fn config (&self) -> &Config { | ||||
| 		&*self.conf | ||||
| 	} | ||||
| 	
 | ||||
| 	pub async fn run (&self) -> anyhow::Result <()> { | ||||
| 		trace! ("P4 end server connecting to P3 relay server"); | ||||
| 		let quinn::NewConnection { | ||||
| 			mut bi_streams, | ||||
| 			.. | ||||
| 		} = protocol::p4_connect_to_p3 ( | ||||
| 			&self.endpoint, 
 | ||||
| 			self.conf.relay_addr, 
 | ||||
| 			&self.conf.id | ||||
| 		).await?; | ||||
| 		
 | ||||
| 		debug! ("Connected to relay server"); | ||||
| 		
 | ||||
| 		let (shutdown_tx, shutdown_rx) = watch::channel (false); | ||||
| 		
 | ||||
| 		Ok ((P4EndServer { | ||||
| 			conf, | ||||
| 			conn, | ||||
| 			endpoint, | ||||
| 			shutdown_rx, | ||||
| 		}, shutdown_tx)) | ||||
| 	} | ||||
| 	
 | ||||
| 	pub (crate) async fn run (self) -> anyhow::Result <()> { | ||||
| 		trace! ("Accepting bi streams from P3"); | ||||
| 		
 | ||||
| 		let mut shutdown_rx = self.shutdown_rx.clone (); | ||||
| 		
 | ||||
| 		let conf = Arc::new (self.conf); | ||||
| 		
 | ||||
| 		loop { | ||||
| 			tokio::select! { | ||||
| 				_ = shutdown_rx.changed () => { | ||||
|  | @ -162,10 +138,10 @@ impl P4EndServer { | |||
| 						break; | ||||
| 					} | ||||
| 				} | ||||
| 				stream_opt = self.conn.accept_bi () => { | ||||
| 					let (relay_send, relay_recv) = stream_opt?; | ||||
| 				stream_opt = bi_streams.next () => { | ||||
| 					let (relay_send, relay_recv) = stream_opt.ok_or_else (|| anyhow::anyhow! ("P4 ran out of incoming streams. Maybe P3 shut down or disconnected?"))??; | ||||
| 					
 | ||||
| 					tokio::spawn (handle_bi_stream (Arc::clone (&conf), relay_send, relay_recv)); | ||||
| 					tokio::spawn (handle_bi_stream (Arc::clone (&self.conf), relay_send, relay_recv)); | ||||
| 				} | ||||
| 			}; | ||||
| 		} | ||||
|  | @ -173,6 +149,11 @@ impl P4EndServer { | |||
| 		Ok (()) | ||||
| 	} | ||||
| 	
 | ||||
| 	pub fn shut_down (&self) -> anyhow::Result <()> { | ||||
| 		trace! ("P4 end server shutting down..."); | ||||
| 		Ok (self.shutdown_tx.send (true)?) | ||||
| 	} | ||||
| 	
 | ||||
| 	pub fn shutting_down (&self) -> bool { | ||||
| 		*self.shutdown_rx.borrow () | ||||
| 	} | ||||
|  |  | |||
|  | @ -20,218 +20,142 @@ use protocol::PeerId; | |||
| #[derive (Debug, StructOpt)] | ||||
| pub struct Opt { | ||||
| 	#[structopt (long)] | ||||
| 	pub (crate) listen_addr: Option <String>, | ||||
| 	#[structopt (long)] | ||||
| 	pub (crate) tcp_listen_port: Option <u16>, | ||||
| 	listen_addr: Option <String>, | ||||
| } | ||||
| 
 | ||||
| pub struct App { | ||||
| 	endpoint: quinn::Endpoint, | ||||
| 	listen_addr: SocketAddr, | ||||
| 	pub (crate) metrics: Arc <RwLock <Metrics>>, | ||||
| 	server_cert: Vec <u8>, | ||||
| 	tcp_listener: Option <udp_over_tcp::server::Listener>, | ||||
| } | ||||
| 
 | ||||
| #[derive (Default)] | ||||
| pub (crate) struct Metrics { | ||||
| 	pub (crate) connected_end_servers: usize, | ||||
| } | ||||
| 
 | ||||
| impl App { | ||||
| 	pub async fn new (opt: Opt) -> anyhow::Result <Self> { | ||||
| 		let config = load_config ().await.ok (); | ||||
| 		
 | ||||
| 		let listen_addr = opt.listen_addr.unwrap_or_else (|| String::from ("0.0.0.0:30380")).parse ()?; | ||||
| 		let (endpoint, server_cert) = make_server_endpoint (listen_addr)?; | ||||
| 		
 | ||||
| 		let listen_addr = endpoint.local_addr ()?; | ||||
| 		let tcp_port = opt.tcp_listen_port.or (config.map (|cfg| cfg.tcp_listen_port).flatten ()); | ||||
| 		
 | ||||
| 		let tcp_listener = if let Some (tcp_port) = tcp_port { | ||||
| 			let cfg = udp_over_tcp::server::Config { | ||||
| 				tcp_port, | ||||
| 				udp_port: listen_addr.port (), | ||||
| 			}; | ||||
| pub async fn main (opt: Opt) -> anyhow::Result <()> 
 | ||||
| { | ||||
| 	let listen_addr = opt.listen_addr.unwrap_or_else (|| String::from ("0.0.0.0:30380")).parse ()?; | ||||
| 	let (mut incoming, server_cert) = make_server_endpoint (listen_addr)?; | ||||
| 	println! ("Base64 cert: {}", base64::encode (&server_cert)); | ||||
| 	
 | ||||
| 	tokio::fs::create_dir_all ("ptth_quic_output").await?; | ||||
| 	tokio::fs::write ("ptth_quic_output/quic_server.crt", &server_cert).await?; | ||||
| 	
 | ||||
| 	let relay_state = Arc::new (RelayState::default ()); | ||||
| 	
 | ||||
| 	let make_svc = { | ||||
| 		let relay_state = Arc::clone (&relay_state); | ||||
| 		make_service_fn (move |_conn| { | ||||
| 			let relay_state = Arc::clone (&relay_state); | ||||
| 			
 | ||||
| 			Some (udp_over_tcp::server::Listener::new (cfg).await?) | ||||
| 		} | ||||
| 		else { | ||||
| 			None | ||||
| 		}; | ||||
| 		
 | ||||
| 		Ok (Self { | ||||
| 			endpoint, | ||||
| 			listen_addr, | ||||
| 			metrics: Default::default (), | ||||
| 			server_cert, | ||||
| 			tcp_listener, | ||||
| 		}) | ||||
| 	} | ||||
| 	
 | ||||
| 	pub fn listen_addr (&self) -> SocketAddr { | ||||
| 		self.listen_addr | ||||
| 	} | ||||
| 	
 | ||||
| 	pub fn server_cert (&self) -> &[u8] { | ||||
| 		&self.server_cert | ||||
| 	} | ||||
| 	
 | ||||
| 	pub fn tcp_listen_port (&self) -> anyhow::Result <Option <u16>> { | ||||
| 		match self.tcp_listener.as_ref () { | ||||
| 			None => Ok (None), | ||||
| 			Some (tcp_listener) => Ok (tcp_listener.tcp_port ()?.into ()), | ||||
| 		} | ||||
| 	} | ||||
| 	
 | ||||
| 	pub async fn run (self) -> anyhow::Result <()> { | ||||
| 		let Self { | ||||
| 			endpoint, | ||||
| 			listen_addr, | ||||
| 			metrics, | ||||
| 			server_cert, | ||||
| 			tcp_listener, | ||||
| 		} = self; | ||||
| 		
 | ||||
| 		let mut relay_state = RelayState::default (); | ||||
| 		relay_state.metrics = metrics; | ||||
| 		if let Err (e) = relay_state.reload_config ().await { | ||||
| 			error! ("{:?}", e); | ||||
| 		} | ||||
| 		let relay_state = Arc::new (relay_state); | ||||
| 		
 | ||||
| 		let make_svc = { | ||||
| 			let relay_state = Arc::clone (&relay_state); | ||||
| 			make_service_fn (move |_conn| { | ||||
| 				let relay_state = Arc::clone (&relay_state); | ||||
| 				
 | ||||
| 				async move { | ||||
| 					Ok::<_, String> (service_fn (move |req| { | ||||
| 						let relay_state = Arc::clone (&relay_state); | ||||
| 						
 | ||||
| 						handle_http (req, relay_state) | ||||
| 					})) | ||||
| 				} | ||||
| 			}) | ||||
| 		}; | ||||
| 		
 | ||||
| 		let http_addr = SocketAddr::from (([0, 0, 0, 0], 4004)); | ||||
| 		let http_server = Server::bind (&http_addr); | ||||
| 		
 | ||||
| 		let _task_reload_config = { | ||||
| 			let relay_state = Arc::clone (&relay_state); | ||||
| 			tokio::spawn (async move { | ||||
| 				let mut interval = tokio::time::interval (std::time::Duration::from_secs (60)); | ||||
| 				interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip); | ||||
| 				
 | ||||
| 				loop { | ||||
| 					interval.tick ().await; | ||||
| 					
 | ||||
| 					relay_state.reload_config ().await.ok (); | ||||
| 				} | ||||
| 			}) | ||||
| 		}; | ||||
| 		
 | ||||
| 		let task_quic_server = { | ||||
| 			let relay_state = Arc::clone (&relay_state); | ||||
| 			tokio::spawn (async move { | ||||
| 				while let Some (conn) = endpoint.accept ().await { | ||||
| 			async move { | ||||
| 				Ok::<_, String> (service_fn (move |req| { | ||||
| 					let relay_state = Arc::clone (&relay_state); | ||||
| 					
 | ||||
| 					// Each new peer QUIC connection gets its own task
 | ||||
| 					tokio::spawn (async move { | ||||
| 						let active = relay_state.stats.quic.connect (); | ||||
| 						
 | ||||
| 						debug! ("QUIC connections: {}", active); | ||||
| 						
 | ||||
| 						match handle_quic_connection (Arc::clone (&relay_state), conn).await { | ||||
| 							Ok (_) => (), | ||||
| 							Err (e) => warn! ("handle_quic_connection `{:?}`", e), | ||||
| 						} | ||||
| 						
 | ||||
| 						let active = relay_state.stats.quic.disconnect (); | ||||
| 						debug! ("QUIC connections: {}", active); | ||||
| 					}); | ||||
| 				} | ||||
| 					handle_http (req, relay_state) | ||||
| 				})) | ||||
| 			} | ||||
| 		}) | ||||
| 	}; | ||||
| 	
 | ||||
| 	let http_addr = SocketAddr::from (([0, 0, 0, 0], 4004)); | ||||
| 	let http_server = Server::bind (&http_addr); | ||||
| 	
 | ||||
| 	let tcp_port = 30382; | ||||
| 	let tcp_listener = TcpListener::bind (("127.0.0.1", tcp_port)).await?; | ||||
| 	
 | ||||
| 	let task_quic_server = { | ||||
| 		let relay_state = Arc::clone (&relay_state); | ||||
| 		tokio::spawn (async move { | ||||
| 			while let Some (conn) = incoming.next ().await { | ||||
| 				let relay_state = Arc::clone (&relay_state); | ||||
| 				
 | ||||
| 				Ok::<_, anyhow::Error> (()) | ||||
| 			}) | ||||
| 		}; | ||||
| 		
 | ||||
| 		let task_direc_server = { | ||||
| 			let relay_state = Arc::clone (&relay_state); | ||||
| 			
 | ||||
| 			tokio::spawn (async move { | ||||
| 				let sock = UdpSocket::bind("0.0.0.0:30379").await?; | ||||
| 				let mut buf = [0; 2048]; | ||||
| 				loop { | ||||
| 					let (len, addr) = sock.recv_from (&mut buf).await?; | ||||
| 					debug! ("{:?} bytes received from {:?}", len, addr); | ||||
| 					
 | ||||
| 					let packet = Vec::from_iter ((&buf [0..len]).into_iter ().map (|x| *x)); | ||||
| 					
 | ||||
| 					{ | ||||
| 						let mut direc_cookies = relay_state.direc_cookies.lock ().await; | ||||
| 						
 | ||||
| 						if let Some (direc_state) = direc_cookies.remove (&packet) { | ||||
| 							debug! ("Got PTTH_DIREC cookie for {}", direc_state.p2_id); | ||||
| 							direc_state.p2_addr.send (addr).ok (); | ||||
| 						} | ||||
| 						else { | ||||
| 							debug! ("UDP packet didn't match any PTTH_DIREC cookie"); | ||||
| 						} | ||||
| 					} | ||||
| 				} | ||||
| 				
 | ||||
| 				Ok::<_, anyhow::Error> (()) | ||||
| 			}) | ||||
| 		}; | ||||
| 		
 | ||||
| 		let task_http_server = tokio::spawn (async move { | ||||
| 			http_server.serve (make_svc).await?; | ||||
| 			Ok::<_, anyhow::Error> (()) | ||||
| 		}); | ||||
| 		
 | ||||
| 		debug! ("Serving HTTP on {:?}", http_addr); | ||||
| 		
 | ||||
| 		if let Some (tcp_listener) = tcp_listener { | ||||
| 			tokio::spawn (async move { | ||||
| 				if let Err (e) = tcp_listener.run ().await { | ||||
| 					eprintln! ("udp_over_tcp::server::main exited with err {:?}", e); | ||||
| 				} | ||||
| 				
 | ||||
| 				Ok::<_, anyhow::Error> (()) | ||||
| 			}); | ||||
| 		} | ||||
| 		
 | ||||
| 		{ | ||||
| 			let config = relay_state.config.load (); | ||||
| 			dbg! (&config.webhook_url); | ||||
| 			if let Some (webhook_url) = config.webhook_url.clone () { | ||||
| 				let j = json! ({ | ||||
| 					"text": "Booting up", | ||||
| 				}).to_string (); | ||||
| 				let http_client = relay_state.http_client.clone (); | ||||
| 				// Each new peer QUIC connection gets its own task
 | ||||
| 				tokio::spawn (async move { | ||||
| 					http_client.post (webhook_url).body (j).send ().await | ||||
| 					let active = relay_state.stats.quic.connect (); | ||||
| 					debug! ("QUIC connections: {}", active); | ||||
| 					
 | ||||
| 					match handle_quic_connection (Arc::clone (&relay_state), conn).await { | ||||
| 						Ok (_) => (), | ||||
| 						Err (e) => warn! ("handle_quic_connection `{:?}`", e), | ||||
| 					} | ||||
| 					
 | ||||
| 					let active = relay_state.stats.quic.disconnect (); | ||||
| 					debug! ("QUIC connections: {}", active); | ||||
| 				}); | ||||
| 			} | ||||
| 		} | ||||
| 			
 | ||||
| 			Ok::<_, anyhow::Error> (()) | ||||
| 		}) | ||||
| 	}; | ||||
| 	
 | ||||
| 	let task_direc_server = { | ||||
| 		let relay_state = Arc::clone (&relay_state); | ||||
| 		
 | ||||
| 		tokio::select! { | ||||
| 			_val = task_quic_server => { | ||||
| 				eprintln! ("QUIC relay server exited, exiting"); | ||||
| 			}, | ||||
| 			_val = task_http_server => { | ||||
| 				eprintln! ("HTTP server exited, exiting"); | ||||
| 			}, | ||||
| 			_val = task_direc_server => { | ||||
| 				eprintln! ("PTTH_DIREC server exited, exiting"); | ||||
| 			}, | ||||
| 		} | ||||
| 		
 | ||||
| 		Ok (()) | ||||
| 	} | ||||
| 		tokio::spawn (async move { | ||||
| 			let sock = UdpSocket::bind("0.0.0.0:30379").await?; | ||||
| 			let mut buf = [0; 2048]; | ||||
| 			loop { | ||||
| 				let (len, addr) = sock.recv_from (&mut buf).await?; | ||||
| 				debug! ("{:?} bytes received from {:?}", len, addr); | ||||
| 				
 | ||||
| 				let packet = Vec::from_iter ((&buf [0..len]).into_iter ().map (|x| *x)); | ||||
| 				
 | ||||
| 				{ | ||||
| 					let mut direc_cookies = relay_state.direc_cookies.lock ().await; | ||||
| 					
 | ||||
| 					if let Some (direc_state) = direc_cookies.remove (&packet) { | ||||
| 						debug! ("Got PTTH_DIREC cookie for {}", direc_state.p2_id); | ||||
| 						direc_state.p2_addr.send (addr).ok (); | ||||
| 					} | ||||
| 					else { | ||||
| 						debug! ("UDP packet didn't match any PTTH_DIREC cookie"); | ||||
| 					} | ||||
| 				} | ||||
| 			} | ||||
| 			
 | ||||
| 			Ok::<_, anyhow::Error> (()) | ||||
| 		}) | ||||
| 	}; | ||||
| 	
 | ||||
| 	let task_http_server = tokio::spawn (async move { | ||||
| 		http_server.serve (make_svc).await?; | ||||
| 		Ok::<_, anyhow::Error> (()) | ||||
| 	}); | ||||
| 	
 | ||||
| 	let task_tcp_server = { | ||||
| 		let relay_state = Arc::clone (&relay_state); | ||||
| 		tokio::spawn (async move { | ||||
| 			loop { | ||||
| 				let (tcp_socket, _) = tcp_listener.accept ().await?; | ||||
| 				
 | ||||
| 				let relay_state = Arc::clone (&relay_state); | ||||
| 				tokio::spawn (async move { | ||||
| 					let (_client_recv, _client_send) = tcp_socket.into_split (); | ||||
| 					
 | ||||
| 					debug! ("Accepted direct TCP connection P1 --> P3"); | ||||
| 					
 | ||||
| 					let p4_server_proxies = relay_state.p4_server_proxies.lock ().await; | ||||
| 					let _p4 = match p4_server_proxies.get ("bogus_server") { | ||||
| 						Some (x) => x, | ||||
| 						None => bail! ("That server isn't connected"), | ||||
| 					}; | ||||
| 					
 | ||||
| 					// unimplemented! ();
 | ||||
| 					/* | ||||
| 					p4.req_channel.send (RequestP2ToP4 { | ||||
| 						client_send, | ||||
| 						client_recv, | ||||
| 						client_id: "bogus_client".to_string (), | ||||
| 					}).await.map_err (|_| anyhow::anyhow! ("Can't send request to P4 server"))?; | ||||
| 					*/ | ||||
| 					Ok (()) | ||||
| 				}); | ||||
| 			} | ||||
| 			
 | ||||
| 			Ok::<_, anyhow::Error> (()) | ||||
| 		}) | ||||
| 	}; | ||||
| 	
 | ||||
| 	debug! ("Serving HTTP on {:?}", http_addr); | ||||
| 	
 | ||||
| 	task_quic_server.await??; | ||||
| 	task_http_server.await??; | ||||
| 	task_tcp_server.await??; | ||||
| 	task_direc_server.await??; | ||||
| 	
 | ||||
| 	Ok (()) | ||||
| } | ||||
| 
 | ||||
| async fn handle_http (_req: Request <Body>, relay_state: Arc <RelayState>) | ||||
|  | @ -254,36 +178,9 @@ async fn handle_http (_req: Request <Body>, relay_state: Arc <RelayState>) | |||
| 
 | ||||
| #[derive (Default)] | ||||
| struct RelayState { | ||||
| 	config: arc_swap::ArcSwap <Config>, | ||||
| 	p4_server_proxies: Mutex <HashMap <PeerId, P4State>>, | ||||
| 	direc_cookies: Mutex <HashMap <Vec <u8>, DirecState>>, | ||||
| 	metrics: Arc <RwLock <Metrics>>, | ||||
| 	stats: Stats, | ||||
| 	http_client: reqwest::Client, | ||||
| } | ||||
| 
 | ||||
| #[derive (Default)] | ||||
| struct Config { | ||||
| 	ip_nicknames: BTreeMap <[u8; 4], String>, | ||||
| 	tcp_listen_port: Option <u16>, | ||||
| 	webhook_url: Option <String>, | ||||
| } | ||||
| 
 | ||||
| impl From <ConfigFile> for Config { | ||||
| 	fn from (x: ConfigFile) -> Self { | ||||
| 		Self { | ||||
| 			ip_nicknames: x.ip_nicknames.into_iter ().collect (), | ||||
| 			tcp_listen_port: x.tcp_listen_port, | ||||
| 			webhook_url: x.webhook_url, | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| #[derive (Deserialize)] | ||||
| struct ConfigFile { | ||||
| 	ip_nicknames: Vec <([u8; 4], String)>, | ||||
| 	tcp_listen_port: Option <u16>, | ||||
| 	webhook_url: Option <String>, | ||||
| } | ||||
| 
 | ||||
| struct DirecState { | ||||
|  | @ -325,24 +222,11 @@ impl ConnectEvents { | |||
| 
 | ||||
| struct P4State { | ||||
| 	req_channel: mpsc::Sender <RequestP2ToP4>, | ||||
| } | ||||
| 
 | ||||
| async fn load_config () -> anyhow::Result <ConfigFile> | ||||
| { | ||||
| 	let s = tokio::fs::read_to_string ("config/ptth_quic_relay_server.json").await?; | ||||
| 	let cfg: ConfigFile = serde_json::from_str (&s)?; | ||||
| 	Ok (cfg) | ||||
| 	
 | ||||
| } | ||||
| 
 | ||||
| impl RelayState { | ||||
| 	async fn reload_config (&self) -> anyhow::Result <()> { | ||||
| 		let config = load_config ().await?; | ||||
| 		let config = Arc::new (Config::from (config)); | ||||
| 		
 | ||||
| 		self.config.store (config); | ||||
| 		
 | ||||
| 		Ok (()) | ||||
| 	} | ||||
| 	
 | ||||
| } | ||||
| 
 | ||||
| struct RequestP2ToP4 { | ||||
|  | @ -417,69 +301,30 @@ async fn handle_quic_connection ( | |||
| 	conn: quinn::Connecting, | ||||
| ) -> anyhow::Result <()> 
 | ||||
| { | ||||
| 	let id = Ulid::generate (); | ||||
| 	
 | ||||
| 	let config = relay_state.config.load (); | ||||
| 	
 | ||||
| 	let remote_addr = conn.remote_address (); | ||||
| 	let ip_nickname = match remote_addr { | ||||
| 		SocketAddr::V4 (x) => { | ||||
| 			let ip = x.ip ().octets (); | ||||
| 			
 | ||||
| 			match config.ip_nicknames.get (&ip) { | ||||
| 				Some (nick) => nick.as_str (), | ||||
| 				_ => "Unknown", | ||||
| 			} | ||||
| 		}, | ||||
| 		_ => "Unknown, not IPv4", | ||||
| 	}; | ||||
| 	
 | ||||
| 	debug! ("EHG7NVUD Incoming QUIC connection {} from {:?} ({})", id, remote_addr, ip_nickname); | ||||
| 	
 | ||||
| 	if let Some (webhook_url) = config.webhook_url.clone () { | ||||
| 		let j = json! ({ | ||||
| 			"text": format! ("Incoming QUIC connection from {:?} ({})", remote_addr, ip_nickname), | ||||
| 		}).to_string (); | ||||
| 		let http_client = relay_state.http_client.clone (); | ||||
| 		tokio::spawn (async move { | ||||
| 			http_client.post (webhook_url).body (j).send ().await | ||||
| 		}); | ||||
| 	} | ||||
| 	
 | ||||
| 	let conn = conn.await?; | ||||
| 	let mut conn = conn.await?; | ||||
| 	
 | ||||
| 	// Everyone who connects must identify themselves with the first
 | ||||
| 	// bi stream
 | ||||
| 	// TODO: Timeout
 | ||||
| 	
 | ||||
| 	let (mut send, mut recv) = conn.accept_bi ().await?; | ||||
| 	let (mut send, mut recv) = conn.bi_streams.next ().await.ok_or_else (|| anyhow::anyhow! ("QUIC client didn't identify itself"))??; | ||||
| 	
 | ||||
| 	let peer = protocol::p3_accept_peer (&mut recv).await?; | ||||
| 	
 | ||||
| 	match peer { | ||||
| 		protocol::P3Peer::P2ClientProxy (peer) => { | ||||
| 			trace! ("H36JTVE5 Handling connection {} as P2 client", id); | ||||
| 			trace! ("Accepting connection from P2 client"); | ||||
| 			// TODO: Check authorization for P2 peers
 | ||||
| 			
 | ||||
| 			protocol::p3_authorize_p2_peer (&mut send).await?; | ||||
| 			handle_p2_connection (relay_state, conn, peer).await?; | ||||
| 		}, | ||||
| 		protocol::P3Peer::P4ServerProxy (peer) => { | ||||
| 			trace! ("LRHUKB7K Handling connection {} as P4 end server", id); | ||||
| 			trace! ("Accepting connection from P4 end server"); | ||||
| 			// TODO: Check authorization for P4 peers
 | ||||
| 			
 | ||||
| 			protocol::p3_authorize_p4_peer (&mut send).await?; | ||||
| 			let metrics = Arc::clone (&relay_state.metrics); | ||||
| 			
 | ||||
| 			{ | ||||
| 				let mut m = metrics.write ().await; | ||||
| 				m.connected_end_servers += 1; | ||||
| 			} | ||||
| 			handle_p4_connection (relay_state, conn, peer).await?; | ||||
| 			{ | ||||
| 				let mut m = metrics.write ().await; | ||||
| 				m.connected_end_servers -= 1; | ||||
| 			} | ||||
| 		}, | ||||
| 	} | ||||
| 	
 | ||||
|  | @ -488,13 +333,19 @@ async fn handle_quic_connection ( | |||
| 
 | ||||
| async fn handle_p2_connection ( | ||||
| 	relay_state: Arc <RelayState>, | ||||
| 	conn: quinn::Connection, | ||||
| 	conn: quinn::NewConnection, | ||||
| 	peer: protocol::P2ClientProxy, | ||||
| ) -> anyhow::Result <()> | ||||
| { | ||||
| 	let client_id = peer.id; | ||||
| 	
 | ||||
| 	while let Ok ((send, mut recv)) = conn.accept_bi ().await { | ||||
| 	let quinn::NewConnection { | ||||
| 		mut bi_streams, | ||||
| 		.. | ||||
| 	} = conn; | ||||
| 	
 | ||||
| 	while let Some (bi_stream) = bi_streams.next ().await { | ||||
| 		let (send, mut recv) = bi_stream?; | ||||
| 		let relay_state = Arc::clone (&relay_state); | ||||
| 		let client_id = client_id.clone (); | ||||
| 		
 | ||||
|  | @ -606,11 +457,15 @@ async fn handle_direc_p2_to_p4 ( | |||
| 
 | ||||
| async fn handle_p4_connection ( | ||||
| 	relay_state: Arc <RelayState>, | ||||
| 	connection: quinn::Connection, | ||||
| 	conn: quinn::NewConnection, | ||||
| 	peer: protocol::P4ServerProxy, | ||||
| ) -> anyhow::Result <()> | ||||
| { | ||||
| 	let server_id = peer.id; | ||||
| 	let quinn::NewConnection { | ||||
| 		connection, | ||||
| 		.. | ||||
| 	} = conn; | ||||
| 	let (tx, mut rx) = mpsc::channel (2); | ||||
| 	
 | ||||
| 	let p4_state = P4State { | ||||
|  |  | |||
|  | @ -1,11 +1,7 @@ | |||
| pub mod client_proxy; | ||||
| pub mod connection; | ||||
| pub mod crypto; | ||||
| pub mod executable_end_server; | ||||
| pub mod executable_relay_server; | ||||
| pub mod prelude; | ||||
| pub mod protocol; | ||||
| pub mod quinn_utils; | ||||
| 
 | ||||
| #[cfg (test)] | ||||
| mod tests; | ||||
|  |  | |||
|  | @ -2,11 +2,7 @@ pub use std::{ | |||
| 	collections::*, | ||||
| 	ffi::OsString, | ||||
| 	iter::FromIterator, | ||||
| 	net::{ | ||||
| 		Ipv4Addr, | ||||
| 		SocketAddr, | ||||
| 		SocketAddrV4, | ||||
| 	}, | ||||
| 	net::SocketAddr, | ||||
| 	sync::{ | ||||
| 		Arc, | ||||
| 		atomic::{ | ||||
|  | @ -30,14 +26,9 @@ pub use tokio::{ | |||
| 		AsyncReadExt, | ||||
| 		AsyncWriteExt, | ||||
| 	}, | ||||
| 	net::{ | ||||
| 		TcpListener, | ||||
| 		TcpSocket, | ||||
| 		UdpSocket, | ||||
| 	}, | ||||
| 	net::TcpListener, | ||||
| 	sync::{ | ||||
| 		Mutex, | ||||
| 		RwLock, | ||||
| 		mpsc, | ||||
| 	}, | ||||
| 	task::JoinHandle, | ||||
|  | @ -46,9 +37,6 @@ pub use rand::{ | |||
| 	Rng, | ||||
| 	RngCore, | ||||
| }; | ||||
| pub use rusty_ulid::Ulid; | ||||
| pub use serde::Deserialize; | ||||
| pub use serde_json::json; | ||||
| pub use tracing::{ | ||||
| 	debug, | ||||
| 	error, | ||||
|  |  | |||
|  | @ -33,14 +33,14 @@ pub async fn p2_connect_to_p3 ( | |||
| 	endpoint: &quinn::Endpoint, | ||||
| 	relay_addr: std::net::SocketAddr, | ||||
| 	client_id: &str, | ||||
| ) -> Result <quinn::Connection> 
 | ||||
| ) -> Result <quinn::NewConnection> 
 | ||||
| { | ||||
| 	if client_id.as_bytes ().len () > MAX_ID_LENGTH { | ||||
| 		bail! ("Client ID is longer than MAX_ID_LENGTH"); | ||||
| 	} | ||||
| 	
 | ||||
| 	let new_conn = endpoint.connect (relay_addr, "localhost")?.await?; | ||||
| 	let (mut send, mut recv) = new_conn.open_bi ().await?; | ||||
| 	let (mut send, mut recv) = new_conn.connection.open_bi ().await?; | ||||
| 	let cmd_type = Command::CONNECT_P2_TO_P3.0; | ||||
| 	
 | ||||
| 	send.write_all (&[cmd_type, 0, 0, 0]).await?; | ||||
|  | @ -251,21 +251,21 @@ pub async fn p4_connect_to_p3 ( | |||
| 	endpoint: &quinn::Endpoint, | ||||
| 	relay_addr: std::net::SocketAddr, | ||||
| 	server_id: &str, | ||||
| ) -> Result <quinn::Connection> 
 | ||||
| ) -> Result <quinn::NewConnection> 
 | ||||
| { | ||||
| 	if server_id.as_bytes ().len () > MAX_ID_LENGTH { | ||||
| 		bail! ("Server ID is longer than MAX_ID_LENGTH"); | ||||
| 	} | ||||
| 	
 | ||||
| 	let new_conn = endpoint.connect (relay_addr, "localhost")?.await.context ("UXTDVL2V quinn::Endpoint::connect")?; | ||||
| 	let (mut send, mut recv) = new_conn.open_bi ().await?; | ||||
| 	let new_conn = endpoint.connect (relay_addr, "localhost")?.await?; | ||||
| 	let (mut send, mut recv) = new_conn.connection.open_bi ().await?; | ||||
| 	let cmd_type = Command::CONNECT_P4_TO_P3.0; | ||||
| 	
 | ||||
| 	send.write_all (&[cmd_type, 0, 0, 0]).await?; | ||||
| 	send_lv_string (&mut send, server_id).await?; | ||||
| 	
 | ||||
| 	expect_exact_response (&mut recv, [Command::OKAY.0, cmd_type, 0, 0]).await | ||||
| 	.context ("WMGW2RXU P4 didn't get OK response when connecting to P3")?; | ||||
| 	.context ("P4 didn't get OK response when connecting to P3")?; | ||||
| 	
 | ||||
| 	Ok (new_conn) | ||||
| } | ||||
|  |  | |||
|  | @ -8,7 +8,7 @@ use std::{ | |||
| }; | ||||
| 
 | ||||
| use quinn::{ | ||||
| 	ClientConfig, Endpoint, | ||||
| 	ClientConfig, Endpoint, Incoming, | ||||
| 	ServerConfig, TransportConfig, | ||||
| }; | ||||
| 
 | ||||
|  | @ -26,7 +26,7 @@ pub fn make_client_endpoint( | |||
| 	let mut transport = quinn::TransportConfig::default (); | ||||
| 	transport.keep_alive_interval (Some (Duration::from_millis (5_000))); | ||||
| 	
 | ||||
| 	client_cfg.transport_config (Arc::new (transport)); | ||||
| 	client_cfg.transport = Arc::new (transport); | ||||
| 	
 | ||||
| 	let mut endpoint = Endpoint::client (bind_addr)?; | ||||
| 	endpoint.set_default_client_config (client_cfg); | ||||
|  | @ -41,10 +41,10 @@ pub fn make_client_endpoint( | |||
| /// - a stream of incoming QUIC connections
 | ||||
| /// - server certificate serialized into DER format
 | ||||
| #[allow(unused)] | ||||
| pub fn make_server_endpoint(bind_addr: SocketAddr) -> anyhow::Result<(Endpoint, Vec<u8>)> { | ||||
| pub fn make_server_endpoint(bind_addr: SocketAddr) -> anyhow::Result<(Incoming, Vec<u8>)> { | ||||
| 	let (server_config, server_cert) = configure_server()?; | ||||
| 	let endpoint = Endpoint::server (server_config, bind_addr)?; | ||||
| 	Ok((endpoint, server_cert)) | ||||
| 	let (_endpoint, incoming) = Endpoint::server (server_config, bind_addr)?; | ||||
| 	Ok((incoming, server_cert)) | ||||
| } | ||||
| 
 | ||||
| /// Builds default quinn client config and trusts given certificates.
 | ||||
|  |  | |||
|  | @ -1,110 +0,0 @@ | |||
| use crate::prelude::*; | ||||
| 
 | ||||
| #[test] | ||||
| fn end_to_end () -> anyhow::Result <()> { | ||||
| 	let rt = tokio::runtime::Runtime::new ()?; | ||||
| 	rt.block_on (end_to_end_async ())?; | ||||
| 	Ok (()) | ||||
| } | ||||
| 
 | ||||
| async fn end_to_end_async () -> anyhow::Result <()> { | ||||
| 	use crate::executable_end_server as server; | ||||
| 	use crate::executable_relay_server as relay; | ||||
| 	
 | ||||
| 	let relay_opt = relay::Opt { | ||||
| 		listen_addr: "127.0.0.1:0".to_string ().into (), | ||||
| 		tcp_listen_port: Some (0), | ||||
| 	}; | ||||
| 	let relay_app = relay::App::new (relay_opt).await?; | ||||
| 	
 | ||||
| 	let relay_quic_port = relay_app.listen_addr ().port (); | ||||
| 	let relay_cert = Vec::from (relay_app.server_cert ()); | ||||
| 	let relay_metrics = Arc::clone (&relay_app.metrics); | ||||
| 	let tcp_listen_port = relay_app.tcp_listen_port ()?.unwrap (); | ||||
| 	
 | ||||
| 	assert_ne! (tcp_listen_port, 0); | ||||
| 	
 | ||||
| 	let task_relay = tokio::spawn (async move { | ||||
| 		relay_app.run ().await | ||||
| 	}); | ||||
| 	
 | ||||
| 	{ | ||||
| 		let m = relay_metrics.read ().await; | ||||
| 		assert_eq! (m.connected_end_servers, 0); | ||||
| 	} | ||||
| 	
 | ||||
| 	// Connect with wrong port, should fail
 | ||||
| 	
 | ||||
| 	let server_conf = server::Config { | ||||
| 		debug_echo: false, | ||||
| 		id: "bogus".into (), | ||||
| 		relay_addr: "127.0.0.1:80".parse ()?, | ||||
| 		relay_cert: relay_cert.clone (), | ||||
| 		use_udp_over_tcp: false, | ||||
| 	}; | ||||
| 	
 | ||||
| 	let server_err = server::P4EndServer::connect (server_conf).await; | ||||
| 	
 | ||||
| 	assert! (server_err.is_err ()); | ||||
| 	
 | ||||
| 	// Connect with wrong cert, should fail
 | ||||
| 	
 | ||||
| 	let server_conf = server::Config { | ||||
| 		debug_echo: false, | ||||
| 		id: "bogus".into (), | ||||
| 		relay_addr: ([127, 0, 0, 1], relay_quic_port).into (), | ||||
| 		relay_cert: vec! [], | ||||
| 		use_udp_over_tcp: false, | ||||
| 	}; | ||||
| 	
 | ||||
| 	let server_err = server::P4EndServer::connect (server_conf).await; | ||||
| 	
 | ||||
| 	assert! (server_err.is_err ()); | ||||
| 	
 | ||||
| 	{ | ||||
| 		let m = relay_metrics.read ().await; | ||||
| 		assert_eq! (m.connected_end_servers, 0); | ||||
| 	} | ||||
| 	
 | ||||
| 	// Connect over UDP
 | ||||
| 	
 | ||||
| 	let server_conf = server::Config { | ||||
| 		debug_echo: false, | ||||
| 		id: "bogus_VZBNRUA5".into (), | ||||
| 		relay_addr: ([127, 0, 0, 1], relay_quic_port).into (), | ||||
| 		relay_cert: relay_cert.clone (), | ||||
| 		use_udp_over_tcp: false, | ||||
| 	}; | ||||
| 	
 | ||||
| 	let t = Instant::now (); | ||||
| 	let (server, _) = server::P4EndServer::connect (server_conf).await?; | ||||
| 	let dur = t.elapsed (); | ||||
| 	assert! (dur < Duration::from_millis (1_000), "{:?}", dur); | ||||
| 	
 | ||||
| 	{ | ||||
| 		let m = relay_metrics.read ().await; | ||||
| 		assert_eq! (m.connected_end_servers, 1); | ||||
| 	} | ||||
| 	
 | ||||
| 	// Connect over TCP
 | ||||
| 	
 | ||||
| 	let server_conf = server::Config { | ||||
| 		debug_echo: false, | ||||
| 		id: "bogus_6E5CZIAI".into (), | ||||
| 		relay_addr: ([127, 0, 0, 1], tcp_listen_port).into (), | ||||
| 		relay_cert: relay_cert.clone (), | ||||
| 		use_udp_over_tcp: true, | ||||
| 	}; | ||||
| 	
 | ||||
| 	let t = Instant::now (); | ||||
| 	let (server, _) = server::P4EndServer::connect (server_conf).await?; | ||||
| 	let dur = t.elapsed (); | ||||
| 	assert! (dur < Duration::from_millis (1_000), "{:?}", dur); | ||||
| 	
 | ||||
| 	{ | ||||
| 		let m = relay_metrics.read ().await; | ||||
| 		assert_eq! (m.connected_end_servers, 2); | ||||
| 	} | ||||
| 	
 | ||||
| 	Ok (()) | ||||
| } | ||||
|  | @ -12,19 +12,15 @@ repository = "https://six-five-six-four.com/git/reactor/ptth" | |||
| [dependencies] | ||||
| anyhow = "1.0.38" | ||||
| blake3 = "1.0.0" | ||||
| fltk = "1.3.24" | ||||
| fltk = "1.2.8" | ||||
| ptth_quic = { path = "../ptth_quic" } | ||||
| quinn = "0.9.3" | ||||
| quinn = "0.8.5" | ||||
| rand = "0.8.4" | ||||
| rand_chacha = "0.3.1" | ||||
| reqwest = "0.11.4" | ||||
| rmp-serde = "0.15.5" | ||||
| serde = "1.0.130" | ||||
| structopt = "0.3.20" | ||||
| tokio = { version = "1.8.1", features = ["full"] } | ||||
| tracing-subscriber = "0.2.16" | ||||
| tracing = "0.1.25" | ||||
| 
 | ||||
| [dependencies.reqwest] | ||||
| version = "0.11.4"  | ||||
| default-features = false | ||||
| features = ["stream", "rustls-tls", "hyper-rustls"]  | ||||
|  |  | |||
|  | @ -235,7 +235,10 @@ fn main () -> anyhow::Result <()> { | |||
| 		
 | ||||
| 		let client_id = opt.client_id.unwrap_or_else (|| "bogus_client".to_string ()); | ||||
| 		
 | ||||
| 		let connection = protocol::p2_connect_to_p3 (&endpoint, relay_addr, &client_id).await | ||||
| 		let quinn::NewConnection { | ||||
| 			connection, | ||||
| 			.. | ||||
| 		} = protocol::p2_connect_to_p3 (&endpoint, relay_addr, &client_id).await | ||||
| 		.context ("P2 can't connect to P3")?; | ||||
| 		
 | ||||
| 		Ok::<_, anyhow::Error> (connection) | ||||
|  |  | |||
|  | @ -11,35 +11,30 @@ repository = "https://six-five-six-four.com/git/reactor/ptth" | |||
| 
 | ||||
| [dependencies] | ||||
| 
 | ||||
| anyhow = "1.0.66" | ||||
| anyhow = "1.0.38" | ||||
| base64 = "0.13.0" | ||||
| blake3 = "1.0.0" | ||||
| chrono = { version = "0.4.23", features = ["serde"] } | ||||
| chrono = { version = "0.4.19", features = ["serde"] } | ||||
| clap = "2.33.3" | ||||
| dashmap = "4.0.2" | ||||
| futures = "0.3.7" | ||||
| futures-util = "0.3.8" | ||||
| handlebars = "3.5.3" | ||||
| http = "0.2.3" | ||||
| hyper = { version = "0.14.23", features = ["http1", "http2", "server", "stream", "tcp"] } | ||||
| hyper = { version = "0.14.20", features = ["http1", "http2", "server", "stream", "tcp"] } | ||||
| itertools = "0.9.0" | ||||
| rand = "0.8.5" | ||||
| rand = "0.8.3" | ||||
| rmp-serde = "0.15.5" | ||||
| rusty_ulid = "1.0.0" | ||||
| serde = { version = "1.0.150", features = ["derive"] } | ||||
| serde_json = "1.0.89" | ||||
| serde_urlencoded = "0.7.1" | ||||
| thiserror = "1.0.37" | ||||
| tokio = { version = "1.23.0", features = [] } | ||||
| tokio-stream = "0.1.11" | ||||
| toml = "0.5.10" | ||||
| tracing = "0.1.37" | ||||
| rusty_ulid = "0.10.1" | ||||
| serde = { version = "1.0.117", features = ["derive"] } | ||||
| serde_json = "1.0.60" | ||||
| serde_urlencoded = "0.7.0" | ||||
| thiserror = "1.0.22" | ||||
| tokio = { version = "1.8.1", features = [] } | ||||
| tokio-stream = "0.1.3" | ||||
| toml = "0.5.7" | ||||
| tracing = "0.1.25" | ||||
| tracing-futures = "0.2.4" | ||||
| tracing-subscriber = "0.2.15" | ||||
| 
 | ||||
| ptth_core = { path = "../ptth_core", version = "2.0.0" } | ||||
| 
 | ||||
| [dependencies.reqwest] | ||||
| version = "0.11.13"  | ||||
| default-features = false | ||||
| features = ["stream", "rustls-tls", "hyper-rustls"]  | ||||
|  |  | |||
|  | @ -144,8 +144,6 @@ pub mod file { | |||
| 		
 | ||||
| 		pub news_url: Option <String>, | ||||
| 		pub hide_audit_log: Option <bool>, | ||||
| 		pub webhook_url: Option <String>, | ||||
| 		pub webhook_interval_s: Option <u32>, | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
|  | @ -160,8 +158,6 @@ pub struct Config { | |||
| 	pub scraper_keys: HashMap <String, ScraperKey>, | ||||
| 	pub news_url: Option <String>, | ||||
| 	pub hide_audit_log: bool, | ||||
| 	pub webhook_url: Option <String>, | ||||
| 	pub webhook_interval_s: u32, | ||||
| } | ||||
| 
 | ||||
| impl Default for Config { | ||||
|  | @ -174,8 +170,6 @@ impl Default for Config { | |||
| 			scraper_keys: Default::default (), | ||||
| 			news_url: None, | ||||
| 			hide_audit_log: false, | ||||
| 			webhook_url: None, | ||||
| 			webhook_interval_s: 7200, | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | @ -205,8 +199,6 @@ impl TryFrom <file::Config> for Config { | |||
| 			scraper_keys, | ||||
| 			news_url: f.news_url, | ||||
| 			hide_audit_log: f.hide_audit_log.unwrap_or (false), | ||||
| 			webhook_url: f.webhook_url, | ||||
| 			webhook_interval_s: f.webhook_interval_s.unwrap_or (7200), | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
|  |  | |||
|  | @ -589,10 +589,6 @@ async fn handle_all ( | |||
| 	use routing::Route::*; | ||||
| 	
 | ||||
| 	let state = &*state; | ||||
| 	{ | ||||
| 		let mut counters = state.webhook_counters.write ().await; | ||||
| 		counters.requests_total += 1; | ||||
| 	} | ||||
| 	
 | ||||
| 	// The path is cloned here, so it's okay to consume the request
 | ||||
| 	// later.
 | ||||
|  | @ -822,7 +818,7 @@ pub async fn run_relay ( | |||
| 					
 | ||||
| 					let mut request_rendezvous = state_2.request_rendezvous.lock ().await; | ||||
| 					request_rendezvous.iter_mut () | ||||
| 					.for_each (|(_k, v)| { | ||||
| 					.for_each (|(k, v)| { | ||||
| 						match v { | ||||
| 							RequestRendezvous::ParkedServer (_) => (), | ||||
| 							RequestRendezvous::ParkedClients (requests) => requests.retain (|req| req.id.as_str () >= timeout_ulid.as_str ()), | ||||
|  | @ -882,11 +878,6 @@ pub async fn run_relay ( | |||
| 	
 | ||||
| 	state.audit_log.push (AuditEvent::new (AuditData::RelayStart)).await; | ||||
| 	
 | ||||
| 	{ | ||||
| 		let state = state.clone (); | ||||
| 		tokio::spawn (webhook_task (state)); | ||||
| 	} | ||||
| 	
 | ||||
| 	trace! ("Serving relay on {:?}", addr); | ||||
| 	
 | ||||
| 	server.with_graceful_shutdown (async { | ||||
|  | @ -922,64 +913,5 @@ pub async fn run_relay ( | |||
| 	Ok (()) | ||||
| } | ||||
| 
 | ||||
| async fn webhook_task (state: Arc <Relay>) { | ||||
| 	use crate::relay_state::MonitoringCounters; | ||||
| 	
 | ||||
| 	let client = reqwest::Client::default (); | ||||
| 	
 | ||||
| 	let webhook_interval_s = { | ||||
| 		let config = state.config.read ().await; | ||||
| 		config.webhook_interval_s | ||||
| 	}; | ||||
| 	dbg! (webhook_interval_s); | ||||
| 	
 | ||||
| 	let mut interval = tokio::time::interval (std::time::Duration::from_secs (webhook_interval_s.into ())); | ||||
| 	interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip); | ||||
| 	
 | ||||
| 	let mut tick_seq = 1; | ||||
| 	let mut last_counters_reported = (MonitoringCounters::default (), Utc::now (), 0); | ||||
| 	
 | ||||
| 	loop { | ||||
| 		interval.tick ().await; | ||||
| 		
 | ||||
| 		let webhook_url = { | ||||
| 			let config = state.config.read ().await; | ||||
| 			config.webhook_url.clone () | ||||
| 		}; | ||||
| 		
 | ||||
| 		let webhook_url = match webhook_url { | ||||
| 			Some (x) => x, | ||||
| 			None => { | ||||
| 				continue; | ||||
| 			}, | ||||
| 		}; | ||||
| 		
 | ||||
| 		let now = Utc::now (); | ||||
| 		
 | ||||
| 		let counters = { | ||||
| 			state.webhook_counters.read ().await.clone () | ||||
| 		}; | ||||
| 		
 | ||||
| 		let requests_total_diff = counters.requests_total - last_counters_reported.0.requests_total; | ||||
| 		
 | ||||
| 		let j = serde_json::json! ({ | ||||
| 			"text": format! ("From tick {} to {}: Handled {} requests", last_counters_reported.2, tick_seq, requests_total_diff), | ||||
| 		}).to_string (); | ||||
| 		
 | ||||
| 		match client.post (webhook_url).body (j).send ().await { | ||||
| 			Ok (resp) => { | ||||
| 				if resp.status () == StatusCode::OK { | ||||
| 					last_counters_reported = (counters, now, tick_seq); | ||||
| 				} | ||||
| 				else { | ||||
| 					dbg! (resp.status ()); | ||||
| 				} | ||||
| 			}, | ||||
| 			Err (e) => { dbg! (e); }, | ||||
| 		} | ||||
| 		tick_seq += 1; | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| #[cfg (test)] | ||||
| mod tests; | ||||
|  |  | |||
|  | @ -101,16 +101,6 @@ pub struct Relay { | |||
| 	/// Memory backend for audit logging
 | ||||
| 	// TODO: Add file / database / network server logging backend
 | ||||
| 	pub (crate) audit_log: BoundedVec <AuditEvent>, | ||||
| 	
 | ||||
| 	/// Counters for webhook reporting
 | ||||
| 	pub (crate) webhook_counters: RwLock <MonitoringCounters>, | ||||
| } | ||||
| 
 | ||||
| #[derive (Clone, Default)] | ||||
| pub (crate) struct MonitoringCounters { | ||||
| 	pub (crate) requests_total: u64, | ||||
| 	pub (crate) requests_by_scraper_api: HashMap <String, u64>, | ||||
| 	pub (crate) requests_by_email: HashMap <String, u64>, | ||||
| } | ||||
| 
 | ||||
| #[derive (Clone)] | ||||
|  | @ -214,7 +204,6 @@ impl TryFrom <Config> for Relay { | |||
| 			shutdown_watch_rx, | ||||
| 			unregistered_servers: BoundedVec::new (20), | ||||
| 			audit_log: BoundedVec::new (256), | ||||
| 			webhook_counters: Default::default (), | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
|  |  | |||
|  | @ -558,7 +558,7 @@ pub mod executable { | |||
| 			name: opt.name.or (config_file.name).ok_or (anyhow::anyhow! ("`name` must be provided in command line or config file"))?, | ||||
| 			api_key: config_file.api_key, | ||||
| 			relay_url: opt.relay_url.or (config_file.relay_url).ok_or (anyhow::anyhow! ("`--relay-url` must be provided in command line or `relay_url` in config file"))?, | ||||
| 			file_server_root: opt.file_server_root.or (config_file.file_server_root).unwrap_or_else (|| PathBuf::from (".")), | ||||
| 			file_server_root: opt.file_server_root.or (config_file.file_server_root).unwrap_or_else (PathBuf::new), | ||||
| 			file_server_roots, | ||||
| 			throttle_upload: opt.throttle_upload, | ||||
| 			allow_any_client: true, | ||||
|  |  | |||
|  | @ -1,11 +0,0 @@ | |||
| [package] | ||||
| name = "udp_over_tcp" | ||||
| version = "0.1.0" | ||||
| edition = "2021" | ||||
| 
 | ||||
| # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | ||||
| 
 | ||||
| [dependencies] | ||||
| anyhow = "1.0.66" | ||||
| tokio = { version = "1.23.0", features = ["full"] } | ||||
| tracing = "0.1.37" | ||||
|  | @ -1,59 +0,0 @@ | |||
| use std::{ | ||||
| 	net::{ | ||||
| 		Ipv4Addr, | ||||
| 		SocketAddr, | ||||
| 		SocketAddrV4, | ||||
| 	}, | ||||
| 	sync::Arc, | ||||
| }; | ||||
| 
 | ||||
| use tokio::{ | ||||
| 	net::{ | ||||
| 		TcpSocket, | ||||
| 		TcpStream, | ||||
| 		UdpSocket, | ||||
| 	}, | ||||
| 	spawn, | ||||
| }; | ||||
| 
 | ||||
| use crate::loops; | ||||
| 
 | ||||
| pub struct Config { | ||||
| 	pub udp_eph_port: u16, | ||||
| 	pub udp_local_server_port: u16, | ||||
| 	pub tcp_server_addr: SocketAddr, | ||||
| } | ||||
| 
 | ||||
| pub async fn main (cfg: Config) -> anyhow::Result <()> { | ||||
| 	let udp_sock = UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, cfg.udp_local_server_port)).await?; | ||||
| 	udp_sock.connect ((Ipv4Addr::LOCALHOST, cfg.udp_eph_port)).await?; | ||||
| 	
 | ||||
| 	let tcp_sock = TcpSocket::new_v4 ()?; | ||||
| 	let tcp_conn = tcp_sock.connect (cfg.tcp_server_addr).await?; | ||||
| 	
 | ||||
| 	main_with_sockets (udp_sock, tcp_conn).await | ||||
| } | ||||
| 
 | ||||
| pub async fn main_with_sockets (udp_sock: UdpSocket, tcp_conn: TcpStream) -> anyhow::Result <()> { | ||||
| 	let (tcp_read, tcp_write) = tcp_conn.into_split (); | ||||
| 	
 | ||||
| 	let tx_task; | ||||
| 	let rx_task; | ||||
| 	
 | ||||
| 	{ | ||||
| 		let udp_sock = Arc::new (udp_sock); | ||||
| 		rx_task = spawn (loops::rx (Arc::clone (&udp_sock), tcp_read)); | ||||
| 		tx_task = spawn (loops::tx (Arc::clone (&udp_sock), tcp_write)); | ||||
| 	} | ||||
| 	
 | ||||
| 	tokio::select! { | ||||
| 		_val = tx_task => { | ||||
| 			println! ("client_main: tx_task exited, exiting"); | ||||
| 		} | ||||
| 		_val = rx_task => { | ||||
| 			println! ("client_main: rx_task exited, exiting"); | ||||
| 		} | ||||
| 	} | ||||
| 	
 | ||||
| 	Ok (()) | ||||
| } | ||||
|  | @ -1,4 +0,0 @@ | |||
| pub mod client; | ||||
| pub mod server; | ||||
| 
 | ||||
| mod loops; | ||||
|  | @ -1,84 +0,0 @@ | |||
| use std::{ | ||||
| 	sync::Arc, | ||||
| }; | ||||
| 
 | ||||
| use anyhow::bail; | ||||
| use tokio::{ | ||||
| 	io::{ | ||||
| 		AsyncReadExt, | ||||
| 		AsyncWriteExt, | ||||
| 	}, | ||||
| 	net::{ | ||||
| 		UdpSocket, | ||||
| 		tcp, | ||||
| 	} | ||||
| }; | ||||
| 
 | ||||
| pub async fn rx ( | ||||
| 	udp_sock: Arc <UdpSocket>, | ||||
| 	mut tcp_read: tcp::OwnedReadHalf, | ||||
| ) -> anyhow::Result <()> { | ||||
| 	for i in 0u64.. { | ||||
| 		// Optimizes down to a bitwise AND
 | ||||
| 		if i % 8_192 == 0 { | ||||
| 			tracing::trace! ("rx loop"); | ||||
| 		} | ||||
| 		
 | ||||
| 		let mut tag = [0u8, 0, 0, 0]; | ||||
| 		let bytes_read = tcp_read.read (&mut tag).await?; | ||||
| 		if bytes_read != 4 { | ||||
| 			bail! ("loops::rx: Couldn't read 4 bytes for tag"); | ||||
| 		} | ||||
| 		if tag != [1, 0, 0, 0] { | ||||
| 			bail! ("loops::rx: unexpected tag in framing"); | ||||
| 		} | ||||
| 		
 | ||||
| 		let mut length = [0u8, 0, 0, 0]; | ||||
| 		let bytes_read = tcp_read.read (&mut length).await?; | ||||
| 		if bytes_read != 4 { | ||||
| 			bail! ("loops::rx: Couldn't read 4 bytes for tag"); | ||||
| 		} | ||||
| 		
 | ||||
| 		let length = usize::try_from (u32::from_le_bytes (length))?; | ||||
| 		if length >= 8_192 { | ||||
| 			bail! ("loops::rx: Length too big for UDP packets"); | ||||
| 		} | ||||
| 		
 | ||||
| 		let mut buf = vec! [0u8; length]; | ||||
| 		let bytes_read = tcp_read.read_exact (&mut buf).await?; | ||||
| 		if length != bytes_read { | ||||
| 			bail! ("loops::rx: read_exact failed"); | ||||
| 		} | ||||
| 		buf.truncate (bytes_read); | ||||
| 		
 | ||||
| 		udp_sock.send (&buf).await?; | ||||
| 	} | ||||
| 	
 | ||||
| 	Ok (()) | ||||
| } | ||||
| 
 | ||||
| pub async fn tx ( | ||||
| 	udp_sock: Arc <UdpSocket>, | ||||
| 	mut tcp_write: tcp::OwnedWriteHalf, | ||||
| ) -> anyhow::Result <()> 
 | ||||
| { | ||||
| 	for i in 0u64.. { | ||||
| 		// Optimizes down to a bitwise AND
 | ||||
| 		if i % 8_192 == 0 { | ||||
| 			tracing::trace! ("tx loop"); | ||||
| 		} | ||||
| 		
 | ||||
| 		let mut buf = vec! [0u8; 8_192]; | ||||
| 		let bytes_read = udp_sock.recv (&mut buf).await?; | ||||
| 		buf.truncate (bytes_read); | ||||
| 		
 | ||||
| 		let tag = [1u8, 0, 0, 0]; | ||||
| 		let length = u32::try_from (bytes_read)?.to_le_bytes (); | ||||
| 		
 | ||||
| 		tcp_write.write_all (&tag).await?; | ||||
| 		tcp_write.write_all (&length).await?; | ||||
| 		tcp_write.write_all (&buf).await?; | ||||
| 	} | ||||
| 	
 | ||||
| 	Ok (()) | ||||
| } | ||||
|  | @ -1,76 +0,0 @@ | |||
| /* | ||||
| 
 | ||||
| To test manually, run this 3 commands: | ||||
| 
 | ||||
| - Terminal A: `nc -l -u -p 9502` | ||||
| - Terminal B: `cargo run -p udp_over_tcp` | ||||
| - Terminal C: `nc -p 9500 -u 127.0.0.1 9501` | ||||
| 
 | ||||
| Terminals A and C should be connected through the UDP-over-TCP connection | ||||
| 
 | ||||
| */ | ||||
| 
 | ||||
| use std::{ | ||||
| 	net::{ | ||||
| 		Ipv4Addr, | ||||
| 		SocketAddr, | ||||
| 		SocketAddrV4, | ||||
| 	}, | ||||
| }; | ||||
| 
 | ||||
| use tokio::{ | ||||
| 	runtime, | ||||
| 	spawn, | ||||
| }; | ||||
| 
 | ||||
| mod client; | ||||
| mod loops; | ||||
| mod server; | ||||
| 
 | ||||
| // The ephemeral UDP port that the PTTH_QUIC client will bind
 | ||||
| const PORT_0: u16 = 9500; | ||||
| 
 | ||||
| // The well-known UDP port that the UDP-over-TCP client will bind
 | ||||
| // The PTTH_QUIC client must connect to this instead of the real relay address
 | ||||
| const PORT_1: u16 = 9501; | ||||
| 
 | ||||
| // The well-known TCP port that the UDP-over-TCP server will bind
 | ||||
| const PORT_2: u16 = 9502; | ||||
| 
 | ||||
| // The well-known UDP port that the PTTH_QUIC relay will bind
 | ||||
| const PORT_3: u16 = 9502; | ||||
| 
 | ||||
| fn main () -> anyhow::Result <()> { | ||||
| 	let rt = runtime::Runtime::new ()?; | ||||
| 	
 | ||||
| 	rt.block_on (async_main ())?; | ||||
| 	
 | ||||
| 	Ok (()) | ||||
| } | ||||
| 
 | ||||
| async fn async_main () -> anyhow::Result <()> { | ||||
| 	let server_cfg = server::Config { | ||||
| 		tcp_port: PORT_2, | ||||
| 		udp_port: PORT_3, | ||||
| 	}; | ||||
| 	let server_app = server::Listener::new (server_cfg).await?; | ||||
| 	let server_task = spawn (server_app.run ()); | ||||
| 	
 | ||||
| 	let client_cfg = client::Config { | ||||
| 		udp_eph_port: PORT_0, | ||||
| 		udp_local_server_port: PORT_1, | ||||
| 		tcp_server_addr: SocketAddr::V4 (SocketAddrV4::new (Ipv4Addr::LOCALHOST, PORT_2)), | ||||
| 	}; | ||||
| 	let client_task = spawn (client::main (client_cfg)); | ||||
| 	
 | ||||
| 	tokio::select! { | ||||
| 		_val = client_task => { | ||||
| 			println! ("Client exited, exiting"); | ||||
| 		}, | ||||
| 		_val = server_task => { | ||||
| 			println! ("Server exited, exiting"); | ||||
| 		}, | ||||
| 	} | ||||
| 	
 | ||||
| 	Ok (()) | ||||
| } | ||||
|  | @ -1,88 +0,0 @@ | |||
| use std::{ | ||||
| 	net::{ | ||||
| 		Ipv4Addr, | ||||
| 		SocketAddrV4, | ||||
| 	}, | ||||
| 	sync::Arc, | ||||
| }; | ||||
| 
 | ||||
| use tokio::{ | ||||
| 	net::{ | ||||
| 		TcpListener, | ||||
| 		TcpStream, | ||||
| 		UdpSocket, | ||||
| 	}, | ||||
| 	spawn, | ||||
| }; | ||||
| 
 | ||||
| use crate::loops; | ||||
| 
 | ||||
| #[derive (Clone)] | ||||
| pub struct Config { | ||||
| 	/// The well-known TCP port that the UDP-over-TCP server will bind
 | ||||
| 	pub tcp_port: u16, | ||||
| 	
 | ||||
| 	/// The well-known UDP port that the PTTH_QUIC relay will bind
 | ||||
| 	pub udp_port: u16, | ||||
| } | ||||
| 
 | ||||
| pub struct Listener { | ||||
| 	cfg: Config, | ||||
| 	tcp_listener: TcpListener, | ||||
| } | ||||
| 
 | ||||
| impl Listener { | ||||
| 	pub async fn new (cfg: Config) -> anyhow::Result <Self> { | ||||
| 		let tcp_listener = TcpListener::bind ((Ipv4Addr::UNSPECIFIED, cfg.tcp_port)).await?; | ||||
| 		
 | ||||
| 		Ok (Self { | ||||
| 			cfg, | ||||
| 			tcp_listener, | ||||
| 		}) | ||||
| 	} | ||||
| 	
 | ||||
| 	pub fn tcp_port (&self) -> anyhow::Result <u16> { | ||||
| 		Ok (self.tcp_listener.local_addr ()?.port ()) | ||||
| 	} | ||||
| 	
 | ||||
| 	pub async fn run (self) -> anyhow::Result <()> { | ||||
| 		let Self { | ||||
| 			cfg, | ||||
| 			tcp_listener, | ||||
| 		} = self; | ||||
| 		
 | ||||
| 		loop { 
 | ||||
| 			let (conn, _peer_addr) = tcp_listener.accept ().await?; | ||||
| 			
 | ||||
| 			let cfg = cfg.clone (); | ||||
| 			spawn (handle_connection (cfg, conn)); | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| async fn handle_connection (cfg: Config, conn: TcpStream) -> anyhow::Result <()> { | ||||
| 	let udp_sock = UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, 0)).await?; | ||||
| 	udp_sock.connect ((Ipv4Addr::LOCALHOST, cfg.udp_port)).await?; | ||||
| 	
 | ||||
| 	let (tcp_read, tcp_write) = conn.into_split (); | ||||
| 	
 | ||||
| 	let rx_task; | ||||
| 	let tx_task; | ||||
| 	
 | ||||
| 	{ | ||||
| 		let udp_sock = Arc::new (udp_sock); | ||||
| 		rx_task = spawn (loops::rx (Arc::clone (&udp_sock), tcp_read)); | ||||
| 		tx_task = spawn (loops::tx (Arc::clone (&udp_sock), tcp_write)); | ||||
| 	} | ||||
| 	
 | ||||
| 	tokio::select! { | ||||
| 		_val = tx_task => { | ||||
| 			println! ("server_handle_connection: tx_task exited, exiting"); | ||||
| 		} | ||||
| 		_val = rx_task => { | ||||
| 			println! ("server_handle_connection: rx_task exited, exiting"); | ||||
| 		} | ||||
| 	} | ||||
| 	
 | ||||
| 	Ok (()) | ||||
| } | ||||
		Loading…
	
		Reference in New Issue