Merge /run/media/user/d3de9062-a284-4b94-8900-0c416f57f9ac/projects/ptth

main
_ 2022-10-26 03:50:51 +00:00
commit 292ade0f46
7 changed files with 210 additions and 38 deletions

View File

@ -20,7 +20,7 @@ futures = "0.3.7"
futures-util = "0.3.8"
handlebars = "3.5.3"
http = "0.2.3"
hyper = { version = "0.14.4", features = ["http1", "server", "stream", "tcp"] }
hyper = { version = "0.14.20", features = ["http1", "http2", "server", "stream", "tcp"] }
itertools = "0.9.0"
rand = "0.8.3"
rmp-serde = "0.15.5"

View File

@ -135,6 +135,11 @@ async fn handle_http_request (
let (tx, rx) = oneshot::channel ();
let tx = relay_state::ResponseRendezvous {
timeout: Instant::now () + Duration::from_secs (120),
tx,
};
let req_id = rusty_ulid::generate_ulid_string ();
debug! ("Forwarding {}", req_id);
@ -791,6 +796,45 @@ pub async fn run_relay (
});
}
// Set a task to periodically sweep and time-out requests where the client
// and server are never going to rendezvous
let state_2 = Arc::clone (&state);
tokio::spawn (async move {
let mut interval = tokio::time::interval (Duration::from_secs (60));
interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip);
loop {
use std::convert::TryFrom;
use rusty_ulid::Ulid;
interval.tick ().await;
{
let timeout_ms = Utc::now ().timestamp () - 120_000;
if let Ok (timeout_ms) = u64::try_from (timeout_ms) {
let timeout_ulid = Ulid::from_timestamp_with_rng (timeout_ms, &mut rand::thread_rng ()).to_string ();
let mut request_rendezvous = state_2.request_rendezvous.lock ().await;
request_rendezvous.iter_mut ()
.for_each (|(k, v)| {
match v {
RequestRendezvous::ParkedServer (_) => (),
RequestRendezvous::ParkedClients (requests) => requests.retain (|req| req.id.as_str () >= timeout_ulid.as_str ()),
}
});
}
}
{
let now = Instant::now ();
let response_rendezvous = state_2.response_rendezvous.read ().await;
response_rendezvous.retain (|_, v| v.timeout >= now);
}
}
});
let make_svc = make_service_fn (|_conn| {
let state = state.clone ();
let handlebars = handlebars.clone ();
@ -849,7 +893,7 @@ pub async fn run_relay (
std::mem::swap (&mut swapped, &mut response_rendezvous);
for (_, sender) in swapped {
sender.send (Err (ShuttingDown)).ok ();
sender.tx.send (Err (ShuttingDown)).ok ();
}
let mut request_rendezvous = state.request_rendezvous.lock ().await;

View File

@ -61,7 +61,10 @@ pub enum RequestRendezvous {
ParkedServer (oneshot::Sender <Result <http_serde::WrappedRequest, ShuttingDownError>>),
}
type ResponseRendezvous = oneshot::Sender <Result <(http_serde::ResponseParts, Body), ShuttingDownError>>;
pub (crate) struct ResponseRendezvous {
pub timeout: Instant,
pub tx: oneshot::Sender <Result <(http_serde::ResponseParts, Body), ShuttingDownError>>,
}
#[derive (Clone)]
pub struct ServerStatus {

View File

@ -119,6 +119,21 @@ pub async fn v1_server_list (state: &Relay)
}
}
fn get_api_key (headers: &hyper::HeaderMap) -> Option <&str>
{
if let Some (key) = headers.get ("X-ApiKey").and_then (|v| v.to_str ().ok ()) {
return Some (key);
}
if let Some (s) = headers.get ("Authorization").and_then (|v| v.to_str ().ok ()) {
if let Some (key) = s.strip_prefix ("Bearer ") {
return Some (key);
}
}
None
}
#[instrument (level = "trace", skip (req, state))]
async fn api_v1 (
req: Request <Body>,
@ -132,7 +147,7 @@ async fn api_v1 (
AuditEvent,
};
let api_key = req.headers ().get ("X-ApiKey");
let api_key = get_api_key (req.headers ());
let api_key = match api_key {
None => return Ok (error_reply (StatusCode::FORBIDDEN, strings::NO_API_KEY)?),
@ -176,7 +191,10 @@ async fn api_v1 (
path: path_rest.to_string (),
})).await;
if path_rest == "test" {
if path_rest == "metrics" {
Ok (metrics (req, state).await?)
}
else if path_rest == "test" {
Ok (error_reply (StatusCode::OK, "You're valid!")?)
}
else if path_rest == "server_list" {
@ -205,6 +223,65 @@ async fn api_v1 (
}
}
#[instrument (level = "trace", skip (req, state))]
async fn metrics (
req: Request <Body>,
state: &Relay,
)
-> Result <Response <Body>, RequestError>
{
let mut s = String::with_capacity (4 * 1_024);
let mut push_metric = |name, help, kind, value| {
if let Some (help) = help {
s.push_str (format! ("# HELP {} {}\n", name, help).as_str ());
}
s.push_str (format! ("# TYPE {} {}\n", name, kind).as_str ());
s.push_str (format! ("{} {}\n", name, value).as_str ());
};
let request_rendezvous_count = {
let g = state.request_rendezvous.lock ().await;
g.len ()
};
let server_status_count;
let connected_server_count;
let now = Utc::now ();
{
let g = state.server_status.lock ().await;
server_status_count = g.len ();
connected_server_count = g.iter ()
.filter (|(_, s)| now - s.last_seen < chrono::Duration::seconds (60))
.count ();
}
let response_rendezvous_count = {
let g = state.response_rendezvous.read ().await;
g.len ()
};
push_metric ("request_rendezvous_count", None, "gauge", request_rendezvous_count.to_string ());
push_metric ("server_status_count", None, "gauge", server_status_count.to_string ());
push_metric ("connected_server_count", None, "gauge", connected_server_count.to_string ());
push_metric ("response_rendezvous_count", None, "gauge", response_rendezvous_count.to_string ());
#[cfg (target_os = "linux")]
{
if let Some (rss) = tokio::fs::read_to_string ("/proc/self/status").await
.ok ()
.and_then (|s| get_rss_from_status (s.as_str ()))
{
push_metric ("relay_vm_rss", Some ("VmRSS of the relay process, in kB"), "gauge", rss.to_string ());
}
}
Ok (Response::builder ()
.body (Body::from (s))?)
}
#[instrument (level = "trace", skip (req, state))]
pub async fn handle (
req: Request <Body>,
@ -230,6 +307,20 @@ pub async fn handle (
}
}
fn get_rss_from_status (proc_status: &str) -> Option <u64>
{
use std::str::FromStr;
for line in proc_status.lines () {
if let Some (rest) = line.strip_prefix ("VmRSS:\t").and_then (|s| s.strip_suffix (" kB"))
{
return u64::from_str (rest.trim_start ()).ok ();
}
}
None
}
#[cfg (test)]
mod tests {
use std::{
@ -246,8 +337,9 @@ mod tests {
struct TestCase {
// Inputs
path_rest: &'static str,
auth_header: Option <&'static str>,
valid_key: Option <&'static str>,
input_key: Option <&'static str>,
x_api_key: Option <&'static str>,
// Expected
expected_status: StatusCode,
@ -268,9 +360,15 @@ mod tests {
x
}
fn input_key (&self, v: Option <&'static str>) -> Self {
fn auth_header (&self, v: Option <&'static str>) -> Self {
let mut x = self.clone ();
x.input_key = v;
x.auth_header = v;
x
}
fn x_api_key (&self, v: Option <&'static str>) -> Self {
let mut x = self.clone ();
x.x_api_key = v;
x
}
@ -298,13 +396,16 @@ mod tests {
.expected_body (format! ("{}\n", body))
}
async fn test (&self) {
async fn test (&self, name: &str) {
let mut input = Request::builder ()
.method ("GET")
.uri (format! ("http://127.0.0.1:4000/scraper/{}", self.path_rest));
if let Some (input_key) = self.input_key {
input = input.header ("X-ApiKey", input_key);
if let Some (auth_header) = self.auth_header {
input = input.header ("Authorization", auth_header);
}
if let Some (x_api_key) = self.x_api_key {
input = input.header ("X-ApiKey", x_api_key);
}
let input = input.body (Body::empty ()).unwrap ();
@ -331,15 +432,15 @@ mod tests {
expected_headers.insert (*key, (*value).try_into ().expect ("Couldn't convert header value"));
}
assert_eq! (actual_head.status, self.expected_status);
assert_eq! (actual_head.headers, expected_headers);
assert_eq! (actual_head.status, self.expected_status, "{}", name);
assert_eq! (actual_head.headers, expected_headers, "{}", name);
let actual_body = hyper::body::to_bytes (actual_body).await;
let actual_body = actual_body.expect ("Body should be convertible to bytes");
let actual_body = actual_body.to_vec ();
let actual_body = String::from_utf8 (actual_body).expect ("Body should be UTF-8");
assert_eq! (actual_body, self.expected_body);
assert_eq! (actual_body, self.expected_body, "{}", name);
}
}
@ -351,7 +452,8 @@ mod tests {
let base_case = TestCase {
path_rest: "v1/test",
valid_key: Some ("bogus"),
input_key: Some ("bogus"),
auth_header: None,
x_api_key: Some ("bogus"),
expected_status: StatusCode::OK,
expected_headers: vec! [
("content-type", "text/plain"),
@ -359,21 +461,47 @@ mod tests {
expected_body: "You're valid!\n".to_string (),
};
for case in &[
base_case.clone (),
base_case.path_rest ("v9999/test")
.expected (StatusCode::NOT_FOUND, strings::UNKNOWN_API_VERSION),
base_case.valid_key (None)
.expected (StatusCode::FORBIDDEN, strings::FORBIDDEN),
base_case.input_key (Some ("borgus"))
.expected (StatusCode::FORBIDDEN, strings::FORBIDDEN),
base_case.path_rest ("v1/toast")
.expected (StatusCode::NOT_FOUND, strings::UNKNOWN_API_ENDPOINT),
base_case.input_key (None)
.expected (StatusCode::FORBIDDEN, strings::NO_API_KEY),
] {
case.test ().await;
}
base_case
.test ("00").await;
base_case
.path_rest ("v9999/test")
.expected (StatusCode::NOT_FOUND, strings::UNKNOWN_API_VERSION)
.test ("01").await;
base_case
.valid_key (None)
.expected (StatusCode::FORBIDDEN, strings::FORBIDDEN)
.test ("02").await;
base_case
.x_api_key (Some ("borgus"))
.expected (StatusCode::FORBIDDEN, strings::FORBIDDEN)
.test ("03").await;
base_case
.path_rest ("v1/toast")
.expected (StatusCode::NOT_FOUND, strings::UNKNOWN_API_ENDPOINT)
.test ("04").await;
base_case
.x_api_key (None)
.expected (StatusCode::FORBIDDEN, strings::NO_API_KEY)
.test ("05").await;
base_case
.x_api_key (None)
.auth_header (Some ("Bearer bogus"))
.expected (StatusCode::OK, "You're valid!")
.test ("06").await;
});
}
#[test]
fn rss () {
let input = "VmHWM: 584 kB\nVmRSS: 584 kB\nRssAnon: 68 kB\n";
assert_eq! (get_rss_from_status (input), Some (584));
}
}

View File

@ -188,7 +188,7 @@ pub async fn handle_response (
};
// UKAUFFY4 (Send half)
if tx.send (Ok ((resp_parts, body))).is_err () {
if tx.tx.send (Ok ((resp_parts, body))).is_err () {
let msg = "Failed to connect to client";
error! (msg);
return Ok (error_reply (StatusCode::BAD_GATEWAY, msg)?);

View File

@ -47,13 +47,12 @@ fn main ()
let app = app::App::default();
let mut wind = Window::new (100, 100, 500, 180, "PTTH server");
let config_file_opt = match ptth_server::load_toml::load::<ConfigFile, _> ("./config/ptth_server.toml")
{
let config_file_opt = match ptth_server::load_toml::load::<ConfigFile, _> ("./config/ptth_server.toml") {
Ok (x) => Some (x),
Err (e) => {
eprintln! ("Error in config TOML: {:?}", e);
eprintln! ("Error in `./config/ptth_server.toml`: {:?}", e);
None
}
},
};
let (hit_tx, mut hit_rx) = mpsc::channel (1);
@ -61,9 +60,7 @@ fn main ()
let fltk_tx = fltk_tx;
rt.spawn (async move {
eprintln! ("Entering channel task");
while hit_rx.recv ().await.is_some () {
eprintln! ("fltk_tx");
fltk_tx.send (Message::Hit);
}
});

View File

@ -115,7 +115,7 @@ AIABAACAAQAAgAEAAIABAACAAQAAgAEAAIABAACAAQAA" rel="icon" type="image/x-icon" />
{{#each entries}}
<tr>
<td><a class="entry" href="{{this.encoded_file_name}}{{this.trailing_slash}}">
<td><a class="entry" href="./{{this.encoded_file_name}}{{this.trailing_slash}}">
{{this.icon}} {{this.file_name}}{{this.trailing_slash}}</a></td>
<td><span class="grey">{{this.size}}</span></td>
</tr>