diff --git a/crates/ptth_relay/Cargo.toml b/crates/ptth_relay/Cargo.toml index a44c080..28dae70 100644 --- a/crates/ptth_relay/Cargo.toml +++ b/crates/ptth_relay/Cargo.toml @@ -20,7 +20,7 @@ futures = "0.3.7" futures-util = "0.3.8" handlebars = "3.5.3" http = "0.2.3" -hyper = { version = "0.14.4", features = ["http1", "server", "stream", "tcp"] } +hyper = { version = "0.14.20", features = ["http1", "http2", "server", "stream", "tcp"] } itertools = "0.9.0" rand = "0.8.3" rmp-serde = "0.15.5" diff --git a/crates/ptth_relay/src/lib.rs b/crates/ptth_relay/src/lib.rs index 2f29ca8..80512c1 100644 --- a/crates/ptth_relay/src/lib.rs +++ b/crates/ptth_relay/src/lib.rs @@ -135,6 +135,11 @@ async fn handle_http_request ( let (tx, rx) = oneshot::channel (); + let tx = relay_state::ResponseRendezvous { + timeout: Instant::now () + Duration::from_secs (120), + tx, + }; + let req_id = rusty_ulid::generate_ulid_string (); debug! ("Forwarding {}", req_id); @@ -791,6 +796,45 @@ pub async fn run_relay ( }); } + // Set a task to periodically sweep and time-out requests where the client + // and server are never going to rendezvous + + let state_2 = Arc::clone (&state); + tokio::spawn (async move { + let mut interval = tokio::time::interval (Duration::from_secs (60)); + interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip); + + loop { + use std::convert::TryFrom; + + use rusty_ulid::Ulid; + + interval.tick ().await; + + { + let timeout_ms = Utc::now ().timestamp () - 120_000; + if let Ok (timeout_ms) = u64::try_from (timeout_ms) { + let timeout_ulid = Ulid::from_timestamp_with_rng (timeout_ms, &mut rand::thread_rng ()).to_string (); + + let mut request_rendezvous = state_2.request_rendezvous.lock ().await; + request_rendezvous.iter_mut () + .for_each (|(k, v)| { + match v { + RequestRendezvous::ParkedServer (_) => (), + RequestRendezvous::ParkedClients (requests) => requests.retain (|req| req.id.as_str () >= timeout_ulid.as_str ()), + } + }); + } + } + + { + let now = Instant::now (); + let response_rendezvous = state_2.response_rendezvous.read ().await; + response_rendezvous.retain (|_, v| v.timeout >= now); + } + } + }); + let make_svc = make_service_fn (|_conn| { let state = state.clone (); let handlebars = handlebars.clone (); @@ -849,7 +893,7 @@ pub async fn run_relay ( std::mem::swap (&mut swapped, &mut response_rendezvous); for (_, sender) in swapped { - sender.send (Err (ShuttingDown)).ok (); + sender.tx.send (Err (ShuttingDown)).ok (); } let mut request_rendezvous = state.request_rendezvous.lock ().await; diff --git a/crates/ptth_relay/src/relay_state.rs b/crates/ptth_relay/src/relay_state.rs index 9f66ffa..4ad3bcb 100644 --- a/crates/ptth_relay/src/relay_state.rs +++ b/crates/ptth_relay/src/relay_state.rs @@ -61,7 +61,10 @@ pub enum RequestRendezvous { ParkedServer (oneshot::Sender >), } -type ResponseRendezvous = oneshot::Sender >; +pub (crate) struct ResponseRendezvous { + pub timeout: Instant, + pub tx: oneshot::Sender >, +} #[derive (Clone)] pub struct ServerStatus { diff --git a/crates/ptth_relay/src/scraper_api.rs b/crates/ptth_relay/src/scraper_api.rs index ddb2e93..b41ac80 100644 --- a/crates/ptth_relay/src/scraper_api.rs +++ b/crates/ptth_relay/src/scraper_api.rs @@ -119,6 +119,21 @@ pub async fn v1_server_list (state: &Relay) } } +fn get_api_key (headers: &hyper::HeaderMap) -> Option <&str> +{ + if let Some (key) = headers.get ("X-ApiKey").and_then (|v| v.to_str ().ok ()) { + return Some (key); + } + + if let Some (s) = headers.get ("Authorization").and_then (|v| v.to_str ().ok ()) { + if let Some (key) = s.strip_prefix ("Bearer ") { + return Some (key); + } + } + + None +} + #[instrument (level = "trace", skip (req, state))] async fn api_v1 ( req: Request , @@ -132,7 +147,7 @@ async fn api_v1 ( AuditEvent, }; - let api_key = req.headers ().get ("X-ApiKey"); + let api_key = get_api_key (req.headers ()); let api_key = match api_key { None => return Ok (error_reply (StatusCode::FORBIDDEN, strings::NO_API_KEY)?), @@ -176,7 +191,10 @@ async fn api_v1 ( path: path_rest.to_string (), })).await; - if path_rest == "test" { + if path_rest == "metrics" { + Ok (metrics (req, state).await?) + } + else if path_rest == "test" { Ok (error_reply (StatusCode::OK, "You're valid!")?) } else if path_rest == "server_list" { @@ -205,6 +223,65 @@ async fn api_v1 ( } } +#[instrument (level = "trace", skip (req, state))] +async fn metrics ( + req: Request , + state: &Relay, +) +-> Result , RequestError> +{ + let mut s = String::with_capacity (4 * 1_024); + + let mut push_metric = |name, help, kind, value| { + if let Some (help) = help { + s.push_str (format! ("# HELP {} {}\n", name, help).as_str ()); + } + s.push_str (format! ("# TYPE {} {}\n", name, kind).as_str ()); + s.push_str (format! ("{} {}\n", name, value).as_str ()); + }; + + let request_rendezvous_count = { + let g = state.request_rendezvous.lock ().await; + g.len () + }; + + let server_status_count; + let connected_server_count; + + let now = Utc::now (); + + { + let g = state.server_status.lock ().await; + server_status_count = g.len (); + connected_server_count = g.iter () + .filter (|(_, s)| now - s.last_seen < chrono::Duration::seconds (60)) + .count (); + } + + let response_rendezvous_count = { + let g = state.response_rendezvous.read ().await; + g.len () + }; + + push_metric ("request_rendezvous_count", None, "gauge", request_rendezvous_count.to_string ()); + push_metric ("server_status_count", None, "gauge", server_status_count.to_string ()); + push_metric ("connected_server_count", None, "gauge", connected_server_count.to_string ()); + push_metric ("response_rendezvous_count", None, "gauge", response_rendezvous_count.to_string ()); + + #[cfg (target_os = "linux")] + { + if let Some (rss) = tokio::fs::read_to_string ("/proc/self/status").await + .ok () + .and_then (|s| get_rss_from_status (s.as_str ())) + { + push_metric ("relay_vm_rss", Some ("VmRSS of the relay process, in kB"), "gauge", rss.to_string ()); + } + } + + Ok (Response::builder () + .body (Body::from (s))?) +} + #[instrument (level = "trace", skip (req, state))] pub async fn handle ( req: Request , @@ -230,6 +307,20 @@ pub async fn handle ( } } +fn get_rss_from_status (proc_status: &str) -> Option +{ + use std::str::FromStr; + + for line in proc_status.lines () { + if let Some (rest) = line.strip_prefix ("VmRSS:\t").and_then (|s| s.strip_suffix (" kB")) + { + return u64::from_str (rest.trim_start ()).ok (); + } + } + + None +} + #[cfg (test)] mod tests { use std::{ @@ -246,8 +337,9 @@ mod tests { struct TestCase { // Inputs path_rest: &'static str, + auth_header: Option <&'static str>, valid_key: Option <&'static str>, - input_key: Option <&'static str>, + x_api_key: Option <&'static str>, // Expected expected_status: StatusCode, @@ -268,9 +360,15 @@ mod tests { x } - fn input_key (&self, v: Option <&'static str>) -> Self { + fn auth_header (&self, v: Option <&'static str>) -> Self { let mut x = self.clone (); - x.input_key = v; + x.auth_header = v; + x + } + + fn x_api_key (&self, v: Option <&'static str>) -> Self { + let mut x = self.clone (); + x.x_api_key = v; x } @@ -298,13 +396,16 @@ mod tests { .expected_body (format! ("{}\n", body)) } - async fn test (&self) { + async fn test (&self, name: &str) { let mut input = Request::builder () .method ("GET") .uri (format! ("http://127.0.0.1:4000/scraper/{}", self.path_rest)); - if let Some (input_key) = self.input_key { - input = input.header ("X-ApiKey", input_key); + if let Some (auth_header) = self.auth_header { + input = input.header ("Authorization", auth_header); + } + if let Some (x_api_key) = self.x_api_key { + input = input.header ("X-ApiKey", x_api_key); } let input = input.body (Body::empty ()).unwrap (); @@ -331,15 +432,15 @@ mod tests { expected_headers.insert (*key, (*value).try_into ().expect ("Couldn't convert header value")); } - assert_eq! (actual_head.status, self.expected_status); - assert_eq! (actual_head.headers, expected_headers); + assert_eq! (actual_head.status, self.expected_status, "{}", name); + assert_eq! (actual_head.headers, expected_headers, "{}", name); let actual_body = hyper::body::to_bytes (actual_body).await; let actual_body = actual_body.expect ("Body should be convertible to bytes"); let actual_body = actual_body.to_vec (); let actual_body = String::from_utf8 (actual_body).expect ("Body should be UTF-8"); - assert_eq! (actual_body, self.expected_body); + assert_eq! (actual_body, self.expected_body, "{}", name); } } @@ -351,7 +452,8 @@ mod tests { let base_case = TestCase { path_rest: "v1/test", valid_key: Some ("bogus"), - input_key: Some ("bogus"), + auth_header: None, + x_api_key: Some ("bogus"), expected_status: StatusCode::OK, expected_headers: vec! [ ("content-type", "text/plain"), @@ -359,21 +461,47 @@ mod tests { expected_body: "You're valid!\n".to_string (), }; - for case in &[ - base_case.clone (), - base_case.path_rest ("v9999/test") - .expected (StatusCode::NOT_FOUND, strings::UNKNOWN_API_VERSION), - base_case.valid_key (None) - .expected (StatusCode::FORBIDDEN, strings::FORBIDDEN), - base_case.input_key (Some ("borgus")) - .expected (StatusCode::FORBIDDEN, strings::FORBIDDEN), - base_case.path_rest ("v1/toast") - .expected (StatusCode::NOT_FOUND, strings::UNKNOWN_API_ENDPOINT), - base_case.input_key (None) - .expected (StatusCode::FORBIDDEN, strings::NO_API_KEY), - ] { - case.test ().await; - } + base_case + .test ("00").await; + + base_case + .path_rest ("v9999/test") + .expected (StatusCode::NOT_FOUND, strings::UNKNOWN_API_VERSION) + .test ("01").await; + + base_case + .valid_key (None) + .expected (StatusCode::FORBIDDEN, strings::FORBIDDEN) + .test ("02").await; + + base_case + .x_api_key (Some ("borgus")) + .expected (StatusCode::FORBIDDEN, strings::FORBIDDEN) + .test ("03").await; + + base_case + .path_rest ("v1/toast") + .expected (StatusCode::NOT_FOUND, strings::UNKNOWN_API_ENDPOINT) + .test ("04").await; + + base_case + .x_api_key (None) + .expected (StatusCode::FORBIDDEN, strings::NO_API_KEY) + .test ("05").await; + + base_case + .x_api_key (None) + .auth_header (Some ("Bearer bogus")) + .expected (StatusCode::OK, "You're valid!") + .test ("06").await; + }); } + + #[test] + fn rss () { + let input = "VmHWM: 584 kB\nVmRSS: 584 kB\nRssAnon: 68 kB\n"; + + assert_eq! (get_rss_from_status (input), Some (584)); + } } diff --git a/crates/ptth_relay/src/server_endpoint.rs b/crates/ptth_relay/src/server_endpoint.rs index 84333dd..71e8b6a 100644 --- a/crates/ptth_relay/src/server_endpoint.rs +++ b/crates/ptth_relay/src/server_endpoint.rs @@ -188,7 +188,7 @@ pub async fn handle_response ( }; // UKAUFFY4 (Send half) - if tx.send (Ok ((resp_parts, body))).is_err () { + if tx.tx.send (Ok ((resp_parts, body))).is_err () { let msg = "Failed to connect to client"; error! (msg); return Ok (error_reply (StatusCode::BAD_GATEWAY, msg)?); diff --git a/crates/ptth_server_gui/src/main.rs b/crates/ptth_server_gui/src/main.rs index a990c7e..31d852c 100644 --- a/crates/ptth_server_gui/src/main.rs +++ b/crates/ptth_server_gui/src/main.rs @@ -47,13 +47,12 @@ fn main () let app = app::App::default(); let mut wind = Window::new (100, 100, 500, 180, "PTTH server"); - let config_file_opt = match ptth_server::load_toml::load:: ("./config/ptth_server.toml") - { + let config_file_opt = match ptth_server::load_toml::load:: ("./config/ptth_server.toml") { Ok (x) => Some (x), Err (e) => { - eprintln! ("Error in config TOML: {:?}", e); + eprintln! ("Error in `./config/ptth_server.toml`: {:?}", e); None - } + }, }; let (hit_tx, mut hit_rx) = mpsc::channel (1); @@ -61,9 +60,7 @@ fn main () let fltk_tx = fltk_tx; rt.spawn (async move { - eprintln! ("Entering channel task"); while hit_rx.recv ().await.is_some () { - eprintln! ("fltk_tx"); fltk_tx.send (Message::Hit); } }); diff --git a/handlebars/server/file_server_dir.html b/handlebars/server/file_server_dir.html index 857daca..3b837ab 100644 --- a/handlebars/server/file_server_dir.html +++ b/handlebars/server/file_server_dir.html @@ -115,7 +115,7 @@ AIABAACAAQAAgAEAAIABAACAAQAAgAEAAIABAACAAQAA" rel="icon" type="image/x-icon" /> {{#each entries}} - + {{this.icon}} {{this.file_name}}{{this.trailing_slash}} {{this.size}}