Compare commits
No commits in common. "80bd4abad3eec0a03644005ff8ca93ccadc1624f" and "b64214233236917bb1b525043fd4812793a49fd1" have entirely different histories.
80bd4abad3
...
b642142332
|
@ -63,15 +63,9 @@ pub enum ServerError {
|
||||||
#[error ("Can't build HTTP client")]
|
#[error ("Can't build HTTP client")]
|
||||||
CantBuildHttpClient (reqwest::Error),
|
CantBuildHttpClient (reqwest::Error),
|
||||||
|
|
||||||
#[error ("Can't get response from server in Step 3")]
|
|
||||||
Step3Response (reqwest::Error),
|
|
||||||
|
|
||||||
#[error ("Can't collect non-200 error response body in Step 3")]
|
#[error ("Can't collect non-200 error response body in Step 3")]
|
||||||
Step3CollectBody (reqwest::Error),
|
Step3CollectBody (reqwest::Error),
|
||||||
|
|
||||||
#[error ("Step 3 unknown error")]
|
|
||||||
Step3Unknown,
|
|
||||||
|
|
||||||
#[error ("Can't collect wrapped requests in Step 3")]
|
#[error ("Can't collect wrapped requests in Step 3")]
|
||||||
CantCollectWrappedRequests (reqwest::Error),
|
CantCollectWrappedRequests (reqwest::Error),
|
||||||
|
|
||||||
|
|
|
@ -32,6 +32,7 @@ use tokio::{
|
||||||
channel,
|
channel,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
use tracing::instrument;
|
||||||
|
|
||||||
use ptth_core::{
|
use ptth_core::{
|
||||||
http_serde::{
|
http_serde::{
|
||||||
|
@ -138,7 +139,7 @@ async fn serve_dir_json (
|
||||||
Ok (response)
|
Ok (response)
|
||||||
}
|
}
|
||||||
|
|
||||||
// #[instrument (level = "debug", skip (f))]
|
#[instrument (level = "debug", skip (f))]
|
||||||
async fn serve_file (
|
async fn serve_file (
|
||||||
uri: &str,
|
uri: &str,
|
||||||
mut f: File,
|
mut f: File,
|
||||||
|
@ -165,11 +166,11 @@ async fn serve_file (
|
||||||
|
|
||||||
let (range, range_requested) = (range.range, range.range_requested);
|
let (range, range_requested) = (range.range, range.range_requested);
|
||||||
|
|
||||||
|
info! ("Serving range {}-{}", range.start, range.end);
|
||||||
|
|
||||||
let content_length = range.end - range.start;
|
let content_length = range.end - range.start;
|
||||||
|
|
||||||
let body = if decision.should_send_body {
|
let body = if decision.should_send_body {
|
||||||
info! ("Sending range {}-{}", range.start, range.end);
|
|
||||||
|
|
||||||
let seek = SeekFrom::Start (range.start);
|
let seek = SeekFrom::Start (range.start);
|
||||||
f.seek (seek).await?;
|
f.seek (seek).await?;
|
||||||
|
|
||||||
|
@ -181,8 +182,6 @@ async fn serve_file (
|
||||||
Some (rx)
|
Some (rx)
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
info! ("Not sending body");
|
|
||||||
|
|
||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -199,12 +198,11 @@ async fn serve_file (
|
||||||
// - no-cache - Clients and the relay can store this, but should revalidate
|
// - no-cache - Clients and the relay can store this, but should revalidate
|
||||||
// with the origin server (us) because only we can check if the file
|
// with the origin server (us) because only we can check if the file
|
||||||
// changed on disk.
|
// changed on disk.
|
||||||
// - max-age=5 - Caches can keep the file for 5 seconds. This is just enough
|
// - max-age=0 - The file might change at any point during or after the
|
||||||
// to let a cache like Nginx or Varnish on a relay soak up some of a
|
// request, so for proper invalidation, the client should immediately
|
||||||
// slashdotting for us, but not so much that a low-traffic site would
|
// consider it stale.
|
||||||
// suffer from seeing stale data.
|
|
||||||
|
|
||||||
response.header ("cache-control".to_string (), b"max-age=2".to_vec ());
|
response.header ("cache-control".to_string (), b"no-cache,max-age=0".to_vec ());
|
||||||
if let Some (etag) = input.actual_etag {
|
if let Some (etag) = input.actual_etag {
|
||||||
response.header ("etag".to_string (), etag);
|
response.header ("etag".to_string (), etag);
|
||||||
};
|
};
|
||||||
|
@ -248,7 +246,6 @@ fn serve_file_decision (input: &ServeFileInput) -> ServeFileOutput
|
||||||
if let (Some (if_none_match), Some (actual_etag)) = (&input.if_none_match, &input.actual_etag)
|
if let (Some (if_none_match), Some (actual_etag)) = (&input.if_none_match, &input.actual_etag)
|
||||||
{
|
{
|
||||||
if actual_etag == if_none_match {
|
if actual_etag == if_none_match {
|
||||||
info! ("Not Modified");
|
|
||||||
return ServeFileOutput {
|
return ServeFileOutput {
|
||||||
status_code: StatusCode::NotModified,
|
status_code: StatusCode::NotModified,
|
||||||
should_send_body: false,
|
should_send_body: false,
|
||||||
|
@ -328,7 +325,7 @@ async fn stream_file (
|
||||||
|
|
||||||
bytes_left -= bytes_read_64;
|
bytes_left -= bytes_read_64;
|
||||||
if bytes_left == 0 {
|
if bytes_left == 0 {
|
||||||
// debug! ("Finished");
|
debug! ("Finished");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -348,7 +345,7 @@ impl FileServer {
|
||||||
/// Passes a request to the internal file server logic.
|
/// Passes a request to the internal file server logic.
|
||||||
/// Returns an HTTP response as HTML or JSON, depending on the request.
|
/// Returns an HTTP response as HTML or JSON, depending on the request.
|
||||||
|
|
||||||
// #[instrument (level = "debug", skip (self, headers))]
|
#[instrument (level = "debug", skip (self, headers))]
|
||||||
pub async fn serve_all (
|
pub async fn serve_all (
|
||||||
&self,
|
&self,
|
||||||
method: Method,
|
method: Method,
|
||||||
|
|
|
@ -259,7 +259,7 @@ pub async fn serve_all (
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use Response::*;
|
use Response::*;
|
||||||
|
|
||||||
trace! ("Client requested {}", uri);
|
info! ("Client requested {}", uri);
|
||||||
|
|
||||||
let uri = http::Uri::from_str (uri).map_err (FileServerError::InvalidUri)?;
|
let uri = http::Uri::from_str (uri).map_err (FileServerError::InvalidUri)?;
|
||||||
|
|
||||||
|
@ -300,7 +300,7 @@ pub async fn serve_all (
|
||||||
|
|
||||||
let full_path = root.join (path);
|
let full_path = root.join (path);
|
||||||
|
|
||||||
trace! ("full_path = {:?}", full_path);
|
debug! ("full_path = {:?}", full_path);
|
||||||
|
|
||||||
if let Some (hidden_path) = hidden_path {
|
if let Some (hidden_path) = hidden_path {
|
||||||
if full_path == hidden_path {
|
if full_path == hidden_path {
|
||||||
|
|
|
@ -91,18 +91,14 @@ async fn handle_one_req (
|
||||||
response: http_serde::Response,
|
response: http_serde::Response,
|
||||||
) -> Result <(), ServerError>
|
) -> Result <(), ServerError>
|
||||||
{
|
{
|
||||||
let url = format! ("{}/http_response/{}", state.config.relay_url, req_id);
|
|
||||||
let mut resp_req = state.client
|
let mut resp_req = state.client
|
||||||
.post (&url)
|
.post (&format! ("{}/http_response/{}", state.config.relay_url, req_id))
|
||||||
.header (ptth_core::PTTH_MAGIC_HEADER, base64::encode (rmp_serde::to_vec (&response.parts).map_err (ServerError::MessagePackEncodeResponse)?))
|
.header (ptth_core::PTTH_MAGIC_HEADER, base64::encode (rmp_serde::to_vec (&response.parts).map_err (ServerError::MessagePackEncodeResponse)?))
|
||||||
.header ("X-PTTH-SERVER-NAME", &state.config.name);
|
.header ("X-PTTH-SERVER-NAME", &state.config.name);
|
||||||
|
|
||||||
if response.parts.status_code != ptth_core::http_serde::StatusCode::NotModified {
|
if let Some (length) = response.content_length {
|
||||||
if let Some (length) = response.content_length {
|
resp_req = resp_req.header ("Content-Length", length.to_string ());
|
||||||
resp_req = resp_req.header ("Content-Length", length.to_string ());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some (mut body) = response.body {
|
if let Some (mut body) = response.body {
|
||||||
if state.config.throttle_upload {
|
if state.config.throttle_upload {
|
||||||
// Spawn another task to throttle the chunks
|
// Spawn another task to throttle the chunks
|
||||||
|
@ -133,14 +129,14 @@ async fn handle_one_req (
|
||||||
|
|
||||||
let req = resp_req.build ().map_err (ServerError::Step5Responding)?;
|
let req = resp_req.build ().map_err (ServerError::Step5Responding)?;
|
||||||
|
|
||||||
trace! ("{:?}", req.headers ());
|
debug! ("{:?}", req.headers ());
|
||||||
|
|
||||||
//println! ("Step 6");
|
//println! ("Step 6");
|
||||||
match state.client.execute (req).await {
|
match state.client.execute (req).await {
|
||||||
Ok (r) => {
|
Ok (r) => {
|
||||||
let status = r.status ();
|
let status = r.status ();
|
||||||
let text = r.text ().await.map_err (ServerError::Step7AfterResponse)?;
|
let text = r.text ().await.map_err (ServerError::Step7AfterResponse)?;
|
||||||
debug! ("http_response {} {:?} {:?}", req_id, status, text);
|
debug! ("{:?} {:?}", status, text);
|
||||||
},
|
},
|
||||||
Err (e) => {
|
Err (e) => {
|
||||||
if e.is_request () {
|
if e.is_request () {
|
||||||
|
@ -156,7 +152,7 @@ async fn handle_one_req (
|
||||||
|
|
||||||
async fn handle_requests <F, H, SH> (
|
async fn handle_requests <F, H, SH> (
|
||||||
state: &Arc <State>,
|
state: &Arc <State>,
|
||||||
wrapped_reqs: Vec <http_serde::WrappedRequest>,
|
req_resp: reqwest::Response,
|
||||||
spawn_handler: &mut SH,
|
spawn_handler: &mut SH,
|
||||||
) -> Result <(), ServerError>
|
) -> Result <(), ServerError>
|
||||||
where
|
where
|
||||||
|
@ -166,6 +162,18 @@ SH: Send + FnMut () -> H
|
||||||
{
|
{
|
||||||
//println! ("Step 1");
|
//println! ("Step 1");
|
||||||
|
|
||||||
|
let body = req_resp.bytes ().await.map_err (ServerError::CantCollectWrappedRequests)?;
|
||||||
|
let wrapped_reqs: Vec <http_serde::WrappedRequest> = match rmp_serde::from_read_ref (&body)
|
||||||
|
{
|
||||||
|
Ok (x) => x,
|
||||||
|
Err (e) => {
|
||||||
|
error! ("Can't parse wrapped requests: {:?}", e);
|
||||||
|
return Err (ServerError::CantParseWrappedRequests (e));
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
debug! ("Unwrapped {} requests", wrapped_reqs.len ());
|
||||||
|
|
||||||
for wrapped_req in wrapped_reqs {
|
for wrapped_req in wrapped_reqs {
|
||||||
let state = Arc::clone (&state);
|
let state = Arc::clone (&state);
|
||||||
let handler = spawn_handler ();
|
let handler = spawn_handler ();
|
||||||
|
@ -175,7 +183,7 @@ SH: Send + FnMut () -> H
|
||||||
tokio::spawn (async move {
|
tokio::spawn (async move {
|
||||||
let (req_id, parts) = (wrapped_req.id, wrapped_req.req);
|
let (req_id, parts) = (wrapped_req.id, wrapped_req.req);
|
||||||
|
|
||||||
info! ("Req {} {}", req_id, parts.uri);
|
debug! ("Handling request {}", req_id);
|
||||||
|
|
||||||
let f = handler (parts);
|
let f = handler (parts);
|
||||||
let response = f.await?;
|
let response = f.await?;
|
||||||
|
@ -377,35 +385,6 @@ impl State {
|
||||||
|
|
||||||
Ok (state)
|
Ok (state)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn http_listen (
|
|
||||||
state: &Arc <Self>,
|
|
||||||
) -> Result <Vec <http_serde::WrappedRequest>, ServerError>
|
|
||||||
{
|
|
||||||
use http::status::StatusCode;
|
|
||||||
|
|
||||||
let req_resp = state.client.get (&format! ("{}/http_listen/{}", state.config.relay_url, state.config.name))
|
|
||||||
.timeout (Duration::from_secs (30))
|
|
||||||
.send ().await.map_err (ServerError::Step3Response)?;
|
|
||||||
|
|
||||||
if req_resp.status () == StatusCode::NO_CONTENT {
|
|
||||||
return Ok (Vec::new ());
|
|
||||||
}
|
|
||||||
|
|
||||||
if req_resp.status () != StatusCode::OK {
|
|
||||||
error! ("{}", req_resp.status ());
|
|
||||||
let body = req_resp.bytes ().await.map_err (ServerError::Step3CollectBody)?;
|
|
||||||
let body = String::from_utf8 (body.to_vec ()).map_err (ServerError::Step3ErrorResponseNotUtf8)?;
|
|
||||||
error! ("{}", body);
|
|
||||||
return Err (ServerError::Step3Unknown);
|
|
||||||
}
|
|
||||||
|
|
||||||
let body = req_resp.bytes ().await.map_err (ServerError::CantCollectWrappedRequests)?;
|
|
||||||
let wrapped_reqs: Vec <http_serde::WrappedRequest> = rmp_serde::from_read_ref (&body)
|
|
||||||
.map_err (ServerError::CantParseWrappedRequests)?;
|
|
||||||
|
|
||||||
Ok (wrapped_reqs)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn run <F, H, SH> (
|
pub async fn run <F, H, SH> (
|
||||||
state: &Arc <Self>,
|
state: &Arc <Self>,
|
||||||
|
@ -417,10 +396,12 @@ impl State {
|
||||||
H: Send + 'static + FnOnce (http_serde::RequestParts) -> F,
|
H: Send + 'static + FnOnce (http_serde::RequestParts) -> F,
|
||||||
SH: Send + FnMut () -> H
|
SH: Send + FnMut () -> H
|
||||||
{
|
{
|
||||||
|
use http::status::StatusCode;
|
||||||
|
|
||||||
let mut backoff_delay = 0;
|
let mut backoff_delay = 0;
|
||||||
let mut shutdown_oneshot = shutdown_oneshot.fuse ();
|
let mut shutdown_oneshot = shutdown_oneshot.fuse ();
|
||||||
|
|
||||||
for i in 0u64.. {
|
loop {
|
||||||
// TODO: Extract loop body to function?
|
// TODO: Extract loop body to function?
|
||||||
|
|
||||||
if backoff_delay > 0 {
|
if backoff_delay > 0 {
|
||||||
|
@ -436,37 +417,61 @@ impl State {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
debug! ("http_listen {}...", i);
|
debug! ("http_listen");
|
||||||
|
|
||||||
let http_listen_fut = Self::http_listen (state);
|
let req_req = state.client.get (&format! ("{}/http_listen/{}", state.config.relay_url, state.config.name))
|
||||||
|
.timeout (Duration::from_secs (30))
|
||||||
|
.send ();
|
||||||
|
|
||||||
let http_listen = futures::select! {
|
let err_backoff_delay = std::cmp::min (30_000, backoff_delay * 2 + 500);
|
||||||
r = http_listen_fut.fuse () => r,
|
|
||||||
|
let req_req = futures::select! {
|
||||||
|
r = req_req.fuse () => r,
|
||||||
_ = shutdown_oneshot => {
|
_ = shutdown_oneshot => {
|
||||||
info! ("Received graceful shutdown");
|
info! ("Received graceful shutdown");
|
||||||
break;
|
break;
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
let err_backoff_delay = std::cmp::min (30_000, backoff_delay * 2 + 500);
|
let req_resp = match req_req {
|
||||||
|
|
||||||
let reqs = match http_listen {
|
|
||||||
Err (e) => {
|
Err (e) => {
|
||||||
backoff_delay = err_backoff_delay;
|
if e.is_timeout () {
|
||||||
error! ("http_listen {} error, backing off... {:?}", i, e);
|
error! ("Client-side timeout. Is an overly-aggressive firewall closing long-lived connections? Is the network flakey?");
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
error! ("Err: {:?}", e);
|
||||||
|
if backoff_delay != err_backoff_delay {
|
||||||
|
error! ("Non-timeout issue, increasing backoff_delay");
|
||||||
|
backoff_delay = err_backoff_delay;
|
||||||
|
}
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
},
|
},
|
||||||
Ok (x) => x,
|
Ok (x) => x,
|
||||||
};
|
};
|
||||||
|
|
||||||
debug! ("http_listen {} unwrapped {} requests", i, reqs.len ());
|
if req_resp.status () == StatusCode::NO_CONTENT {
|
||||||
|
debug! ("http_listen long poll timed out on the server, good.");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
else if req_resp.status () != StatusCode::OK {
|
||||||
|
error! ("{}", req_resp.status ());
|
||||||
|
let body = req_resp.bytes ().await.map_err (ServerError::Step3CollectBody)?;
|
||||||
|
let body = String::from_utf8 (body.to_vec ()).map_err (ServerError::Step3ErrorResponseNotUtf8)?;
|
||||||
|
error! ("{}", body);
|
||||||
|
if backoff_delay != err_backoff_delay {
|
||||||
|
error! ("Non-timeout issue, increasing backoff_delay");
|
||||||
|
backoff_delay = err_backoff_delay;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
// Unpack the requests, spawn them into new tasks, then loop back
|
// Unpack the requests, spawn them into new tasks, then loop back
|
||||||
// around.
|
// around.
|
||||||
|
|
||||||
if handle_requests (
|
if handle_requests (
|
||||||
&state,
|
&state,
|
||||||
reqs,
|
req_resp,
|
||||||
spawn_handler,
|
spawn_handler,
|
||||||
).await.is_err () {
|
).await.is_err () {
|
||||||
backoff_delay = err_backoff_delay;
|
backoff_delay = err_backoff_delay;
|
||||||
|
|
Loading…
Reference in New Issue