Compare commits

...

15 Commits

Author SHA1 Message Date
_ 292ade0f46 Merge /run/media/user/d3de9062-a284-4b94-8900-0c416f57f9ac/projects/ptth 2022-10-26 03:50:51 +00:00
Trisha Lefler 7b9c6c7dc4 Merge branch 'main' of ssh://virtusense-dev.com:2200/Trisha/ptth into main 2022-10-25 10:31:08 -05:00
Trisha Lefler 89d4fdbcc3 🔊 add error message for config file 2022-10-25 10:28:45 -05:00
(on company time) 5b6adc1305 🐛 bug: fix colons in filenames not linking properly
https://stackoverflow.com/questions/1737575/are-colons-allowed-in-urls
2022-10-21 15:21:35 -05:00
(on company time) 863bbe18e4 ⬆️ update hyper dep and allow HTTP/2 for the relay
This makes it easier for a gateway like Nginx to terminate TLS for a PTTH relay
without needing an entire TCP connection for each connected PTTH server.
2022-09-14 14:54:42 -05:00
(on company time) 1f398462b7 🐛 bug: sweep request_rendezvous for timed-out requests, too 2022-08-02 10:32:19 -05:00
(on company time) ce7ce42168 time out requests if the server doesn't rendezvous in 2 minutes 2022-08-01 13:15:53 -05:00
(on company time) c30747d954 add more metrics 2022-08-01 12:22:30 -05:00
(on company time) 77f842485f support Bearer auth 2022-07-29 17:13:53 -05:00
(on company time) fff278a494 report VmRSS in metrics 2022-07-29 17:02:25 -05:00
_ 7fe4444b65 linux-only memory measurement 2022-07-29 16:56:45 -05:00
_ 70eb419fdc 🐛 bug: move the metrics page behind auth 2022-07-29 16:45:10 -05:00
_ 96fdf642c3 test: start adding Bearer Auth so Prometheus can connect to the scraper API 2022-07-27 17:31:37 -05:00
_ 140434cf66 🚧 wip: placeholder metrics page 2022-07-27 17:31:28 -05:00
Trisha Lefler 80c43caf71 🔇 remove debugging stuff 2022-03-25 16:53:51 -05:00
7 changed files with 210 additions and 38 deletions

View File

@ -20,7 +20,7 @@ futures = "0.3.7"
futures-util = "0.3.8" futures-util = "0.3.8"
handlebars = "3.5.3" handlebars = "3.5.3"
http = "0.2.3" http = "0.2.3"
hyper = { version = "0.14.4", features = ["http1", "server", "stream", "tcp"] } hyper = { version = "0.14.20", features = ["http1", "http2", "server", "stream", "tcp"] }
itertools = "0.9.0" itertools = "0.9.0"
rand = "0.8.3" rand = "0.8.3"
rmp-serde = "0.15.5" rmp-serde = "0.15.5"

View File

