♻️ refactor: extract `http_listen` fn
							parent
							
								
									d96bf801c6
								
							
						
					
					
						commit
						2656d16264
					
				| 
						 | 
				
			
			@ -63,9 +63,15 @@ pub enum ServerError {
 | 
			
		|||
	#[error ("Can't build HTTP client")]
 | 
			
		||||
	CantBuildHttpClient (reqwest::Error),
 | 
			
		||||
	
 | 
			
		||||
	#[error ("Can't get response from server in Step 3")]
 | 
			
		||||
	Step3Response (reqwest::Error),
 | 
			
		||||
	
 | 
			
		||||
	#[error ("Can't collect non-200 error response body in Step 3")]
 | 
			
		||||
	Step3CollectBody (reqwest::Error),
 | 
			
		||||
	
 | 
			
		||||
	#[error ("Step 3 unknown error")]
 | 
			
		||||
	Step3Unknown,
 | 
			
		||||
	
 | 
			
		||||
	#[error ("Can't collect wrapped requests in Step 3")]
 | 
			
		||||
	CantCollectWrappedRequests (reqwest::Error),
 | 
			
		||||
	
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -32,7 +32,6 @@ use tokio::{
 | 
			
		|||
		channel,
 | 
			
		||||
	},
 | 
			
		||||
};
 | 
			
		||||
use tracing::instrument;
 | 
			
		||||
 | 
			
		||||
use ptth_core::{
 | 
			
		||||
	http_serde::{
 | 
			
		||||
| 
						 | 
				
			
			@ -327,7 +326,7 @@ async fn stream_file (
 | 
			
		|||
		
 | 
			
		||||
		bytes_left -= bytes_read_64;
 | 
			
		||||
		if bytes_left == 0 {
 | 
			
		||||
			debug! ("Finished");
 | 
			
		||||
			// debug! ("Finished");
 | 
			
		||||
			break;
 | 
			
		||||
		}
 | 
			
		||||
		
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -300,7 +300,7 @@ pub async fn serve_all (
 | 
			
		|||
	
 | 
			
		||||
	let full_path = root.join (path);
 | 
			
		||||
	
 | 
			
		||||
	debug! ("full_path = {:?}", full_path);
 | 
			
		||||
	trace! ("full_path = {:?}", full_path);
 | 
			
		||||
	
 | 
			
		||||
	if let Some (hidden_path) = hidden_path {
 | 
			
		||||
		if full_path == hidden_path {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -153,7 +153,7 @@ async fn handle_one_req (
 | 
			
		|||
 | 
			
		||||
async fn handle_requests <F, H, SH> (
 | 
			
		||||
	state: &Arc <State>,
 | 
			
		||||
	req_resp: reqwest::Response,
 | 
			
		||||
	wrapped_reqs: Vec <http_serde::WrappedRequest>,
 | 
			
		||||
	spawn_handler: &mut SH,
 | 
			
		||||
) -> Result <(), ServerError>
 | 
			
		||||
where 
 | 
			
		||||
| 
						 | 
				
			
			@ -163,18 +163,6 @@ SH: Send + FnMut () -> H
 | 
			
		|||
{
 | 
			
		||||
	//println! ("Step 1");
 | 
			
		||||
	
 | 
			
		||||
	let body = req_resp.bytes ().await.map_err (ServerError::CantCollectWrappedRequests)?;
 | 
			
		||||
	let wrapped_reqs: Vec <http_serde::WrappedRequest> = match rmp_serde::from_read_ref (&body)
 | 
			
		||||
	{
 | 
			
		||||
		Ok (x) => x,
 | 
			
		||||
		Err (e) => {
 | 
			
		||||
			error! ("Can't parse wrapped requests: {:?}", e);
 | 
			
		||||
			return Err (ServerError::CantParseWrappedRequests (e));
 | 
			
		||||
		},
 | 
			
		||||
	};
 | 
			
		||||
	
 | 
			
		||||
	debug! ("Unwrapped {} requests", wrapped_reqs.len ());
 | 
			
		||||
	
 | 
			
		||||
	for wrapped_req in wrapped_reqs {
 | 
			
		||||
		let state = Arc::clone (&state);
 | 
			
		||||
		let handler = spawn_handler ();
 | 
			
		||||
| 
						 | 
				
			
			@ -386,6 +374,35 @@ impl State {
 | 
			
		|||
		
 | 
			
		||||
		Ok (state)
 | 
			
		||||
	}
 | 
			
		||||
	
 | 
			
		||||
	async fn http_listen (
 | 
			
		||||
		state: &Arc <Self>,
 | 
			
		||||
	) -> Result <Vec <http_serde::WrappedRequest>, ServerError> 
 | 
			
		||||
	{
 | 
			
		||||
		use http::status::StatusCode;
 | 
			
		||||
		
 | 
			
		||||
		let req_resp = state.client.get (&format! ("{}/http_listen/{}", state.config.relay_url, state.config.name))
 | 
			
		||||
		.timeout (Duration::from_secs (30))
 | 
			
		||||
		.send ().await.map_err (ServerError::Step3Response)?;
 | 
			
		||||
		
 | 
			
		||||
		if req_resp.status () == StatusCode::NO_CONTENT {
 | 
			
		||||
			return Ok (Vec::new ());
 | 
			
		||||
		}
 | 
			
		||||
		
 | 
			
		||||
		if req_resp.status () != StatusCode::OK {
 | 
			
		||||
			error! ("{}", req_resp.status ());
 | 
			
		||||
			let body = req_resp.bytes ().await.map_err (ServerError::Step3CollectBody)?;
 | 
			
		||||
			let body = String::from_utf8 (body.to_vec ()).map_err (ServerError::Step3ErrorResponseNotUtf8)?;
 | 
			
		||||
			error! ("{}", body);
 | 
			
		||||
			return Err (ServerError::Step3Unknown);
 | 
			
		||||
		}
 | 
			
		||||
		
 | 
			
		||||
		let body = req_resp.bytes ().await.map_err (ServerError::CantCollectWrappedRequests)?;
 | 
			
		||||
		let wrapped_reqs: Vec <http_serde::WrappedRequest> = rmp_serde::from_read_ref (&body)
 | 
			
		||||
		.map_err (ServerError::CantParseWrappedRequests)?;
 | 
			
		||||
		
 | 
			
		||||
		Ok (wrapped_reqs)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	pub async fn run <F, H, SH> (
 | 
			
		||||
		state: &Arc <Self>,
 | 
			
		||||
| 
						 | 
				
			
			@ -397,12 +414,10 @@ impl State {
 | 
			
		|||
	H: Send + 'static + FnOnce (http_serde::RequestParts) -> F,
 | 
			
		||||
	SH: Send + FnMut () -> H
 | 
			
		||||
	{
 | 
			
		||||
		use http::status::StatusCode;
 | 
			
		||||
		
 | 
			
		||||
		let mut backoff_delay = 0;
 | 
			
		||||
		let mut shutdown_oneshot = shutdown_oneshot.fuse ();
 | 
			
		||||
		
 | 
			
		||||
		loop {
 | 
			
		||||
		for i in 0u64.. {
 | 
			
		||||
			// TODO: Extract loop body to function?
 | 
			
		||||
			
 | 
			
		||||
			if backoff_delay > 0 {
 | 
			
		||||
| 
						 | 
				
			
			@ -418,61 +433,37 @@ impl State {
 | 
			
		|||
				}
 | 
			
		||||
			}
 | 
			
		||||
			
 | 
			
		||||
			debug! ("http_listen");
 | 
			
		||||
			debug! ("http_listen {}...", i);
 | 
			
		||||
			
 | 
			
		||||
			let req_req = state.client.get (&format! ("{}/http_listen/{}", state.config.relay_url, state.config.name))
 | 
			
		||||
			.timeout (Duration::from_secs (30))
 | 
			
		||||
			.send ();
 | 
			
		||||
			let http_listen_fut = Self::http_listen (state);
 | 
			
		||||
			
 | 
			
		||||
			let err_backoff_delay = std::cmp::min (30_000, backoff_delay * 2 + 500);
 | 
			
		||||
			
 | 
			
		||||
			let req_req = futures::select! {
 | 
			
		||||
				r = req_req.fuse () => r,
 | 
			
		||||
			let http_listen = futures::select! {
 | 
			
		||||
				r = http_listen_fut.fuse () => r,
 | 
			
		||||
				_ = shutdown_oneshot => {
 | 
			
		||||
					info! ("Received graceful shutdown");
 | 
			
		||||
					break;
 | 
			
		||||
				},
 | 
			
		||||
			};
 | 
			
		||||
			
 | 
			
		||||
			let req_resp = match req_req {
 | 
			
		||||
			let err_backoff_delay = std::cmp::min (30_000, backoff_delay * 2 + 500);
 | 
			
		||||
			
 | 
			
		||||
			let reqs = match http_listen {
 | 
			
		||||
				Err (e) => {
 | 
			
		||||
					if e.is_timeout () {
 | 
			
		||||
						error! ("Client-side timeout. Is an overly-aggressive firewall closing long-lived connections? Is the network flakey?");
 | 
			
		||||
					}
 | 
			
		||||
					else {
 | 
			
		||||
						error! ("Err: {:?}", e);
 | 
			
		||||
						if backoff_delay != err_backoff_delay {
 | 
			
		||||
							error! ("Non-timeout issue, increasing backoff_delay");
 | 
			
		||||
							backoff_delay = err_backoff_delay;
 | 
			
		||||
						}
 | 
			
		||||
					}
 | 
			
		||||
					backoff_delay = err_backoff_delay;
 | 
			
		||||
					error! ("http_listen {} error, backing off... {:?}", i, e);
 | 
			
		||||
					continue;
 | 
			
		||||
				},
 | 
			
		||||
				Ok (x) => x,
 | 
			
		||||
			};
 | 
			
		||||
			
 | 
			
		||||
			if req_resp.status () == StatusCode::NO_CONTENT {
 | 
			
		||||
				debug! ("http_listen long poll timed out on the server, good.");
 | 
			
		||||
				continue;
 | 
			
		||||
			}
 | 
			
		||||
			else if req_resp.status () != StatusCode::OK {
 | 
			
		||||
				error! ("{}", req_resp.status ());
 | 
			
		||||
				let body = req_resp.bytes ().await.map_err (ServerError::Step3CollectBody)?;
 | 
			
		||||
				let body = String::from_utf8 (body.to_vec ()).map_err (ServerError::Step3ErrorResponseNotUtf8)?;
 | 
			
		||||
				error! ("{}", body);
 | 
			
		||||
				if backoff_delay != err_backoff_delay {
 | 
			
		||||
					error! ("Non-timeout issue, increasing backoff_delay");
 | 
			
		||||
					backoff_delay = err_backoff_delay;
 | 
			
		||||
				}
 | 
			
		||||
				continue;
 | 
			
		||||
			}
 | 
			
		||||
			debug! ("http_listen {} unwrapped {} requests", i, reqs.len ());
 | 
			
		||||
			
 | 
			
		||||
			// Unpack the requests, spawn them into new tasks, then loop back
 | 
			
		||||
			// around.
 | 
			
		||||
			
 | 
			
		||||
			if handle_requests (
 | 
			
		||||
				&state, 
 | 
			
		||||
				req_resp,
 | 
			
		||||
				reqs,
 | 
			
		||||
				spawn_handler,
 | 
			
		||||
			).await.is_err () {
 | 
			
		||||
				backoff_delay = err_backoff_delay;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue