Compare commits
15 Commits
5843432bfe
...
292ade0f46
Author | SHA1 | Date |
---|---|---|
_ | 292ade0f46 | |
Trisha Lefler | 7b9c6c7dc4 | |
Trisha Lefler | 89d4fdbcc3 | |
(on company time) | 5b6adc1305 | |
(on company time) | 863bbe18e4 | |
(on company time) | 1f398462b7 | |
(on company time) | ce7ce42168 | |
(on company time) | c30747d954 | |
(on company time) | 77f842485f | |
(on company time) | fff278a494 | |
_ | 7fe4444b65 | |
_ | 70eb419fdc | |
_ | 96fdf642c3 | |
_ | 140434cf66 | |
Trisha Lefler | 80c43caf71 |
|
@ -20,7 +20,7 @@ futures = "0.3.7"
|
||||||
futures-util = "0.3.8"
|
futures-util = "0.3.8"
|
||||||
handlebars = "3.5.3"
|
handlebars = "3.5.3"
|
||||||
http = "0.2.3"
|
http = "0.2.3"
|
||||||
hyper = { version = "0.14.4", features = ["http1", "server", "stream", "tcp"] }
|
hyper = { version = "0.14.20", features = ["http1", "http2", "server", "stream", "tcp"] }
|
||||||
itertools = "0.9.0"
|
itertools = "0.9.0"
|
||||||
rand = "0.8.3"
|
rand = "0.8.3"
|
||||||
rmp-serde = "0.15.5"
|
rmp-serde = "0.15.5"
|
||||||
|
|
|
@ -135,6 +135,11 @@ async fn handle_http_request (
|
||||||
|
|
||||||
let (tx, rx) = oneshot::channel ();
|
let (tx, rx) = oneshot::channel ();
|
||||||
|
|
||||||
|
let tx = relay_state::ResponseRendezvous {
|
||||||
|
timeout: Instant::now () + Duration::from_secs (120),
|
||||||
|
tx,
|
||||||
|
};
|
||||||
|
|
||||||
let req_id = rusty_ulid::generate_ulid_string ();
|
let req_id = rusty_ulid::generate_ulid_string ();
|
||||||
|
|
||||||
debug! ("Forwarding {}", req_id);
|
debug! ("Forwarding {}", req_id);
|
||||||
|
@ -791,6 +796,45 @@ pub async fn run_relay (
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set a task to periodically sweep and time-out requests where the client
|
||||||
|
// and server are never going to rendezvous
|
||||||
|
|
||||||
|
let state_2 = Arc::clone (&state);
|
||||||
|
tokio::spawn (async move {
|
||||||
|
let mut interval = tokio::time::interval (Duration::from_secs (60));
|
||||||
|
interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
use std::convert::TryFrom;
|
||||||
|
|
||||||
|
use rusty_ulid::Ulid;
|
||||||
|
|
||||||
|
interval.tick ().await;
|
||||||
|
|
||||||
|
{
|
||||||
|
let timeout_ms = Utc::now ().timestamp () - 120_000;
|
||||||
|
if let Ok (timeout_ms) = u64::try_from (timeout_ms) {
|
||||||
|
let timeout_ulid = Ulid::from_timestamp_with_rng (timeout_ms, &mut rand::thread_rng ()).to_string ();
|
||||||
|
|
||||||
|
let mut request_rendezvous = state_2.request_rendezvous.lock ().await;
|
||||||
|
request_rendezvous.iter_mut ()
|
||||||
|
.for_each (|(k, v)| {
|
||||||
|
match v {
|
||||||
|
RequestRendezvous::ParkedServer (_) => (),
|
||||||
|
RequestRendezvous::ParkedClients (requests) => requests.retain (|req| req.id.as_str () >= timeout_ulid.as_str ()),
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let now = Instant::now ();
|
||||||
|
let response_rendezvous = state_2.response_rendezvous.read ().await;
|
||||||
|
response_rendezvous.retain (|_, v| v.timeout >= now);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
let make_svc = make_service_fn (|_conn| {
|
let make_svc = make_service_fn (|_conn| {
|
||||||
let state = state.clone ();
|
let state = state.clone ();
|
||||||
let handlebars = handlebars.clone ();
|
let handlebars = handlebars.clone ();
|
||||||
|
@ -849,7 +893,7 @@ pub async fn run_relay (
|
||||||
std::mem::swap (&mut swapped, &mut response_rendezvous);
|
std::mem::swap (&mut swapped, &mut response_rendezvous);
|
||||||
|
|
||||||
for (_, sender) in swapped {
|
for (_, sender) in swapped {
|
||||||
sender.send (Err (ShuttingDown)).ok ();
|
sender.tx.send (Err (ShuttingDown)).ok ();
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut request_rendezvous = state.request_rendezvous.lock ().await;
|
let mut request_rendezvous = state.request_rendezvous.lock ().await;
|
||||||
|
|
|
@ -61,7 +61,10 @@ pub enum RequestRendezvous {
|
||||||
ParkedServer (oneshot::Sender <Result <http_serde::WrappedRequest, ShuttingDownError>>),
|
ParkedServer (oneshot::Sender <Result <http_serde::WrappedRequest, ShuttingDownError>>),
|
||||||
}
|
}
|
||||||
|
|
||||||
type ResponseRendezvous = oneshot::Sender <Result <(http_serde::ResponseParts, Body), ShuttingDownError>>;
|
pub (crate) struct ResponseRendezvous {
|
||||||
|
pub timeout: Instant,
|
||||||
|
pub tx: oneshot::Sender <Result <(http_serde::ResponseParts, Body), ShuttingDownError>>,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive (Clone)]
|
#[derive (Clone)]
|
||||||
pub struct ServerStatus {
|
pub struct ServerStatus {
|
||||||
|
|
|
@ -119,6 +119,21 @@ pub async fn v1_server_list (state: &Relay)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_api_key (headers: &hyper::HeaderMap) -> Option <&str>
|
||||||
|
{
|
||||||
|
if let Some (key) = headers.get ("X-ApiKey").and_then (|v| v.to_str ().ok ()) {
|
||||||
|
return Some (key);
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some (s) = headers.get ("Authorization").and_then (|v| v.to_str ().ok ()) {
|
||||||
|
if let Some (key) = s.strip_prefix ("Bearer ") {
|
||||||
|
return Some (key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
#[instrument (level = "trace", skip (req, state))]
|
#[instrument (level = "trace", skip (req, state))]
|
||||||
async fn api_v1 (
|
async fn api_v1 (
|
||||||
req: Request <Body>,
|
req: Request <Body>,
|
||||||
|
@ -132,7 +147,7 @@ async fn api_v1 (
|
||||||
AuditEvent,
|
AuditEvent,
|
||||||
};
|
};
|
||||||
|
|
||||||
let api_key = req.headers ().get ("X-ApiKey");
|
let api_key = get_api_key (req.headers ());
|
||||||
|
|
||||||
let api_key = match api_key {
|
let api_key = match api_key {
|
||||||
None => return Ok (error_reply (StatusCode::FORBIDDEN, strings::NO_API_KEY)?),
|
None => return Ok (error_reply (StatusCode::FORBIDDEN, strings::NO_API_KEY)?),
|
||||||
|
@ -176,7 +191,10 @@ async fn api_v1 (
|
||||||
path: path_rest.to_string (),
|
path: path_rest.to_string (),
|
||||||
})).await;
|
})).await;
|
||||||
|
|
||||||
if path_rest == "test" {
|
if path_rest == "metrics" {
|
||||||
|
Ok (metrics (req, state).await?)
|
||||||
|
}
|
||||||
|
else if path_rest == "test" {
|
||||||
Ok (error_reply (StatusCode::OK, "You're valid!")?)
|
Ok (error_reply (StatusCode::OK, "You're valid!")?)
|
||||||
}
|
}
|
||||||
else if path_rest == "server_list" {
|
else if path_rest == "server_list" {
|
||||||
|
@ -205,6 +223,65 @@ async fn api_v1 (
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument (level = "trace", skip (req, state))]
|
||||||
|
async fn metrics (
|
||||||
|
req: Request <Body>,
|
||||||
|
state: &Relay,
|
||||||
|
)
|
||||||
|
-> Result <Response <Body>, RequestError>
|
||||||
|
{
|
||||||
|
let mut s = String::with_capacity (4 * 1_024);
|
||||||
|
|
||||||
|
let mut push_metric = |name, help, kind, value| {
|
||||||
|
if let Some (help) = help {
|
||||||
|
s.push_str (format! ("# HELP {} {}\n", name, help).as_str ());
|
||||||
|
}
|
||||||
|
s.push_str (format! ("# TYPE {} {}\n", name, kind).as_str ());
|
||||||
|
s.push_str (format! ("{} {}\n", name, value).as_str ());
|
||||||
|
};
|
||||||
|
|
||||||
|
let request_rendezvous_count = {
|
||||||
|
let g = state.request_rendezvous.lock ().await;
|
||||||
|
g.len ()
|
||||||
|
};
|
||||||
|
|
||||||
|
let server_status_count;
|
||||||
|
let connected_server_count;
|
||||||
|
|
||||||
|
let now = Utc::now ();
|
||||||
|
|
||||||
|
{
|
||||||
|
let g = state.server_status.lock ().await;
|
||||||
|
server_status_count = g.len ();
|
||||||
|
connected_server_count = g.iter ()
|
||||||
|
.filter (|(_, s)| now - s.last_seen < chrono::Duration::seconds (60))
|
||||||
|
.count ();
|
||||||
|
}
|
||||||
|
|
||||||
|
let response_rendezvous_count = {
|
||||||
|
let g = state.response_rendezvous.read ().await;
|
||||||
|
g.len ()
|
||||||
|
};
|
||||||
|
|
||||||
|
push_metric ("request_rendezvous_count", None, "gauge", request_rendezvous_count.to_string ());
|
||||||
|
push_metric ("server_status_count", None, "gauge", server_status_count.to_string ());
|
||||||
|
push_metric ("connected_server_count", None, "gauge", connected_server_count.to_string ());
|
||||||
|
push_metric ("response_rendezvous_count", None, "gauge", response_rendezvous_count.to_string ());
|
||||||
|
|
||||||
|
#[cfg (target_os = "linux")]
|
||||||
|
{
|
||||||
|
if let Some (rss) = tokio::fs::read_to_string ("/proc/self/status").await
|
||||||
|
.ok ()
|
||||||
|
.and_then (|s| get_rss_from_status (s.as_str ()))
|
||||||
|
{
|
||||||
|
push_metric ("relay_vm_rss", Some ("VmRSS of the relay process, in kB"), "gauge", rss.to_string ());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok (Response::builder ()
|
||||||
|
.body (Body::from (s))?)
|
||||||
|
}
|
||||||
|
|
||||||
#[instrument (level = "trace", skip (req, state))]
|
#[instrument (level = "trace", skip (req, state))]
|
||||||
pub async fn handle (
|
pub async fn handle (
|
||||||
req: Request <Body>,
|
req: Request <Body>,
|
||||||
|
@ -230,6 +307,20 @@ pub async fn handle (
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_rss_from_status (proc_status: &str) -> Option <u64>
|
||||||
|
{
|
||||||
|
use std::str::FromStr;
|
||||||
|
|
||||||
|
for line in proc_status.lines () {
|
||||||
|
if let Some (rest) = line.strip_prefix ("VmRSS:\t").and_then (|s| s.strip_suffix (" kB"))
|
||||||
|
{
|
||||||
|
return u64::from_str (rest.trim_start ()).ok ();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg (test)]
|
#[cfg (test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use std::{
|
use std::{
|
||||||
|
@ -246,8 +337,9 @@ mod tests {
|
||||||
struct TestCase {
|
struct TestCase {
|
||||||
// Inputs
|
// Inputs
|
||||||
path_rest: &'static str,
|
path_rest: &'static str,
|
||||||
|
auth_header: Option <&'static str>,
|
||||||
valid_key: Option <&'static str>,
|
valid_key: Option <&'static str>,
|
||||||
input_key: Option <&'static str>,
|
x_api_key: Option <&'static str>,
|
||||||
|
|
||||||
// Expected
|
// Expected
|
||||||
expected_status: StatusCode,
|
expected_status: StatusCode,
|
||||||
|
@ -268,9 +360,15 @@ mod tests {
|
||||||
x
|
x
|
||||||
}
|
}
|
||||||
|
|
||||||
fn input_key (&self, v: Option <&'static str>) -> Self {
|
fn auth_header (&self, v: Option <&'static str>) -> Self {
|
||||||
let mut x = self.clone ();
|
let mut x = self.clone ();
|
||||||
x.input_key = v;
|
x.auth_header = v;
|
||||||
|
x
|
||||||
|
}
|
||||||
|
|
||||||
|
fn x_api_key (&self, v: Option <&'static str>) -> Self {
|
||||||
|
let mut x = self.clone ();
|
||||||
|
x.x_api_key = v;
|
||||||
x
|
x
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -298,13 +396,16 @@ mod tests {
|
||||||
.expected_body (format! ("{}\n", body))
|
.expected_body (format! ("{}\n", body))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn test (&self) {
|
async fn test (&self, name: &str) {
|
||||||
let mut input = Request::builder ()
|
let mut input = Request::builder ()
|
||||||
.method ("GET")
|
.method ("GET")
|
||||||
.uri (format! ("http://127.0.0.1:4000/scraper/{}", self.path_rest));
|
.uri (format! ("http://127.0.0.1:4000/scraper/{}", self.path_rest));
|
||||||
|
|
||||||
if let Some (input_key) = self.input_key {
|
if let Some (auth_header) = self.auth_header {
|
||||||
input = input.header ("X-ApiKey", input_key);
|
input = input.header ("Authorization", auth_header);
|
||||||
|
}
|
||||||
|
if let Some (x_api_key) = self.x_api_key {
|
||||||
|
input = input.header ("X-ApiKey", x_api_key);
|
||||||
}
|
}
|
||||||
let input = input.body (Body::empty ()).unwrap ();
|
let input = input.body (Body::empty ()).unwrap ();
|
||||||
|
|
||||||
|
@ -331,15 +432,15 @@ mod tests {
|
||||||
expected_headers.insert (*key, (*value).try_into ().expect ("Couldn't convert header value"));
|
expected_headers.insert (*key, (*value).try_into ().expect ("Couldn't convert header value"));
|
||||||
}
|
}
|
||||||
|
|
||||||
assert_eq! (actual_head.status, self.expected_status);
|
assert_eq! (actual_head.status, self.expected_status, "{}", name);
|
||||||
assert_eq! (actual_head.headers, expected_headers);
|
assert_eq! (actual_head.headers, expected_headers, "{}", name);
|
||||||
|
|
||||||
let actual_body = hyper::body::to_bytes (actual_body).await;
|
let actual_body = hyper::body::to_bytes (actual_body).await;
|
||||||
let actual_body = actual_body.expect ("Body should be convertible to bytes");
|
let actual_body = actual_body.expect ("Body should be convertible to bytes");
|
||||||
let actual_body = actual_body.to_vec ();
|
let actual_body = actual_body.to_vec ();
|
||||||
let actual_body = String::from_utf8 (actual_body).expect ("Body should be UTF-8");
|
let actual_body = String::from_utf8 (actual_body).expect ("Body should be UTF-8");
|
||||||
|
|
||||||
assert_eq! (actual_body, self.expected_body);
|
assert_eq! (actual_body, self.expected_body, "{}", name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -351,7 +452,8 @@ mod tests {
|
||||||
let base_case = TestCase {
|
let base_case = TestCase {
|
||||||
path_rest: "v1/test",
|
path_rest: "v1/test",
|
||||||
valid_key: Some ("bogus"),
|
valid_key: Some ("bogus"),
|
||||||
input_key: Some ("bogus"),
|
auth_header: None,
|
||||||
|
x_api_key: Some ("bogus"),
|
||||||
expected_status: StatusCode::OK,
|
expected_status: StatusCode::OK,
|
||||||
expected_headers: vec! [
|
expected_headers: vec! [
|
||||||
("content-type", "text/plain"),
|
("content-type", "text/plain"),
|
||||||
|
@ -359,21 +461,47 @@ mod tests {
|
||||||
expected_body: "You're valid!\n".to_string (),
|
expected_body: "You're valid!\n".to_string (),
|
||||||
};
|
};
|
||||||
|
|
||||||
for case in &[
|
base_case
|
||||||
base_case.clone (),
|
.test ("00").await;
|
||||||
base_case.path_rest ("v9999/test")
|
|
||||||
.expected (StatusCode::NOT_FOUND, strings::UNKNOWN_API_VERSION),
|
base_case
|
||||||
base_case.valid_key (None)
|
.path_rest ("v9999/test")
|
||||||
.expected (StatusCode::FORBIDDEN, strings::FORBIDDEN),
|
.expected (StatusCode::NOT_FOUND, strings::UNKNOWN_API_VERSION)
|
||||||
base_case.input_key (Some ("borgus"))
|
.test ("01").await;
|
||||||
.expected (StatusCode::FORBIDDEN, strings::FORBIDDEN),
|
|
||||||
base_case.path_rest ("v1/toast")
|
base_case
|
||||||
.expected (StatusCode::NOT_FOUND, strings::UNKNOWN_API_ENDPOINT),
|
.valid_key (None)
|
||||||
base_case.input_key (None)
|
.expected (StatusCode::FORBIDDEN, strings::FORBIDDEN)
|
||||||
.expected (StatusCode::FORBIDDEN, strings::NO_API_KEY),
|
.test ("02").await;
|
||||||
] {
|
|
||||||
case.test ().await;
|
base_case
|
||||||
}
|
.x_api_key (Some ("borgus"))
|
||||||
|
.expected (StatusCode::FORBIDDEN, strings::FORBIDDEN)
|
||||||
|
.test ("03").await;
|
||||||
|
|
||||||
|
base_case
|
||||||
|
.path_rest ("v1/toast")
|
||||||
|
.expected (StatusCode::NOT_FOUND, strings::UNKNOWN_API_ENDPOINT)
|
||||||
|
.test ("04").await;
|
||||||
|
|
||||||
|
base_case
|
||||||
|
.x_api_key (None)
|
||||||
|
.expected (StatusCode::FORBIDDEN, strings::NO_API_KEY)
|
||||||
|
.test ("05").await;
|
||||||
|
|
||||||
|
base_case
|
||||||
|
.x_api_key (None)
|
||||||
|
.auth_header (Some ("Bearer bogus"))
|
||||||
|
.expected (StatusCode::OK, "You're valid!")
|
||||||
|
.test ("06").await;
|
||||||
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn rss () {
|
||||||
|
let input = "VmHWM: 584 kB\nVmRSS: 584 kB\nRssAnon: 68 kB\n";
|
||||||
|
|
||||||
|
assert_eq! (get_rss_from_status (input), Some (584));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -188,7 +188,7 @@ pub async fn handle_response (
|
||||||
};
|
};
|
||||||
|
|
||||||
// UKAUFFY4 (Send half)
|
// UKAUFFY4 (Send half)
|
||||||
if tx.send (Ok ((resp_parts, body))).is_err () {
|
if tx.tx.send (Ok ((resp_parts, body))).is_err () {
|
||||||
let msg = "Failed to connect to client";
|
let msg = "Failed to connect to client";
|
||||||
error! (msg);
|
error! (msg);
|
||||||
return Ok (error_reply (StatusCode::BAD_GATEWAY, msg)?);
|
return Ok (error_reply (StatusCode::BAD_GATEWAY, msg)?);
|
||||||
|
|
|
@ -47,13 +47,12 @@ fn main ()
|
||||||
let app = app::App::default();
|
let app = app::App::default();
|
||||||
let mut wind = Window::new (100, 100, 500, 180, "PTTH server");
|
let mut wind = Window::new (100, 100, 500, 180, "PTTH server");
|
||||||
|
|
||||||
let config_file_opt = match ptth_server::load_toml::load::<ConfigFile, _> ("./config/ptth_server.toml")
|
let config_file_opt = match ptth_server::load_toml::load::<ConfigFile, _> ("./config/ptth_server.toml") {
|
||||||
{
|
|
||||||
Ok (x) => Some (x),
|
Ok (x) => Some (x),
|
||||||
Err (e) => {
|
Err (e) => {
|
||||||
eprintln! ("Error in config TOML: {:?}", e);
|
eprintln! ("Error in `./config/ptth_server.toml`: {:?}", e);
|
||||||
None
|
None
|
||||||
}
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
let (hit_tx, mut hit_rx) = mpsc::channel (1);
|
let (hit_tx, mut hit_rx) = mpsc::channel (1);
|
||||||
|
@ -61,9 +60,7 @@ fn main ()
|
||||||
let fltk_tx = fltk_tx;
|
let fltk_tx = fltk_tx;
|
||||||
|
|
||||||
rt.spawn (async move {
|
rt.spawn (async move {
|
||||||
eprintln! ("Entering channel task");
|
|
||||||
while hit_rx.recv ().await.is_some () {
|
while hit_rx.recv ().await.is_some () {
|
||||||
eprintln! ("fltk_tx");
|
|
||||||
fltk_tx.send (Message::Hit);
|
fltk_tx.send (Message::Hit);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -115,7 +115,7 @@ AIABAACAAQAAgAEAAIABAACAAQAAgAEAAIABAACAAQAA" rel="icon" type="image/x-icon" />
|
||||||
|
|
||||||
{{#each entries}}
|
{{#each entries}}
|
||||||
<tr>
|
<tr>
|
||||||
<td><a class="entry" href="{{this.encoded_file_name}}{{this.trailing_slash}}">
|
<td><a class="entry" href="./{{this.encoded_file_name}}{{this.trailing_slash}}">
|
||||||
{{this.icon}} {{this.file_name}}{{this.trailing_slash}}</a></td>
|
{{this.icon}} {{this.file_name}}{{this.trailing_slash}}</a></td>
|
||||||
<td><span class="grey">{{this.size}}</span></td>
|
<td><span class="grey">{{this.size}}</span></td>
|
||||||
</tr>
|
</tr>
|
||||||
|
|
Loading…
Reference in New Issue