@ -135,6 +135,11 @@ async fn handle_http_request (
let (tx, rx) = oneshot::channel (); let (tx, rx) = oneshot::channel ();
let tx = relay_state::ResponseRendezvous {
timeout: Instant::now () + Duration::from_secs (120),
tx,
};
let req_id = rusty_ulid::generate_ulid_string (); let req_id = rusty_ulid::generate_ulid_string ();
debug! ("Forwarding {}", req_id); debug! ("Forwarding {}", req_id);
@ -791,6 +796,45 @@ pub async fn run_relay (
}); });
} }
// Set a task to periodically sweep and time-out requests where the client
// and server are never going to rendezvous
let state_2 = Arc::clone (&state);
tokio::spawn (async move {
let mut interval = tokio::time::interval (Duration::from_secs (60));
interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip);
loop {
use std::convert::TryFrom;
use rusty_ulid::Ulid;
interval.tick ().await;
{
let timeout_ms = Utc::now ().timestamp () - 120_000;
if let Ok (timeout_ms) = u64::try_from (timeout_ms) {
let timeout_ulid = Ulid::from_timestamp_with_rng (timeout_ms, &mut rand::thread_rng ()).to_string ();
let mut request_rendezvous = state_2.request_rendezvous.lock ().await;
request_rendezvous.iter_mut ()
.for_each (|(k, v)| {
match v {
RequestRendezvous::ParkedServer (_) => (),
RequestRendezvous::ParkedClients (requests) => requests.retain (|req| req.id.as_str () >= timeout_ulid.as_str ()),
}
});
}
}
{
let now = Instant::now ();
let response_rendezvous = state_2.response_rendezvous.read ().await;
response_rendezvous.retain (|_, v| v.timeout >= now);
}
}
});
let make_svc = make_service_fn (|_conn| { let make_svc = make_service_fn (|_conn| {
let state = state.clone (); let state = state.clone ();
let handlebars = handlebars.clone (); let handlebars = handlebars.clone ();
@ -849,7 +893,7 @@ pub async fn run_relay (
std::mem::swap (&mut swapped, &mut response_rendezvous); std::mem::swap (&mut swapped, &mut response_rendezvous);
for (_, sender) in swapped { for (_, sender) in swapped {
sender.send (Err (ShuttingDown)).ok (); sender.tx.send (Err (ShuttingDown)).ok ();
} }
let mut request_rendezvous = state.request_rendezvous.lock ().await; let mut request_rendezvous = state.request_rendezvous.lock ().await;

View File

@ -61,7 +61,10 @@ pub enum RequestRendezvous {
ParkedServer (oneshot::Sender <Result <http_serde::WrappedRequest, ShuttingDownError>>), ParkedServer (oneshot::Sender <Result <http_serde::WrappedRequest, ShuttingDownError>>),
} }
type ResponseRendezvous = oneshot::Sender <Result <(http_serde::ResponseParts, Body), ShuttingDownError>>; pub (crate) struct ResponseRendezvous {
pub timeout: Instant,
pub tx: oneshot::Sender <Result <(http_serde::ResponseParts, Body), ShuttingDownError>>,
}
#[derive (Clone)] #[derive (Clone)]
pub struct ServerStatus { pub struct ServerStatus {

View File

@ -119,6 +119,21 @@ pub async fn v1_server_list (state: &Relay)
} }
} }
fn get_api_key (headers: &hyper::HeaderMap) -> Option <&str>
{
if let Some (key) = headers.get ("X-ApiKey").and_then (|v| v.to_str ().ok ()) {
return Some (key);
}
if let Some (s) = headers.get ("Authorization").and_then (|v| v.to_str ().ok ()) {
if let Some (key) = s.strip_prefix ("Bearer ") {
return Some (key);
}
}
None
}
#[instrument (level = "trace", skip (req, state))] #[instrument (level = "trace", skip (req, state))]
async fn api_v1 ( async fn api_v1 (
req: Request <Body>, req: Request <Body>,
@ -132,7 +147,7 @@ async fn api_v1 (
AuditEvent, AuditEvent,
}; };
let api_key = req.headers ().get ("X-ApiKey"); let api_key = get_api_key (req.headers ());
let api_key = match api_key { let api_key = match api_key {
None => return Ok (error_reply (StatusCode::FORBIDDEN, strings::NO_API_KEY)?), None => return Ok (error_reply (StatusCode::FORBIDDEN, strings::NO_API_KEY)?),
@ -176,7 +191,10 @@ async fn api_v1 (
path: path_rest.to_string (), path: path_rest.to_string (),
})).await; })).await;
if path_rest == "test" { if path_rest == "metrics" {
Ok (metrics (req, state).await?)
}
else if path_rest == "test" {
Ok (error_reply (StatusCode::OK, "You're valid!")?) Ok (error_reply (StatusCode::OK, "You're valid!")?)
} }
else if path_rest == "server_list" { else if path_rest == "server_list" {
@ -205,6 +223,65 @@ async fn api_v1 (
} }
} }
#[instrument (level = "trace", skip (req, state))]
async fn metrics (
req: Request <Body>,
state: &Relay,
)
-> Result <Response <Body>, RequestError>
{
let mut s = String::with_capacity (4 * 1_024);
let mut push_metric = |name, help, kind, value| {
if let Some (help) = help {
s.push_str (format! ("# HELP {} {}\n", name, help).as_str ());
}
s.push_str (format! ("# TYPE {} {}\n", name, kind).as_str ());
s.push_str (format! ("{} {}\n", name, value).as_str ());
};
let request_rendezvous_count = {
let g = state.request_rendezvous.lock ().await;
g.len ()
};
let server_status_count;
let connected_server_count;
let now = Utc::now ();
{
let g = state.server_status.lock ().await;
server_status_count = g.len ();
connected_server_count = g.iter ()
.filter (|(_, s)| now - s.last_seen < chrono::Duration::seconds (60))
.count ();
}
let response_rendezvous_count = {
let g = state.response_rendezvous.read ().await;
g.len ()
};
push_metric ("request_rendezvous_count", None, "gauge", request_rendezvous_count.to_string ());
push_metric ("server_status_count", None, "gauge", server_status_count.to_string ());
push_metric ("connected_server_count", None, "gauge", connected_server_count.to_string ());
push_metric ("response_rendezvous_count", None, "gauge", response_rendezvous_count.to_string ());
#[cfg (target_os = "linux")]
{
if let Some (rss) = tokio::fs::read_to_string ("/proc/self/status").await
.ok ()
.and_then (|s| get_rss_from_status (s.as_str ()))
{
push_metric ("relay_vm_rss", Some ("VmRSS of the relay process, in kB"), "gauge", rss.to_string ());
}
}
Ok (Response::builder ()
.body (Body::from (s))?)
}
#[instrument (level = "trace", skip (req, state))] #[instrument (level = "trace", skip (req, state))]
pub async fn handle ( pub async fn handle (
req: Request <Body>, req: Request <Body>,
@ -230,6 +307,20 @@ pub async fn handle (
} }
} }
fn get_rss_from_status (proc_status: &str) -> Option <u64>
{
use std::str::FromStr;
for line in proc_status.lines () {
if let Some (rest) = line.strip_prefix ("VmRSS:\t").and_then (|s| s.strip_suffix (" kB"))
{
return u64::from_str (rest.trim_start ()).ok ();
}
}
None
}
#[cfg (test)] #[cfg (test)]
mod tests { mod tests {
use std::{ use std::{
@ -246,8 +337,9 @@ mod tests {
struct TestCase { struct TestCase {
// Inputs // Inputs
path_rest: &'static str, path_rest: &'static str,
auth_header: Option <&'static str>,
valid_key: Option <&'static str>, valid_key: Option <&'static str>,
input_key: Option <&'static str>, x_api_key: Option <&'static str>,
// Expected // Expected
expected_status: StatusCode, expected_status: StatusCode,
@ -268,9 +360,15 @@ mod tests {
x x
} }
fn input_key (&self, v: Option <&'static str>) -> Self { fn auth_header (&self, v: Option <&'static str>) -> Self {
let mut x = self.clone (); let mut x = self.clone ();
x.input_key = v; x.auth_header = v;
x
}
fn x_api_key (&self, v: Option <&'static str>) -> Self {
let mut x = self.clone ();
x.x_api_key = v;
x x
} }
@ -298,13 +396,16 @@ mod tests {
.expected_body (format! ("{}\n", body)) .expected_body (format! ("{}\n", body))
} }
async fn test (&self) { async fn test (&self, name: &str) {
let mut input = Request::builder () let mut input = Request::builder ()
.method ("GET") .method ("GET")
.uri (format! ("http://127.0.0.1:4000/scraper/{}", self.path_rest)); .uri (format! ("http://127.0.0.1:4000/scraper/{}", self.path_rest));
if let Some (input_key) = self.input_key { if let Some (auth_header) = self.auth_header {
input = input.header ("X-ApiKey", input_key); input = input.header ("Authorization", auth_header);
}
if let Some (x_api_key) = self.x_api_key {
input = input.header ("X-ApiKey", x_api_key);
} }
let input = input.body (Body::empty ()).unwrap (); let input = input.body (Body::empty ()).unwrap ();
@ -331,15 +432,15 @@ mod tests {
expected_headers.insert (*key, (*value).try_into ().expect ("Couldn't convert header value")); expected_headers.insert (*key, (*value).try_into ().expect ("Couldn't convert header value"));
} }
assert_eq! (actual_head.status, self.expected_status); assert_eq! (actual_head.status, self.expected_status, "{}", name);
assert_eq! (actual_head.headers, expected_headers); assert_eq! (actual_head.headers, expected_headers, "{}", name);
let actual_body = hyper::body::to_bytes (actual_body).await; let actual_body = hyper::body::to_bytes (actual_body).await;
let actual_body = actual_body.expect ("Body should be convertible to bytes"); let actual_body = actual_body.expect ("Body should be convertible to bytes");
let actual_body = actual_body.to_vec (); let actual_body = actual_body.to_vec ();
let actual_body = String::from_utf8 (actual_body).expect ("Body should be UTF-8"); let actual_body = String::from_utf8 (actual_body).expect ("Body should be UTF-8");
assert_eq! (actual_body, self.expected_body); assert_eq! (actual_body, self.expected_body, "{}", name);
} }
} }
@ -351,7 +452,8 @@ mod tests {
let base_case = TestCase { let base_case = TestCase {
path_rest: "v1/test", path_rest: "v1/test",
valid_key: Some ("bogus"), valid_key: Some ("bogus"),
input_key: Some ("bogus"), auth_header: None,
x_api_key: Some ("bogus"),
expected_status: StatusCode::OK, expected_status: StatusCode::OK,
expected_headers: vec! [ expected_headers: vec! [
("content-type", "text/plain"), ("content-type", "text/plain"),
@ -359,21 +461,47 @@ mod tests {
expected_body: "You're valid!\n".to_string (), expected_body: "You're valid!\n".to_string (),
}; };
for case in &[ base_case
base_case.clone (), .test ("00").await;
base_case.path_rest ("v9999/test")
.expected (StatusCode::NOT_FOUND, strings::UNKNOWN_API_VERSION), base_case
base_case.valid_key (None) .path_rest ("v9999/test")
.expected (StatusCode::FORBIDDEN, strings::FORBIDDEN), .expected (StatusCode::NOT_FOUND, strings::UNKNOWN_API_VERSION)
base_case.input_key (Some ("borgus")) .test ("01").await;
.expected (StatusCode::FORBIDDEN, strings::FORBIDDEN),
base_case.path_rest ("v1/toast") base_case
.expected (StatusCode::NOT_FOUND, strings::UNKNOWN_API_ENDPOINT), .valid_key (None)
base_case.input_key (None) .expected (StatusCode::FORBIDDEN, strings::FORBIDDEN)
.expected (StatusCode::FORBIDDEN, strings::NO_API_KEY), .test ("02").await;
] {
case.test ().await; base_case
} .x_api_key (Some ("borgus"))
.expected (StatusCode::FORBIDDEN, strings::FORBIDDEN)
.test ("03").await;
base_case
.path_rest ("v1/toast")
.expected (StatusCode::NOT_FOUND, strings::UNKNOWN_API_ENDPOINT)
.test ("04").await;
base_case
.x_api_key (None)
.expected (StatusCode::FORBIDDEN, strings::NO_API_KEY)
.test ("05").await;
base_case
.x_api_key (None)
.auth_header (Some ("Bearer bogus"))
.expected (StatusCode::OK, "You're valid!")
.test ("06").await;
}); });
} }
#[test]
fn rss () {
let input = "VmHWM: 584 kB\nVmRSS: 584 kB\nRssAnon: 68 kB\n";
assert_eq! (get_rss_from_status (input), Some (584));
}
} }

View File

@ -188,7 +188,7 @@ pub async fn handle_response (
}; };
// UKAUFFY4 (Send half) // UKAUFFY4 (Send half)
if tx.send (Ok ((resp_parts, body))).is_err () { if tx.tx.send (Ok ((resp_parts, body))).is_err () {
let msg = "Failed to connect to client"; let msg = "Failed to connect to client";
error! (msg); error! (msg);
return Ok (error_reply (StatusCode::BAD_GATEWAY, msg)?); return Ok (error_reply (StatusCode::BAD_GATEWAY, msg)?);

View File

@ -47,13 +47,12 @@ fn main ()
let app = app::App::default(); let app = app::App::default();
let mut wind = Window::new (100, 100, 500, 180, "PTTH server"); let mut wind = Window::new (100, 100, 500, 180, "PTTH server");
let config_file_opt = match ptth_server::load_toml::load::<ConfigFile, _> ("./config/ptth_server.toml") let config_file_opt = match ptth_server::load_toml::load::<ConfigFile, _> ("./config/ptth_server.toml") {
{
Ok (x) => Some (x), Ok (x) => Some (x),
Err (e) => { Err (e) => {
eprintln! ("Error in config TOML: {:?}", e); eprintln! ("Error in `./config/ptth_server.toml`: {:?}", e);
None None
} },
}; };
let (hit_tx, mut hit_rx) = mpsc::channel (1); let (hit_tx, mut hit_rx) = mpsc::channel (1);
@ -61,9 +60,7 @@ fn main ()
let fltk_tx = fltk_tx; let fltk_tx = fltk_tx;
rt.spawn (async move { rt.spawn (async move {
eprintln! ("Entering channel task");
while hit_rx.recv ().await.is_some () { while hit_rx.recv ().await.is_some () {
eprintln! ("fltk_tx");
fltk_tx.send (Message::Hit); fltk_tx.send (Message::Hit);
} }
}); });

View File

@ -115,7 +115,7 @@ AIABAACAAQAAgAEAAIABAACAAQAAgAEAAIABAACAAQAA" rel="icon" type="image/x-icon" />
{{#each entries}} {{#each entries}}
<tr> <tr>
<td><a class="entry" href="{{this.encoded_file_name}}{{this.trailing_slash}}"> <td><a class="entry" href="./{{this.encoded_file_name}}{{this.trailing_slash}}">
{{this.icon}} {{this.file_name}}{{this.trailing_slash}}</a></td> {{this.icon}} {{this.file_name}}{{this.trailing_slash}}</a></td>
<td><span class="grey">{{this.size}}</span></td> <td><span class="grey">{{this.size}}</span></td>
</tr> </tr>