♻️ Fixing clippy lints
parent
5c0d7ea998
commit
687cffdf90
|
@ -1,19 +1,21 @@
|
||||||
|
// False positive with itertools::process_results
|
||||||
|
#![allow (clippy::redundant_closure)]
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
collections::*,
|
collections::HashMap,
|
||||||
convert::{TryFrom, TryInto},
|
convert::{TryFrom, TryInto},
|
||||||
iter::FromIterator,
|
iter::FromIterator,
|
||||||
path::Path,
|
path::Path,
|
||||||
};
|
};
|
||||||
|
|
||||||
use serde::Deserialize;
|
|
||||||
|
|
||||||
use crate::errors::ConfigError;
|
use crate::errors::ConfigError;
|
||||||
|
|
||||||
// Stuff we need to load from the config file and use to
|
// Stuff we need to load from the config file and use to
|
||||||
// set up the HTTP server
|
// set up the HTTP server
|
||||||
|
|
||||||
pub mod file {
|
pub mod file {
|
||||||
use super::*;
|
use std::collections::HashMap;
|
||||||
|
use serde::Deserialize;
|
||||||
|
|
||||||
#[derive (Deserialize)]
|
#[derive (Deserialize)]
|
||||||
pub struct Server {
|
pub struct Server {
|
||||||
|
@ -76,7 +78,7 @@ impl Config {
|
||||||
|
|
||||||
let mut f = tokio::fs::File::open (path).await?;
|
let mut f = tokio::fs::File::open (path).await?;
|
||||||
|
|
||||||
let mut buffer = vec! [0u8; 4096];
|
let mut buffer = vec! [0_u8; 4096];
|
||||||
let bytes_read = f.read (&mut buffer).await?;
|
let bytes_read = f.read (&mut buffer).await?;
|
||||||
buffer.truncate (bytes_read);
|
buffer.truncate (bytes_read);
|
||||||
|
|
||||||
|
|
|
@ -29,8 +29,50 @@ pub enum ShuttingDownError {
|
||||||
ShuttingDown,
|
ShuttingDown,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive (Error, Debug)]
|
||||||
|
pub enum HandleHttpResponseError {
|
||||||
|
#[error ("HTTP error")]
|
||||||
|
Http (#[from] http::Error),
|
||||||
|
|
||||||
|
#[error ("Missing PTTH magic header")]
|
||||||
|
MissingPtthMagicHeader,
|
||||||
|
|
||||||
|
#[error ("PTTH magic header is not base64")]
|
||||||
|
PtthMagicHeaderNotBase64 (base64::DecodeError),
|
||||||
|
|
||||||
|
#[error ("PTTH magic header could not be decoded as MessagePack")]
|
||||||
|
PtthMagicHeaderNotMsgPack (rmp_serde::decode::Error),
|
||||||
|
|
||||||
|
#[error ("Couldn't tell server something")]
|
||||||
|
LostServer,
|
||||||
|
|
||||||
|
#[error ("Relaying task panicked")]
|
||||||
|
RelayingTaskPanicked (#[from] tokio::task::JoinError),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive (Error, Debug)]
|
||||||
|
pub enum RequestError {
|
||||||
|
#[error ("HTTP error")]
|
||||||
|
Http (#[from] http::Error),
|
||||||
|
|
||||||
|
#[error ("MessagePack encode error")]
|
||||||
|
MsgPack (#[from] rmp_serde::encode::Error),
|
||||||
|
|
||||||
|
#[error ("Handlebars rendering error")]
|
||||||
|
Handlebars (#[from] handlebars::RenderError),
|
||||||
|
|
||||||
|
#[error ("Error handling HTTP response")]
|
||||||
|
HandleHttpResponse (#[from] HandleHttpResponseError),
|
||||||
|
|
||||||
|
#[error ("Error is mysterious!")]
|
||||||
|
Mysterious,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive (Error, Debug)]
|
#[derive (Error, Debug)]
|
||||||
pub enum RelayError {
|
pub enum RelayError {
|
||||||
#[error ("Handlebars template file error")]
|
#[error ("Handlebars template file error")]
|
||||||
TemplateFile (#[from] handlebars::TemplateFileError),
|
TemplateFile (#[from] handlebars::TemplateFileError),
|
||||||
|
|
||||||
|
#[error ("Hyper error")]
|
||||||
|
Hyper (#[from] hyper::Error),
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,23 @@
|
||||||
|
#![warn (clippy::pedantic)]
|
||||||
|
|
||||||
|
// I'm not sure if I like this one
|
||||||
|
#![allow (clippy::enum_glob_use)]
|
||||||
|
|
||||||
|
// I don't see the point in documenting the errors outside of where the
|
||||||
|
// error type is defined.
|
||||||
|
#![allow (clippy::missing_errors_doc)]
|
||||||
|
|
||||||
|
// I don't see the point of writing the type twice if I'm initializing a struct
|
||||||
|
// and the type is already in the struct definition.
|
||||||
|
#![allow (clippy::default_trait_access)]
|
||||||
|
|
||||||
|
// False positive on futures::select! macro
|
||||||
|
#![allow (clippy::mut_mut)]
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
borrow::Cow,
|
borrow::Cow,
|
||||||
error::Error,
|
collections::HashMap,
|
||||||
collections::*,
|
convert::TryFrom,
|
||||||
convert::{Infallible, TryFrom},
|
|
||||||
iter::FromIterator,
|
iter::FromIterator,
|
||||||
net::SocketAddr,
|
net::SocketAddr,
|
||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
|
@ -12,6 +27,11 @@ use std::{
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use chrono::{
|
||||||
|
DateTime,
|
||||||
|
SecondsFormat,
|
||||||
|
Utc
|
||||||
|
};
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
use futures::{
|
use futures::{
|
||||||
FutureExt,
|
FutureExt,
|
||||||
|
@ -91,12 +111,6 @@ enum RequestRendezvous {
|
||||||
|
|
||||||
type ResponseRendezvous = oneshot::Sender <Result <(http_serde::ResponseParts, Body), ShuttingDownError>>;
|
type ResponseRendezvous = oneshot::Sender <Result <(http_serde::ResponseParts, Body), ShuttingDownError>>;
|
||||||
|
|
||||||
use chrono::{
|
|
||||||
DateTime,
|
|
||||||
SecondsFormat,
|
|
||||||
Utc
|
|
||||||
};
|
|
||||||
|
|
||||||
#[derive (Clone)]
|
#[derive (Clone)]
|
||||||
pub struct ServerStatus {
|
pub struct ServerStatus {
|
||||||
last_seen: DateTime <Utc>,
|
last_seen: DateTime <Utc>,
|
||||||
|
@ -174,9 +188,11 @@ async fn handle_http_listen (
|
||||||
watcher_code: String,
|
watcher_code: String,
|
||||||
api_key: &[u8],
|
api_key: &[u8],
|
||||||
)
|
)
|
||||||
-> Result <Response <Body>, http::Error>
|
-> Result <Response <Body>, RequestError>
|
||||||
{
|
{
|
||||||
let trip_error = error_reply (StatusCode::UNAUTHORIZED, "Bad X-ApiKey");
|
use RequestRendezvous::*;
|
||||||
|
|
||||||
|
let trip_error = || Ok (error_reply (StatusCode::UNAUTHORIZED, "Bad X-ApiKey")?);
|
||||||
|
|
||||||
let expected_tripcode = {
|
let expected_tripcode = {
|
||||||
let config = state.config.read ().await;
|
let config = state.config.read ().await;
|
||||||
|
@ -184,16 +200,16 @@ async fn handle_http_listen (
|
||||||
match config.servers.get (&watcher_code) {
|
match config.servers.get (&watcher_code) {
|
||||||
None => {
|
None => {
|
||||||
error! ("Denied http_listen for non-existent server name {}", watcher_code);
|
error! ("Denied http_listen for non-existent server name {}", watcher_code);
|
||||||
return trip_error;
|
return trip_error ();
|
||||||
},
|
},
|
||||||
Some (x) => (*x).tripcode.clone (),
|
Some (x) => (*x).tripcode,
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let actual_tripcode = blake3::hash (api_key);
|
let actual_tripcode = blake3::hash (api_key);
|
||||||
|
|
||||||
if expected_tripcode != actual_tripcode {
|
if expected_tripcode != actual_tripcode {
|
||||||
error! ("Denied http_listen for bad tripcode {}", base64::encode (actual_tripcode.as_bytes ()));
|
error! ("Denied http_listen for bad tripcode {}", base64::encode (actual_tripcode.as_bytes ()));
|
||||||
return trip_error;
|
return trip_error ();
|
||||||
}
|
}
|
||||||
|
|
||||||
// End of early returns
|
// End of early returns
|
||||||
|
@ -206,8 +222,6 @@ async fn handle_http_listen (
|
||||||
status.last_seen = Utc::now ();
|
status.last_seen = Utc::now ();
|
||||||
}
|
}
|
||||||
|
|
||||||
use RequestRendezvous::*;
|
|
||||||
|
|
||||||
let (tx, rx) = oneshot::channel ();
|
let (tx, rx) = oneshot::channel ();
|
||||||
|
|
||||||
{
|
{
|
||||||
|
@ -220,7 +234,7 @@ async fn handle_http_listen (
|
||||||
// handle them immediately
|
// handle them immediately
|
||||||
|
|
||||||
debug! ("Sending {} parked requests to server {}", v.len (), watcher_code);
|
debug! ("Sending {} parked requests to server {}", v.len (), watcher_code);
|
||||||
return ok_reply (rmp_serde::to_vec (&v).unwrap ());
|
return Ok (ok_reply (rmp_serde::to_vec (&v)?)?);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -234,14 +248,14 @@ async fn handle_http_listen (
|
||||||
x = rx.fuse () => match x {
|
x = rx.fuse () => match x {
|
||||||
Ok (Ok (one_req)) => {
|
Ok (Ok (one_req)) => {
|
||||||
debug! ("Unparking server {}", watcher_code);
|
debug! ("Unparking server {}", watcher_code);
|
||||||
ok_reply (rmp_serde::to_vec (&vec! [one_req]).unwrap ())
|
Ok (ok_reply (rmp_serde::to_vec (&vec! [one_req])?)?)
|
||||||
},
|
},
|
||||||
Ok (Err (ShuttingDownError::ShuttingDown)) => error_reply (StatusCode::SERVICE_UNAVAILABLE, "Server is shutting down, try again soon"),
|
Ok (Err (ShuttingDownError::ShuttingDown)) => Ok (error_reply (StatusCode::SERVICE_UNAVAILABLE, "Server is shutting down, try again soon")?),
|
||||||
Err (_) => error_reply (StatusCode::INTERNAL_SERVER_ERROR, "Server error"),
|
Err (_) => Ok (error_reply (StatusCode::INTERNAL_SERVER_ERROR, "Server error")?),
|
||||||
},
|
},
|
||||||
_ = delay_for (Duration::from_secs (30)).fuse () => {
|
_ = delay_for (Duration::from_secs (30)).fuse () => {
|
||||||
debug! ("Timed out http_listen for server {}", watcher_code);
|
debug! ("Timed out http_listen for server {}", watcher_code);
|
||||||
return error_reply (StatusCode::NO_CONTENT, "No requests now, long-poll again")
|
return Ok (error_reply (StatusCode::NO_CONTENT, "No requests now, long-poll again")?)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -253,26 +267,32 @@ async fn handle_http_response (
|
||||||
state: Arc <RelayState>,
|
state: Arc <RelayState>,
|
||||||
req_id: String,
|
req_id: String,
|
||||||
)
|
)
|
||||||
-> Result <Response <Body>, http::Error>
|
-> Result <Response <Body>, HandleHttpResponseError>
|
||||||
{
|
{
|
||||||
let (parts, mut body) = req.into_parts ();
|
|
||||||
let resp_parts: http_serde::ResponseParts = rmp_serde::from_read_ref (&base64::decode (parts.headers.get (ptth_core::PTTH_MAGIC_HEADER).unwrap ()).unwrap ()).unwrap ();
|
|
||||||
|
|
||||||
// Intercept the body packets here so we can check when the stream
|
|
||||||
// ends or errors out
|
|
||||||
|
|
||||||
#[derive (Debug)]
|
#[derive (Debug)]
|
||||||
enum BodyFinishedReason {
|
enum BodyFinishedReason {
|
||||||
StreamFinished,
|
StreamFinished,
|
||||||
ClientDisconnected,
|
ClientDisconnected,
|
||||||
}
|
}
|
||||||
use BodyFinishedReason::*;
|
use BodyFinishedReason::*;
|
||||||
|
use HandleHttpResponseError::*;
|
||||||
|
|
||||||
|
let (parts, mut body) = req.into_parts ();
|
||||||
|
|
||||||
|
let magic_header = parts.headers.get (ptth_core::PTTH_MAGIC_HEADER).ok_or (MissingPtthMagicHeader)?;
|
||||||
|
|
||||||
|
let magic_header = base64::decode (magic_header).map_err (PtthMagicHeaderNotBase64)?;
|
||||||
|
|
||||||
|
let resp_parts: http_serde::ResponseParts = rmp_serde::from_read_ref (&magic_header).map_err (PtthMagicHeaderNotMsgPack)?;
|
||||||
|
|
||||||
|
// Intercept the body packets here so we can check when the stream
|
||||||
|
// ends or errors out
|
||||||
|
|
||||||
let (mut body_tx, body_rx) = mpsc::channel (2);
|
let (mut body_tx, body_rx) = mpsc::channel (2);
|
||||||
let (body_finished_tx, body_finished_rx) = oneshot::channel ();
|
let (body_finished_tx, body_finished_rx) = oneshot::channel ();
|
||||||
let mut shutdown_watch_rx = state.shutdown_watch_rx.clone ();
|
let mut shutdown_watch_rx = state.shutdown_watch_rx.clone ();
|
||||||
|
|
||||||
spawn (async move {
|
let relay_task = spawn (async move {
|
||||||
if shutdown_watch_rx.recv ().await == Some (false) {
|
if shutdown_watch_rx.recv ().await == Some (false) {
|
||||||
loop {
|
loop {
|
||||||
let item = body.next ().await;
|
let item = body.next ().await;
|
||||||
|
@ -285,7 +305,7 @@ async fn handle_http_response (
|
||||||
futures::select! {
|
futures::select! {
|
||||||
x = body_tx.send (item).fuse () => if let Err (_) = x {
|
x = body_tx.send (item).fuse () => if let Err (_) = x {
|
||||||
info! ("Body closed while relaying. (Client hung up?)");
|
info! ("Body closed while relaying. (Client hung up?)");
|
||||||
body_finished_tx.send (ClientDisconnected).unwrap ();
|
body_finished_tx.send (ClientDisconnected).map_err (|_| LostServer).unwrap ();
|
||||||
break;
|
break;
|
||||||
},
|
},
|
||||||
_ = shutdown_watch_rx.recv ().fuse () => {
|
_ = shutdown_watch_rx.recv ().fuse () => {
|
||||||
|
@ -296,7 +316,7 @@ async fn handle_http_response (
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
debug! ("Finished relaying bytes");
|
debug! ("Finished relaying bytes");
|
||||||
body_finished_tx.send (StreamFinished).unwrap ();
|
body_finished_tx.send (StreamFinished).map_err (|_| LostServer).unwrap ();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -313,7 +333,7 @@ async fn handle_http_response (
|
||||||
match response_rendezvous.remove (&req_id) {
|
match response_rendezvous.remove (&req_id) {
|
||||||
None => {
|
None => {
|
||||||
error! ("Server tried to respond to non-existent request");
|
error! ("Server tried to respond to non-existent request");
|
||||||
return error_reply (StatusCode::BAD_REQUEST, "Request ID not found in response_rendezvous");
|
return Ok (error_reply (StatusCode::BAD_REQUEST, "Request ID not found in response_rendezvous")?);
|
||||||
},
|
},
|
||||||
Some ((_, x)) => x,
|
Some ((_, x)) => x,
|
||||||
}
|
}
|
||||||
|
@ -323,20 +343,22 @@ async fn handle_http_response (
|
||||||
if tx.send (Ok ((resp_parts, body))).is_err () {
|
if tx.send (Ok ((resp_parts, body))).is_err () {
|
||||||
let msg = "Failed to connect to client";
|
let msg = "Failed to connect to client";
|
||||||
error! (msg);
|
error! (msg);
|
||||||
return error_reply (StatusCode::BAD_GATEWAY, msg);
|
return Ok (error_reply (StatusCode::BAD_GATEWAY, msg)?);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
relay_task.await?;
|
||||||
|
|
||||||
debug! ("Connected server to client for streaming.");
|
debug! ("Connected server to client for streaming.");
|
||||||
match body_finished_rx.await {
|
match body_finished_rx.await {
|
||||||
Ok (StreamFinished) => {
|
Ok (StreamFinished) => {
|
||||||
error_reply (StatusCode::OK, "StreamFinished")
|
Ok (error_reply (StatusCode::OK, "StreamFinished")?)
|
||||||
},
|
},
|
||||||
Ok (ClientDisconnected) => {
|
Ok (ClientDisconnected) => {
|
||||||
error_reply (StatusCode::OK, "ClientDisconnected")
|
Ok (error_reply (StatusCode::OK, "ClientDisconnected")?)
|
||||||
},
|
},
|
||||||
Err (e) => {
|
Err (e) => {
|
||||||
debug! ("body_finished_rx {}", e);
|
debug! ("body_finished_rx {}", e);
|
||||||
error_reply (StatusCode::OK, "body_finished_rx Err")
|
Ok (error_reply (StatusCode::OK, "body_finished_rx Err")?)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -361,7 +383,7 @@ async fn handle_http_request (
|
||||||
|
|
||||||
let req = match http_serde::RequestParts::from_hyper (req.method, uri, req.headers) {
|
let req = match http_serde::RequestParts::from_hyper (req.method, uri, req.headers) {
|
||||||
Ok (x) => x,
|
Ok (x) => x,
|
||||||
_ => return error_reply (StatusCode::BAD_REQUEST, "Bad request"),
|
Err (_) => return error_reply (StatusCode::BAD_REQUEST, "Bad request"),
|
||||||
};
|
};
|
||||||
|
|
||||||
let (tx, rx) = oneshot::channel ();
|
let (tx, rx) = oneshot::channel ();
|
||||||
|
@ -375,6 +397,8 @@ async fn handle_http_request (
|
||||||
trace! ("Created request {}", req_id);
|
trace! ("Created request {}", req_id);
|
||||||
|
|
||||||
{
|
{
|
||||||
|
use RequestRendezvous::*;
|
||||||
|
|
||||||
let mut request_rendezvous = state.request_rendezvous.lock ().await;
|
let mut request_rendezvous = state.request_rendezvous.lock ().await;
|
||||||
|
|
||||||
let wrapped = http_serde::WrappedRequest {
|
let wrapped = http_serde::WrappedRequest {
|
||||||
|
@ -382,8 +406,6 @@ async fn handle_http_request (
|
||||||
req,
|
req,
|
||||||
};
|
};
|
||||||
|
|
||||||
use RequestRendezvous::*;
|
|
||||||
|
|
||||||
let new_rendezvous = match request_rendezvous.remove (&watcher_code) {
|
let new_rendezvous = match request_rendezvous.remove (&watcher_code) {
|
||||||
Some (ParkedClients (mut v)) => {
|
Some (ParkedClients (mut v)) => {
|
||||||
debug! ("Parking request {} ({} already queued)", req_id, v.len ());
|
debug! ("Parking request {} ({} already queued)", req_id, v.len ());
|
||||||
|
@ -439,7 +461,7 @@ async fn handle_http_request (
|
||||||
let mut resp = Response::builder ()
|
let mut resp = Response::builder ()
|
||||||
.status (hyper::StatusCode::from (parts.status_code));
|
.status (hyper::StatusCode::from (parts.status_code));
|
||||||
|
|
||||||
for (k, v) in parts.headers.into_iter () {
|
for (k, v) in parts.headers {
|
||||||
resp = resp.header (&k, v);
|
resp = resp.header (&k, v);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -567,17 +589,17 @@ async fn handle_server_list_internal (state: &Arc <RelayState>)
|
||||||
|
|
||||||
async fn handle_server_list (
|
async fn handle_server_list (
|
||||||
state: Arc <RelayState>
|
state: Arc <RelayState>
|
||||||
) -> Result <Response <Body>, http::Error>
|
) -> Result <Response <Body>, RequestError>
|
||||||
{
|
{
|
||||||
let page = handle_server_list_internal (&state).await;
|
let page = handle_server_list_internal (&state).await;
|
||||||
|
|
||||||
let s = state.handlebars.render ("relay_server_list", &page).unwrap ();
|
let s = state.handlebars.render ("relay_server_list", &page)?;
|
||||||
ok_reply (s)
|
Ok (ok_reply (s)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument (level = "trace", skip (req, state))]
|
#[instrument (level = "trace", skip (req, state))]
|
||||||
async fn handle_all (req: Request <Body>, state: Arc <RelayState>)
|
async fn handle_all (req: Request <Body>, state: Arc <RelayState>)
|
||||||
-> Result <Response <Body>, http::Error>
|
-> Result <Response <Body>, RequestError>
|
||||||
{
|
{
|
||||||
let path = req.uri ().path ();
|
let path = req.uri ().path ();
|
||||||
//println! ("{}", path);
|
//println! ("{}", path);
|
||||||
|
@ -592,44 +614,47 @@ async fn handle_all (req: Request <Body>, state: Arc <RelayState>)
|
||||||
|
|
||||||
return if let Some (request_code) = prefix_match ("/7ZSFUKGV/http_response/", path) {
|
return if let Some (request_code) = prefix_match ("/7ZSFUKGV/http_response/", path) {
|
||||||
let request_code = request_code.into ();
|
let request_code = request_code.into ();
|
||||||
handle_http_response (req, state, request_code).await
|
Ok (handle_http_response (req, state, request_code).await?)
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
error_reply (StatusCode::BAD_REQUEST, "Can't POST this")
|
Ok (error_reply (StatusCode::BAD_REQUEST, "Can't POST this")?)
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some (listen_code) = prefix_match ("/7ZSFUKGV/http_listen/", path) {
|
if let Some (listen_code) = prefix_match ("/7ZSFUKGV/http_listen/", path) {
|
||||||
let api_key = match api_key {
|
let api_key = match api_key {
|
||||||
None => return error_reply (StatusCode::UNAUTHORIZED, "Can't register as server without an API key"),
|
None => return Ok (error_reply (StatusCode::UNAUTHORIZED, "Can't register as server without an API key")?),
|
||||||
Some (x) => x,
|
Some (x) => x,
|
||||||
};
|
};
|
||||||
handle_http_listen (state, listen_code.into (), api_key.as_bytes ()).await
|
handle_http_listen (state, listen_code.into (), api_key.as_bytes ()).await
|
||||||
}
|
}
|
||||||
else if let Some (rest) = prefix_match ("/frontend/servers/", path) {
|
else if let Some (rest) = prefix_match ("/frontend/servers/", path) {
|
||||||
if rest == "" {
|
if rest == "" {
|
||||||
handle_server_list (state).await
|
Ok (handle_server_list (state).await?)
|
||||||
}
|
}
|
||||||
else if let Some (idx) = rest.find ('/') {
|
else if let Some (idx) = rest.find ('/') {
|
||||||
let listen_code = String::from (&rest [0..idx]);
|
let listen_code = String::from (&rest [0..idx]);
|
||||||
let path = String::from (&rest [idx..]);
|
let path = String::from (&rest [idx..]);
|
||||||
let (parts, _) = req.into_parts ();
|
let (parts, _) = req.into_parts ();
|
||||||
|
|
||||||
handle_http_request (parts, path, state, listen_code).await
|
Ok (handle_http_request (parts, path, state, listen_code).await?)
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
error_reply (StatusCode::BAD_REQUEST, "Bad URI format")
|
Ok (error_reply (StatusCode::BAD_REQUEST, "Bad URI format")?)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if path == "/" {
|
else if path == "/" {
|
||||||
let s = state.handlebars.render ("relay_root", &()).unwrap ();
|
let s = state.handlebars.render ("relay_root", &())?;
|
||||||
ok_reply (s)
|
Ok (ok_reply (s)?)
|
||||||
}
|
}
|
||||||
else if path == "/frontend/relay_up_check" {
|
else if path == "/frontend/relay_up_check" {
|
||||||
error_reply (StatusCode::OK, "Relay is up")
|
Ok (error_reply (StatusCode::OK, "Relay is up")?)
|
||||||
|
}
|
||||||
|
else if path == "/frontend/test_mysterious_error" {
|
||||||
|
Err (RequestError::Mysterious)
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
error_reply (StatusCode::OK, "Hi")
|
Ok (error_reply (StatusCode::OK, "Hi")?)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -641,10 +666,10 @@ pub fn load_templates (asset_root: &Path)
|
||||||
|
|
||||||
let asset_root = asset_root.join ("handlebars/relay");
|
let asset_root = asset_root.join ("handlebars/relay");
|
||||||
|
|
||||||
for (k, v) in vec! [
|
for (k, v) in &[
|
||||||
("relay_server_list", "relay_server_list.html"),
|
("relay_server_list", "relay_server_list.html"),
|
||||||
("relay_root", "relay_root.html"),
|
("relay_root", "relay_root.html"),
|
||||||
].into_iter () {
|
] {
|
||||||
handlebars.register_template_file (k, &asset_root.join (v))?;
|
handlebars.register_template_file (k, &asset_root.join (v))?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -670,7 +695,7 @@ pub async fn run_relay (
|
||||||
shutdown_oneshot: oneshot::Receiver <()>,
|
shutdown_oneshot: oneshot::Receiver <()>,
|
||||||
config_reload_path: Option <PathBuf>
|
config_reload_path: Option <PathBuf>
|
||||||
)
|
)
|
||||||
-> Result <(), Box <dyn Error>>
|
-> Result <(), RelayError>
|
||||||
{
|
{
|
||||||
let addr = SocketAddr::from ((
|
let addr = SocketAddr::from ((
|
||||||
[0, 0, 0, 0],
|
[0, 0, 0, 0],
|
||||||
|
@ -693,7 +718,7 @@ pub async fn run_relay (
|
||||||
let state = state.clone ();
|
let state = state.clone ();
|
||||||
|
|
||||||
async {
|
async {
|
||||||
Ok::<_, Infallible> (service_fn (move |req| {
|
Ok::<_, RequestError> (service_fn (move |req| {
|
||||||
let state = state.clone ();
|
let state = state.clone ();
|
||||||
|
|
||||||
handle_all (req, state)
|
handle_all (req, state)
|
||||||
|
@ -705,18 +730,18 @@ pub async fn run_relay (
|
||||||
.serve (make_svc);
|
.serve (make_svc);
|
||||||
|
|
||||||
server.with_graceful_shutdown (async {
|
server.with_graceful_shutdown (async {
|
||||||
|
use ShuttingDownError::ShuttingDown;
|
||||||
|
|
||||||
shutdown_oneshot.await.ok ();
|
shutdown_oneshot.await.ok ();
|
||||||
|
|
||||||
state.shutdown_watch_tx.broadcast (true).unwrap ();
|
state.shutdown_watch_tx.broadcast (true).unwrap ();
|
||||||
|
|
||||||
use ShuttingDownError::ShuttingDown;
|
|
||||||
|
|
||||||
let mut response_rendezvous = state.response_rendezvous.write ().await;
|
let mut response_rendezvous = state.response_rendezvous.write ().await;
|
||||||
let mut swapped = DashMap::default ();
|
let mut swapped = DashMap::default ();
|
||||||
|
|
||||||
std::mem::swap (&mut swapped, &mut response_rendezvous);
|
std::mem::swap (&mut swapped, &mut response_rendezvous);
|
||||||
|
|
||||||
for (_, sender) in swapped.into_iter () {
|
for (_, sender) in swapped {
|
||||||
sender.send (Err (ShuttingDown)).ok ();
|
sender.send (Err (ShuttingDown)).ok ();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue