Compare commits

...

6 Commits

Author SHA1 Message Date
_ 80bd4abad3 🔊 change logs a bit 2022-02-06 20:03:27 +00:00
_ 6ea8b5c30f 🐛 bug: fix Cache-Control header so Nginx can cache my blog 2022-02-05 17:28:30 +00:00
_ e5d157e1af 🐛 bug: don't send Content-Length to relay if status code is 304 Not Modified
Thanks to Nginx for noticing this
2022-02-05 17:00:17 +00:00
_ 2656d16264 ♻️ refactor: extract `http_listen` fn 2022-02-05 16:52:56 +00:00
_ d96bf801c6 🔇 make logs quieter 2022-02-05 15:57:40 +00:00
_ c47ab6d282 🚧 wip: messing with cache parameters 2022-01-09 00:22:16 +00:00
4 changed files with 73 additions and 69 deletions

View File

@ -63,9 +63,15 @@ pub enum ServerError {
#[error ("Can't build HTTP client")] #[error ("Can't build HTTP client")]
CantBuildHttpClient (reqwest::Error), CantBuildHttpClient (reqwest::Error),
#[error ("Can't get response from server in Step 3")]
Step3Response (reqwest::Error),
#[error ("Can't collect non-200 error response body in Step 3")] #[error ("Can't collect non-200 error response body in Step 3")]
Step3CollectBody (reqwest::Error), Step3CollectBody (reqwest::Error),
#[error ("Step 3 unknown error")]
Step3Unknown,
#[error ("Can't collect wrapped requests in Step 3")] #[error ("Can't collect wrapped requests in Step 3")]
CantCollectWrappedRequests (reqwest::Error), CantCollectWrappedRequests (reqwest::Error),

View File

@ -32,7 +32,6 @@ use tokio::{
channel, channel,
}, },
}; };
use tracing::instrument;
use ptth_core::{ use ptth_core::{
http_serde::{ http_serde::{
@ -139,7 +138,7 @@ async fn serve_dir_json (
Ok (response) Ok (response)
} }
#[instrument (level = "debug", skip (f))] // #[instrument (level = "debug", skip (f))]
async fn serve_file ( async fn serve_file (
uri: &str, uri: &str,
mut f: File, mut f: File,
@ -166,11 +165,11 @@ async fn serve_file (
let (range, range_requested) = (range.range, range.range_requested); let (range, range_requested) = (range.range, range.range_requested);
info! ("Serving range {}-{}", range.start, range.end);
let content_length = range.end - range.start; let content_length = range.end - range.start;
let body = if decision.should_send_body { let body = if decision.should_send_body {
info! ("Sending range {}-{}", range.start, range.end);
let seek = SeekFrom::Start (range.start); let seek = SeekFrom::Start (range.start);
f.seek (seek).await?; f.seek (seek).await?;
@ -182,6 +181,8 @@ async fn serve_file (
Some (rx) Some (rx)
} }
else { else {
info! ("Not sending body");
None None
}; };
@ -198,11 +199,12 @@ async fn serve_file (
// - no-cache - Clients and the relay can store this, but should revalidate // - no-cache - Clients and the relay can store this, but should revalidate
// with the origin server (us) because only we can check if the file // with the origin server (us) because only we can check if the file
// changed on disk. // changed on disk.
// - max-age=0 - The file might change at any point during or after the // - max-age=5 - Caches can keep the file for 5 seconds. This is just enough
// request, so for proper invalidation, the client should immediately // to let a cache like Nginx or Varnish on a relay soak up some of a
// consider it stale. // slashdotting for us, but not so much that a low-traffic site would
// suffer from seeing stale data.
response.header ("cache-control".to_string (), b"no-cache,max-age=0".to_vec ()); response.header ("cache-control".to_string (), b"max-age=2".to_vec ());
if let Some (etag) = input.actual_etag { if let Some (etag) = input.actual_etag {
response.header ("etag".to_string (), etag); response.header ("etag".to_string (), etag);
}; };
@ -246,6 +248,7 @@ fn serve_file_decision (input: &ServeFileInput) -> ServeFileOutput
if let (Some (if_none_match), Some (actual_etag)) = (&input.if_none_match, &input.actual_etag) if let (Some (if_none_match), Some (actual_etag)) = (&input.if_none_match, &input.actual_etag)
{ {
if actual_etag == if_none_match { if actual_etag == if_none_match {
info! ("Not Modified");
return ServeFileOutput { return ServeFileOutput {
status_code: StatusCode::NotModified, status_code: StatusCode::NotModified,
should_send_body: false, should_send_body: false,
@ -325,7 +328,7 @@ async fn stream_file (
bytes_left -= bytes_read_64; bytes_left -= bytes_read_64;
if bytes_left == 0 { if bytes_left == 0 {
debug! ("Finished"); // debug! ("Finished");
break; break;
} }
@ -345,7 +348,7 @@ impl FileServer {
/// Passes a request to the internal file server logic. /// Passes a request to the internal file server logic.
/// Returns an HTTP response as HTML or JSON, depending on the request. /// Returns an HTTP response as HTML or JSON, depending on the request.
#[instrument (level = "debug", skip (self, headers))] // #[instrument (level = "debug", skip (self, headers))]
pub async fn serve_all ( pub async fn serve_all (
&self, &self,
method: Method, method: Method,

View File

@ -259,7 +259,7 @@ pub async fn serve_all (
use std::str::FromStr; use std::str::FromStr;
use Response::*; use Response::*;
info! ("Client requested {}", uri); trace! ("Client requested {}", uri);
let uri = http::Uri::from_str (uri).map_err (FileServerError::InvalidUri)?; let uri = http::Uri::from_str (uri).map_err (FileServerError::InvalidUri)?;
@ -300,7 +300,7 @@ pub async fn serve_all (
let full_path = root.join (path); let full_path = root.join (path);
debug! ("full_path = {:?}", full_path); trace! ("full_path = {:?}", full_path);
if let Some (hidden_path) = hidden_path { if let Some (hidden_path) = hidden_path {
if full_path == hidden_path { if full_path == hidden_path {

View File

@ -91,14 +91,18 @@ async fn handle_one_req (
response: http_serde::Response, response: http_serde::Response,
) -> Result <(), ServerError> ) -> Result <(), ServerError>
{ {
let url = format! ("{}/http_response/{}", state.config.relay_url, req_id);
let mut resp_req = state.client let mut resp_req = state.client
.post (&format! ("{}/http_response/{}", state.config.relay_url, req_id)) .post (&url)
.header (ptth_core::PTTH_MAGIC_HEADER, base64::encode (rmp_serde::to_vec (&response.parts).map_err (ServerError::MessagePackEncodeResponse)?)) .header (ptth_core::PTTH_MAGIC_HEADER, base64::encode (rmp_serde::to_vec (&response.parts).map_err (ServerError::MessagePackEncodeResponse)?))
.header ("X-PTTH-SERVER-NAME", &state.config.name); .header ("X-PTTH-SERVER-NAME", &state.config.name);
if let Some (length) = response.content_length { if response.parts.status_code != ptth_core::http_serde::StatusCode::NotModified {
resp_req = resp_req.header ("Content-Length", length.to_string ()); if let Some (length) = response.content_length {
resp_req = resp_req.header ("Content-Length", length.to_string ());
}
} }
if let Some (mut body) = response.body { if let Some (mut body) = response.body {
if state.config.throttle_upload { if state.config.throttle_upload {
// Spawn another task to throttle the chunks // Spawn another task to throttle the chunks
@ -129,14 +133,14 @@ async fn handle_one_req (
let req = resp_req.build ().map_err (ServerError::Step5Responding)?; let req = resp_req.build ().map_err (ServerError::Step5Responding)?;
debug! ("{:?}", req.headers ()); trace! ("{:?}", req.headers ());
//println! ("Step 6"); //println! ("Step 6");
match state.client.execute (req).await { match state.client.execute (req).await {
Ok (r) => { Ok (r) => {
let status = r.status (); let status = r.status ();
let text = r.text ().await.map_err (ServerError::Step7AfterResponse)?; let text = r.text ().await.map_err (ServerError::Step7AfterResponse)?;
debug! ("{:?} {:?}", status, text); debug! ("http_response {} {:?} {:?}", req_id, status, text);
}, },
Err (e) => { Err (e) => {
if e.is_request () { if e.is_request () {
@ -152,7 +156,7 @@ async fn handle_one_req (
async fn handle_requests <F, H, SH> ( async fn handle_requests <F, H, SH> (
state: &Arc <State>, state: &Arc <State>,
req_resp: reqwest::Response, wrapped_reqs: Vec <http_serde::WrappedRequest>,
spawn_handler: &mut SH, spawn_handler: &mut SH,
) -> Result <(), ServerError> ) -> Result <(), ServerError>
where where
@ -162,18 +166,6 @@ SH: Send + FnMut () -> H
{ {
//println! ("Step 1"); //println! ("Step 1");
let body = req_resp.bytes ().await.map_err (ServerError::CantCollectWrappedRequests)?;
let wrapped_reqs: Vec <http_serde::WrappedRequest> = match rmp_serde::from_read_ref (&body)
{
Ok (x) => x,
Err (e) => {
error! ("Can't parse wrapped requests: {:?}", e);
return Err (ServerError::CantParseWrappedRequests (e));
},
};
debug! ("Unwrapped {} requests", wrapped_reqs.len ());
for wrapped_req in wrapped_reqs { for wrapped_req in wrapped_reqs {
let state = Arc::clone (&state); let state = Arc::clone (&state);
let handler = spawn_handler (); let handler = spawn_handler ();
@ -183,7 +175,7 @@ SH: Send + FnMut () -> H
tokio::spawn (async move { tokio::spawn (async move {
let (req_id, parts) = (wrapped_req.id, wrapped_req.req); let (req_id, parts) = (wrapped_req.id, wrapped_req.req);
debug! ("Handling request {}", req_id); info! ("Req {} {}", req_id, parts.uri);
let f = handler (parts); let f = handler (parts);
let response = f.await?; let response = f.await?;
@ -385,6 +377,35 @@ impl State {
Ok (state) Ok (state)
} }
async fn http_listen (
state: &Arc <Self>,
) -> Result <Vec <http_serde::WrappedRequest>, ServerError>
{
use http::status::StatusCode;
let req_resp = state.client.get (&format! ("{}/http_listen/{}", state.config.relay_url, state.config.name))
.timeout (Duration::from_secs (30))
.send ().await.map_err (ServerError::Step3Response)?;
if req_resp.status () == StatusCode::NO_CONTENT {
return Ok (Vec::new ());
}
if req_resp.status () != StatusCode::OK {
error! ("{}", req_resp.status ());
let body = req_resp.bytes ().await.map_err (ServerError::Step3CollectBody)?;
let body = String::from_utf8 (body.to_vec ()).map_err (ServerError::Step3ErrorResponseNotUtf8)?;
error! ("{}", body);
return Err (ServerError::Step3Unknown);
}
let body = req_resp.bytes ().await.map_err (ServerError::CantCollectWrappedRequests)?;
let wrapped_reqs: Vec <http_serde::WrappedRequest> = rmp_serde::from_read_ref (&body)
.map_err (ServerError::CantParseWrappedRequests)?;
Ok (wrapped_reqs)
}
pub async fn run <F, H, SH> ( pub async fn run <F, H, SH> (
state: &Arc <Self>, state: &Arc <Self>,
@ -396,12 +417,10 @@ impl State {
H: Send + 'static + FnOnce (http_serde::RequestParts) -> F, H: Send + 'static + FnOnce (http_serde::RequestParts) -> F,
SH: Send + FnMut () -> H SH: Send + FnMut () -> H
{ {
use http::status::StatusCode;
let mut backoff_delay = 0; let mut backoff_delay = 0;
let mut shutdown_oneshot = shutdown_oneshot.fuse (); let mut shutdown_oneshot = shutdown_oneshot.fuse ();
loop { for i in 0u64.. {
// TODO: Extract loop body to function? // TODO: Extract loop body to function?
if backoff_delay > 0 { if backoff_delay > 0 {
@ -417,61 +436,37 @@ impl State {
} }
} }
debug! ("http_listen"); debug! ("http_listen {}...", i);
let req_req = state.client.get (&format! ("{}/http_listen/{}", state.config.relay_url, state.config.name)) let http_listen_fut = Self::http_listen (state);
.timeout (Duration::from_secs (30))
.send ();
let err_backoff_delay = std::cmp::min (30_000, backoff_delay * 2 + 500); let http_listen = futures::select! {
r = http_listen_fut.fuse () => r,
let req_req = futures::select! {
r = req_req.fuse () => r,
_ = shutdown_oneshot => { _ = shutdown_oneshot => {
info! ("Received graceful shutdown"); info! ("Received graceful shutdown");
break; break;
}, },
}; };
let req_resp = match req_req { let err_backoff_delay = std::cmp::min (30_000, backoff_delay * 2 + 500);
let reqs = match http_listen {
Err (e) => { Err (e) => {
if e.is_timeout () { backoff_delay = err_backoff_delay;
error! ("Client-side timeout. Is an overly-aggressive firewall closing long-lived connections? Is the network flakey?"); error! ("http_listen {} error, backing off... {:?}", i, e);
}
else {
error! ("Err: {:?}", e);
if backoff_delay != err_backoff_delay {
error! ("Non-timeout issue, increasing backoff_delay");
backoff_delay = err_backoff_delay;
}
}
continue; continue;
}, },
Ok (x) => x, Ok (x) => x,
}; };
if req_resp.status () == StatusCode::NO_CONTENT { debug! ("http_listen {} unwrapped {} requests", i, reqs.len ());
debug! ("http_listen long poll timed out on the server, good.");
continue;
}
else if req_resp.status () != StatusCode::OK {
error! ("{}", req_resp.status ());
let body = req_resp.bytes ().await.map_err (ServerError::Step3CollectBody)?;
let body = String::from_utf8 (body.to_vec ()).map_err (ServerError::Step3ErrorResponseNotUtf8)?;
error! ("{}", body);
if backoff_delay != err_backoff_delay {
error! ("Non-timeout issue, increasing backoff_delay");
backoff_delay = err_backoff_delay;
}
continue;
}
// Unpack the requests, spawn them into new tasks, then loop back // Unpack the requests, spawn them into new tasks, then loop back
// around. // around.
if handle_requests ( if handle_requests (
&state, &state,
req_resp, reqs,
spawn_handler, spawn_handler,
).await.is_err () { ).await.is_err () {
backoff_delay = err_backoff_delay; backoff_delay = err_backoff_delay;