diff --git a/Cargo.lock b/Cargo.lock index 983c08d..c3bddeb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1575,7 +1575,7 @@ dependencies = [ [[package]] name = "ptth_server" -version = "2.0.0" +version = "2.1.0" dependencies = [ "aho-corasick", "always_equal", diff --git a/crates/ptth_server/Cargo.toml b/crates/ptth_server/Cargo.toml index 3173c08..e3d1b38 100644 --- a/crates/ptth_server/Cargo.toml +++ b/crates/ptth_server/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "ptth_server" -version = "2.0.0" +version = "2.1.0" authors = ["Trish"] edition = "2018" license = "AGPL-3.0" diff --git a/crates/ptth_server/src/bin/ptth_server_custom.rs b/crates/ptth_server/src/bin/ptth_server_custom.rs new file mode 100644 index 0000000..b829805 --- /dev/null +++ b/crates/ptth_server/src/bin/ptth_server_custom.rs @@ -0,0 +1,49 @@ +#[tokio::main] +async fn main () -> anyhow::Result <()> { + use std::{ + fs, + sync::Arc, + }; + + use ptth_core::{ + graceful_shutdown, + http_serde::{ + RequestParts, + Response, + }, + }; + use ptth_server::{ + Builder, + State, + }; + + let api_key = fs::read_to_string ("config/ptth_server_custom_key.txt")? + .trim_end () + .to_string (); + + let state = Builder::new ( + "ptth_server_custom".to_string (), + "http://127.0.0.1:4000/7ZSFUKGV".to_string () + ) + .api_key (api_key) + .build ()?; + + let state = Arc::new (state); + + let mut spawn_handler = || { + |req: RequestParts| async move { + let mut resp = Response::default (); + resp.body_bytes (req.uri.as_bytes ().to_vec ()); + + Ok (resp) + } + }; + + State::run ( + &state, + graceful_shutdown::init (), + &mut spawn_handler, + ).await?; + + Ok (()) +} diff --git a/crates/ptth_server/src/lib.rs b/crates/ptth_server/src/lib.rs index 23a2767..14eabcf 100644 --- a/crates/ptth_server/src/lib.rs +++ b/crates/ptth_server/src/lib.rs @@ -72,7 +72,7 @@ pub mod load_toml; use errors::ServerError; -struct State { +pub struct State { file_server: file_server::FileServer, config: Config, client: Client, @@ -145,15 +145,15 @@ async fn handle_one_req ( Ok::<(), ServerError> (()) } -async fn handle_requests ( +async fn handle_requests ( state: &Arc , req_resp: reqwest::Response, - mut spawn_handler: H, + spawn_handler: &mut SH, ) -> Result <(), ServerError> where F: Send + Future >, -F2: Send + 'static + FnOnce (http_serde::RequestParts) -> F, -H: Send + FnMut () -> F2 +H: Send + 'static + FnOnce (http_serde::RequestParts) -> F, +SH: Send + FnMut () -> H { //println! ("Step 1"); @@ -253,7 +253,46 @@ pub struct Config { pub throttle_upload: bool, } -/// Runs a PTTH file server +pub struct Builder { + config_file: ConfigFile, + hidden_path: Option , + asset_root: Option , +} + +impl Builder { + pub fn new ( + name: String, + relay_url: String, + ) -> Self { + let config_file = ConfigFile { + name, + api_key: ptth_core::gen_key (), + relay_url, + file_server_root: None, + throttle_upload: false, + }; + + Self { + config_file, + hidden_path: None, + asset_root: None, + } + } + + pub fn build (self) -> Result + { + State::new ( + self.config_file, + self.hidden_path, + self.asset_root + ) + } + + pub fn api_key (mut self, key: String) -> Self { + self.config_file.api_key = key; + self + } +} pub async fn run_server ( config_file: ConfigFile, @@ -261,155 +300,183 @@ pub async fn run_server ( hidden_path: Option , asset_root: Option ) --> Result <(), ServerError> +-> Result <(), ServerError> { - use std::{ - convert::TryInto, + let state = Arc::new (State::new ( + config_file, + hidden_path, + asset_root, + )?); + + let state_2 = Arc::clone (&state); + + let mut spawn_handler = || { + let state = Arc::clone (&state_2); + + |req: http_serde::RequestParts| async move { + Ok (state.file_server.serve_all (req.method, &req.uri, &req.headers).await?) + } }; - use arc_swap::ArcSwap; - - let asset_root = asset_root.unwrap_or_else (PathBuf::new); - - info! ("Server name is {}", config_file.name); - info! ("Tripcode is {}", config_file.tripcode ()); - - let mut headers = reqwest::header::HeaderMap::new (); - headers.insert ("X-ApiKey", config_file.api_key.try_into ().map_err (ServerError::ApiKeyInvalid)?); - - let client = Client::builder () - .default_headers (headers) - .connect_timeout (Duration::from_secs (30)) - .build ().map_err (ServerError::CantBuildHttpClient)?; - - let metrics_interval = Arc::new (ArcSwap::default ()); - - let interval_writer = Arc::clone (&metrics_interval); - tokio::spawn (async move { - file_server::metrics::Interval::monitor (interval_writer).await; - }); - - let state = Arc::new (State { - file_server: file_server::FileServer::new ( - config_file.file_server_root, - &asset_root, - config_file.name, - metrics_interval, - hidden_path, - )?, - config: Config { - relay_url: config_file.relay_url, - throttle_upload: config_file.throttle_upload, - }, - client, - }); - - run_server_loop (state, shutdown_oneshot).await + State::run ( + &state, + shutdown_oneshot, + &mut spawn_handler, + ).await } -async fn run_server_loop ( - state: Arc , - shutdown_oneshot: oneshot::Receiver <()>, -) -> Result <(), ServerError> { - use http::status::StatusCode; - - let mut backoff_delay = 0; - let mut shutdown_oneshot = shutdown_oneshot.fuse (); - - loop { - // TODO: Extract loop body to function? +impl State { + pub fn new ( + config_file: ConfigFile, + hidden_path: Option , + asset_root: Option + ) + -> Result + { + use std::convert::TryInto; - if backoff_delay > 0 { - let sleep = tokio::time::sleep (Duration::from_millis (backoff_delay)); - tokio::pin! (sleep); + use arc_swap::ArcSwap; + + let asset_root = asset_root.unwrap_or_else (PathBuf::new); + + info! ("Server name is {}", config_file.name); + info! ("Tripcode is {}", config_file.tripcode ()); + + let mut headers = reqwest::header::HeaderMap::new (); + headers.insert ("X-ApiKey", config_file.api_key.try_into ().map_err (ServerError::ApiKeyInvalid)?); + + let client = Client::builder () + .default_headers (headers) + .connect_timeout (Duration::from_secs (30)) + .build ().map_err (ServerError::CantBuildHttpClient)?; + + let metrics_interval = Arc::new (ArcSwap::default ()); + + let interval_writer = Arc::clone (&metrics_interval); + tokio::spawn (async move { + file_server::metrics::Interval::monitor (interval_writer).await; + }); + + let state = State { + file_server: file_server::FileServer::new ( + config_file.file_server_root, + &asset_root, + config_file.name, + metrics_interval, + hidden_path, + )?, + config: Config { + relay_url: config_file.relay_url, + throttle_upload: config_file.throttle_upload, + }, + client, + }; + + Ok (state) + } + + pub async fn run ( + state: &Arc , + shutdown_oneshot: oneshot::Receiver <()>, + spawn_handler: &mut SH, + ) -> Result <(), ServerError> + where + F: Send + Future >, + H: Send + 'static + FnOnce (http_serde::RequestParts) -> F, + SH: Send + FnMut () -> H + { + use http::status::StatusCode; + + let mut backoff_delay = 0; + let mut shutdown_oneshot = shutdown_oneshot.fuse (); + + loop { + // TODO: Extract loop body to function? - tokio::select! { - _ = &mut sleep => {}, - _ = &mut shutdown_oneshot => { + if backoff_delay > 0 { + let sleep = tokio::time::sleep (Duration::from_millis (backoff_delay)); + tokio::pin! (sleep); + + tokio::select! { + _ = &mut sleep => {}, + _ = &mut shutdown_oneshot => { + info! ("Received graceful shutdown"); + break; + }, + } + } + + debug! ("http_listen"); + + let req_req = state.client.get (&format! ("{}/http_listen/{}", state.config.relay_url, state.file_server.metrics_startup.server_name)) + .timeout (Duration::from_secs (30)) + .send (); + + let err_backoff_delay = std::cmp::min (30_000, backoff_delay * 2 + 500); + + let req_req = futures::select! { + r = req_req.fuse () => r, + _ = shutdown_oneshot => { info! ("Received graceful shutdown"); break; }, - } - } - - debug! ("http_listen"); - - let req_req = state.client.get (&format! ("{}/http_listen/{}", state.config.relay_url, state.file_server.metrics_startup.server_name)) - .timeout (Duration::from_secs (30)) - .send (); - - let err_backoff_delay = std::cmp::min (30_000, backoff_delay * 2 + 500); - - let req_req = futures::select! { - r = req_req.fuse () => r, - _ = shutdown_oneshot => { - info! ("Received graceful shutdown"); - break; - }, - }; - - let req_resp = match req_req { - Err (e) => { - if e.is_timeout () { - error! ("Client-side timeout. Is an overly-aggressive firewall closing long-lived connections? Is the network flakey?"); - } - else { - error! ("Err: {:?}", e); - if backoff_delay != err_backoff_delay { - error! ("Non-timeout issue, increasing backoff_delay"); - backoff_delay = err_backoff_delay; + }; + + let req_resp = match req_req { + Err (e) => { + if e.is_timeout () { + error! ("Client-side timeout. Is an overly-aggressive firewall closing long-lived connections? Is the network flakey?"); } + else { + error! ("Err: {:?}", e); + if backoff_delay != err_backoff_delay { + error! ("Non-timeout issue, increasing backoff_delay"); + backoff_delay = err_backoff_delay; + } + } + continue; + }, + Ok (x) => x, + }; + + if req_resp.status () == StatusCode::NO_CONTENT { + debug! ("http_listen long poll timed out on the server, good."); + continue; + } + else if req_resp.status () != StatusCode::OK { + error! ("{}", req_resp.status ()); + let body = req_resp.bytes ().await.map_err (ServerError::Step3CollectBody)?; + let body = String::from_utf8 (body.to_vec ()).map_err (ServerError::Step3ErrorResponseNotUtf8)?; + error! ("{}", body); + if backoff_delay != err_backoff_delay { + error! ("Non-timeout issue, increasing backoff_delay"); + backoff_delay = err_backoff_delay; } continue; - }, - Ok (x) => x, - }; - - if req_resp.status () == StatusCode::NO_CONTENT { - debug! ("http_listen long poll timed out on the server, good."); - continue; - } - else if req_resp.status () != StatusCode::OK { - error! ("{}", req_resp.status ()); - let body = req_resp.bytes ().await.map_err (ServerError::Step3CollectBody)?; - let body = String::from_utf8 (body.to_vec ()).map_err (ServerError::Step3ErrorResponseNotUtf8)?; - error! ("{}", body); - if backoff_delay != err_backoff_delay { - error! ("Non-timeout issue, increasing backoff_delay"); - backoff_delay = err_backoff_delay; } - continue; - } - - // Unpack the requests, spawn them into new tasks, then loop back - // around. - - let spawn_handler = || { - let state = Arc::clone (&state); - |req: http_serde::RequestParts| async move { - Ok (state.file_server.serve_all (req.method, &req.uri, &req.headers).await?) + // Unpack the requests, spawn them into new tasks, then loop back + // around. + + if handle_requests ( + &state, + req_resp, + spawn_handler, + ).await.is_err () { + backoff_delay = err_backoff_delay; + continue; + } + + if backoff_delay != 0 { + debug! ("backoff_delay = 0"); + backoff_delay = 0; } - }; - - if handle_requests ( - &state, - req_resp, - spawn_handler, - ).await.is_err () { - backoff_delay = err_backoff_delay; - continue; } - if backoff_delay != 0 { - debug! ("backoff_delay = 0"); - backoff_delay = 0; - } + info! ("Exiting"); + + Ok (()) } - - info! ("Exiting"); - - Ok (()) } #[cfg (test)]