ptth_server v2.1.0 will allow custom servers with a user-supplied request handler

main
_ 2021-05-09 19:29:15 +00:00
parent 08009de043
commit 6c826b0cc5
4 changed files with 254 additions and 138 deletions

2
Cargo.lock generated
View File

@ -1575,7 +1575,7 @@ dependencies = [
[[package]] [[package]]
name = "ptth_server" name = "ptth_server"
version = "2.0.0" version = "2.1.0"
dependencies = [ dependencies = [
"aho-corasick", "aho-corasick",
"always_equal", "always_equal",

View File

@ -1,7 +1,7 @@
[package] [package]
name = "ptth_server" name = "ptth_server"
version = "2.0.0" version = "2.1.0"
authors = ["Trish"] authors = ["Trish"]
edition = "2018" edition = "2018"
license = "AGPL-3.0" license = "AGPL-3.0"

View File

@ -0,0 +1,49 @@
#[tokio::main]
async fn main () -> anyhow::Result <()> {
use std::{
fs,
sync::Arc,
};
use ptth_core::{
graceful_shutdown,
http_serde::{
RequestParts,
Response,
},
};
use ptth_server::{
Builder,
State,
};
let api_key = fs::read_to_string ("config/ptth_server_custom_key.txt")?
.trim_end ()
.to_string ();
let state = Builder::new (
"ptth_server_custom".to_string (),
"http://127.0.0.1:4000/7ZSFUKGV".to_string ()
)
.api_key (api_key)
.build ()?;
let state = Arc::new (state);
let mut spawn_handler = || {
|req: RequestParts| async move {
let mut resp = Response::default ();
resp.body_bytes (req.uri.as_bytes ().to_vec ());
Ok (resp)
}
};
State::run (
&state,
graceful_shutdown::init (),
&mut spawn_handler,
).await?;
Ok (())
}

View File

@ -72,7 +72,7 @@ pub mod load_toml;
use errors::ServerError; use errors::ServerError;
struct State { pub struct State {
file_server: file_server::FileServer, file_server: file_server::FileServer,
config: Config, config: Config,
client: Client, client: Client,
@ -145,15 +145,15 @@ async fn handle_one_req (
Ok::<(), ServerError> (()) Ok::<(), ServerError> (())
} }
async fn handle_requests <F, F2, H> ( async fn handle_requests <F, H, SH> (
state: &Arc <State>, state: &Arc <State>,
req_resp: reqwest::Response, req_resp: reqwest::Response,
mut spawn_handler: H, spawn_handler: &mut SH,
) -> Result <(), ServerError> ) -> Result <(), ServerError>
where where
F: Send + Future <Output = anyhow::Result <http_serde::Response>>, F: Send + Future <Output = anyhow::Result <http_serde::Response>>,
F2: Send + 'static + FnOnce (http_serde::RequestParts) -> F, H: Send + 'static + FnOnce (http_serde::RequestParts) -> F,
H: Send + FnMut () -> F2 SH: Send + FnMut () -> H
{ {
//println! ("Step 1"); //println! ("Step 1");
@ -253,7 +253,46 @@ pub struct Config {
pub throttle_upload: bool, pub throttle_upload: bool,
} }
/// Runs a PTTH file server pub struct Builder {
config_file: ConfigFile,
hidden_path: Option <PathBuf>,
asset_root: Option <PathBuf>,
}
impl Builder {
pub fn new (
name: String,
relay_url: String,
) -> Self {
let config_file = ConfigFile {
name,
api_key: ptth_core::gen_key (),
relay_url,
file_server_root: None,
throttle_upload: false,
};
Self {
config_file,
hidden_path: None,
asset_root: None,
}
}
pub fn build (self) -> Result <State, ServerError>
{
State::new (
self.config_file,
self.hidden_path,
self.asset_root
)
}
pub fn api_key (mut self, key: String) -> Self {
self.config_file.api_key = key;
self
}
}
pub async fn run_server ( pub async fn run_server (
config_file: ConfigFile, config_file: ConfigFile,
@ -261,155 +300,183 @@ pub async fn run_server (
hidden_path: Option <PathBuf>, hidden_path: Option <PathBuf>,
asset_root: Option <PathBuf> asset_root: Option <PathBuf>
) )
-> Result <(), ServerError> -> Result <(), ServerError>
{ {
use std::{ let state = Arc::new (State::new (
convert::TryInto, config_file,
hidden_path,
asset_root,
)?);
let state_2 = Arc::clone (&state);
let mut spawn_handler = || {
let state = Arc::clone (&state_2);
|req: http_serde::RequestParts| async move {
Ok (state.file_server.serve_all (req.method, &req.uri, &req.headers).await?)
}
}; };
use arc_swap::ArcSwap; State::run (
&state,
let asset_root = asset_root.unwrap_or_else (PathBuf::new); shutdown_oneshot,
&mut spawn_handler,
info! ("Server name is {}", config_file.name); ).await
info! ("Tripcode is {}", config_file.tripcode ());
let mut headers = reqwest::header::HeaderMap::new ();
headers.insert ("X-ApiKey", config_file.api_key.try_into ().map_err (ServerError::ApiKeyInvalid)?);
let client = Client::builder ()
.default_headers (headers)
.connect_timeout (Duration::from_secs (30))
.build ().map_err (ServerError::CantBuildHttpClient)?;
let metrics_interval = Arc::new (ArcSwap::default ());
let interval_writer = Arc::clone (&metrics_interval);
tokio::spawn (async move {
file_server::metrics::Interval::monitor (interval_writer).await;
});
let state = Arc::new (State {
file_server: file_server::FileServer::new (
config_file.file_server_root,
&asset_root,
config_file.name,
metrics_interval,
hidden_path,
)?,
config: Config {
relay_url: config_file.relay_url,
throttle_upload: config_file.throttle_upload,
},
client,
});
run_server_loop (state, shutdown_oneshot).await
} }
async fn run_server_loop ( impl State {
state: Arc <State>, pub fn new (
shutdown_oneshot: oneshot::Receiver <()>, config_file: ConfigFile,
) -> Result <(), ServerError> { hidden_path: Option <PathBuf>,
use http::status::StatusCode; asset_root: Option <PathBuf>
)
let mut backoff_delay = 0; -> Result <Self, ServerError>
let mut shutdown_oneshot = shutdown_oneshot.fuse (); {
use std::convert::TryInto;
loop {
// TODO: Extract loop body to function?
if backoff_delay > 0 { use arc_swap::ArcSwap;
let sleep = tokio::time::sleep (Duration::from_millis (backoff_delay));
tokio::pin! (sleep); let asset_root = asset_root.unwrap_or_else (PathBuf::new);
info! ("Server name is {}", config_file.name);
info! ("Tripcode is {}", config_file.tripcode ());
let mut headers = reqwest::header::HeaderMap::new ();
headers.insert ("X-ApiKey", config_file.api_key.try_into ().map_err (ServerError::ApiKeyInvalid)?);
let client = Client::builder ()
.default_headers (headers)
.connect_timeout (Duration::from_secs (30))
.build ().map_err (ServerError::CantBuildHttpClient)?;
let metrics_interval = Arc::new (ArcSwap::default ());
let interval_writer = Arc::clone (&metrics_interval);
tokio::spawn (async move {
file_server::metrics::Interval::monitor (interval_writer).await;
});
let state = State {
file_server: file_server::FileServer::new (
config_file.file_server_root,
&asset_root,
config_file.name,
metrics_interval,
hidden_path,
)?,
config: Config {
relay_url: config_file.relay_url,
throttle_upload: config_file.throttle_upload,
},
client,
};
Ok (state)
}
pub async fn run <F, H, SH> (
state: &Arc <Self>,
shutdown_oneshot: oneshot::Receiver <()>,
spawn_handler: &mut SH,
) -> Result <(), ServerError>
where
F: Send + Future <Output = anyhow::Result <http_serde::Response>>,
H: Send + 'static + FnOnce (http_serde::RequestParts) -> F,
SH: Send + FnMut () -> H
{
use http::status::StatusCode;
let mut backoff_delay = 0;
let mut shutdown_oneshot = shutdown_oneshot.fuse ();
loop {
// TODO: Extract loop body to function?
tokio::select! { if backoff_delay > 0 {
_ = &mut sleep => {}, let sleep = tokio::time::sleep (Duration::from_millis (backoff_delay));
_ = &mut shutdown_oneshot => { tokio::pin! (sleep);
tokio::select! {
_ = &mut sleep => {},
_ = &mut shutdown_oneshot => {
info! ("Received graceful shutdown");
break;
},
}
}
debug! ("http_listen");
let req_req = state.client.get (&format! ("{}/http_listen/{}", state.config.relay_url, state.file_server.metrics_startup.server_name))
.timeout (Duration::from_secs (30))
.send ();
let err_backoff_delay = std::cmp::min (30_000, backoff_delay * 2 + 500);
let req_req = futures::select! {
r = req_req.fuse () => r,
_ = shutdown_oneshot => {
info! ("Received graceful shutdown"); info! ("Received graceful shutdown");
break; break;
}, },
} };
}
let req_resp = match req_req {
debug! ("http_listen"); Err (e) => {
if e.is_timeout () {
let req_req = state.client.get (&format! ("{}/http_listen/{}", state.config.relay_url, state.file_server.metrics_startup.server_name)) error! ("Client-side timeout. Is an overly-aggressive firewall closing long-lived connections? Is the network flakey?");
.timeout (Duration::from_secs (30))
.send ();
let err_backoff_delay = std::cmp::min (30_000, backoff_delay * 2 + 500);
let req_req = futures::select! {
r = req_req.fuse () => r,
_ = shutdown_oneshot => {
info! ("Received graceful shutdown");
break;
},
};
let req_resp = match req_req {
Err (e) => {
if e.is_timeout () {
error! ("Client-side timeout. Is an overly-aggressive firewall closing long-lived connections? Is the network flakey?");
}
else {
error! ("Err: {:?}", e);
if backoff_delay != err_backoff_delay {
error! ("Non-timeout issue, increasing backoff_delay");
backoff_delay = err_backoff_delay;
} }
else {
error! ("Err: {:?}", e);
if backoff_delay != err_backoff_delay {
error! ("Non-timeout issue, increasing backoff_delay");
backoff_delay = err_backoff_delay;
}
}
continue;
},
Ok (x) => x,
};
if req_resp.status () == StatusCode::NO_CONTENT {
debug! ("http_listen long poll timed out on the server, good.");
continue;
}
else if req_resp.status () != StatusCode::OK {
error! ("{}", req_resp.status ());
let body = req_resp.bytes ().await.map_err (ServerError::Step3CollectBody)?;
let body = String::from_utf8 (body.to_vec ()).map_err (ServerError::Step3ErrorResponseNotUtf8)?;
error! ("{}", body);
if backoff_delay != err_backoff_delay {
error! ("Non-timeout issue, increasing backoff_delay");
backoff_delay = err_backoff_delay;
} }
continue; continue;
},
Ok (x) => x,
};
if req_resp.status () == StatusCode::NO_CONTENT {
debug! ("http_listen long poll timed out on the server, good.");
continue;
}
else if req_resp.status () != StatusCode::OK {
error! ("{}", req_resp.status ());
let body = req_resp.bytes ().await.map_err (ServerError::Step3CollectBody)?;
let body = String::from_utf8 (body.to_vec ()).map_err (ServerError::Step3ErrorResponseNotUtf8)?;
error! ("{}", body);
if backoff_delay != err_backoff_delay {
error! ("Non-timeout issue, increasing backoff_delay");
backoff_delay = err_backoff_delay;
} }
continue;
}
// Unpack the requests, spawn them into new tasks, then loop back
// around.
let spawn_handler = || {
let state = Arc::clone (&state);
|req: http_serde::RequestParts| async move { // Unpack the requests, spawn them into new tasks, then loop back
Ok (state.file_server.serve_all (req.method, &req.uri, &req.headers).await?) // around.
if handle_requests (
&state,
req_resp,
spawn_handler,
).await.is_err () {
backoff_delay = err_backoff_delay;
continue;
}
if backoff_delay != 0 {
debug! ("backoff_delay = 0");
backoff_delay = 0;
} }
};
if handle_requests (
&state,
req_resp,
spawn_handler,
).await.is_err () {
backoff_delay = err_backoff_delay;
continue;
} }
if backoff_delay != 0 { info! ("Exiting");
debug! ("backoff_delay = 0");
backoff_delay = 0; Ok (())
}
} }
info! ("Exiting");
Ok (())
} }
#[cfg (test)] #[cfg (test)]