//! # PTTH Server
//!
//! The PTTH server is an HTTP server that can serve files from
//! behind a firewall, because it only makes outgoing HTTP connections
//! to a PTTH relay.
//!
//! ```
//! View from outside the PTTH tunnel:
//!
//! * HTTP client
//!   |
//!   | HTTP(S) requests
//!   V
//! * ptth_relay
//!   ^
//!   | HTTP(S) requests
//!   |
//! * ptth_server
//!
//! View from inside the PTTH tunnel:
//!
//! * HTTP client
//!   |
//!   | HTTP(S) requests
//!   V
//! * ptth_relay
//!   |
//!   | HTTP(S) requests
//!   V
//! * ptth_server
//! ```
|
#![warn (clippy::pedantic)]
|
|
|
|
// I don't see the point in documenting the errors outside of where the
|
|
// error type is defined.
|
|
#![allow (clippy::missing_errors_doc)]
|
|
|
|
// False positive on futures::select! macro
|
|
#![allow (clippy::mut_mut)]
|
|
|
|
use std::{
|
|
path::PathBuf,
|
|
sync::Arc,
|
|
time::Duration,
|
|
};
|
|
|
|
use futures::FutureExt;
|
|
use reqwest::Client;
|
|
use tokio::{
|
|
sync::{
|
|
mpsc,
|
|
oneshot,
|
|
},
|
|
};
|
|
use tokio_stream::wrappers::ReceiverStream;
|
|
|
|
use ptth_core::{
|
|
http_serde,
|
|
prelude::*,
|
|
};
|
|
|
|
pub mod errors;
|
|
|
|
/// In-process file server module with byte range and ETag support
|
|
pub mod file_server;
|
|
|
|
/// Load and de-serialize structures from TOML, with a size limit
|
|
/// and checking permissions (On Unix)
|
|
pub mod load_toml;
|
|
|
|
use errors::ServerError;
|
|
|
|
/// Shared, immutable state for one running server, handed out to
/// request-handling tasks behind an `Arc`.
struct State {
	/// In-process file server module that answers the unwrapped requests
	file_server: file_server::FileServer,
	/// Completed runtime config (relay URL and debug flags)
	config: Config,
	/// HTTP client reused for all outgoing connections to the relay
	client: Client,
}
|
|
|
|
// Unwrap a request from PTTH format and pass it into file_server.
// When file_server responds, wrap it back up and stream it to the relay.

/// Handles one wrapped request that arrived inside an `http_listen`
/// long-poll response.
///
/// The serialized response parts travel back to the relay in a magic
/// header (MessagePack, then base64); the response body, if any, is
/// streamed as the body of a POST to `{relay_url}/http_response/{id}`.
async fn handle_one_req (
	state: &Arc <State>,
	wrapped_req: http_serde::WrappedRequest
) -> Result <(), ServerError>
{
	let (req_id, parts) = (wrapped_req.id, wrapped_req.req);
	
	debug! ("Handling request {}", req_id);
	
	// Let the file server module produce the response parts and an
	// optional body channel.
	let response = state.file_server.serve_all (
		parts.method,
		&parts.uri,
		&parts.headers,
	).await?;
	
	let mut resp_req = state.client
		.post (&format! ("{}/http_response/{}", state.config.relay_url, req_id))
		.header (ptth_core::PTTH_MAGIC_HEADER, base64::encode (rmp_serde::to_vec (&response.parts).map_err (ServerError::MessagePackEncodeResponse)?));
	
	if let Some (length) = response.content_length {
		resp_req = resp_req.header ("Content-Length", length.to_string ());
	}
	if let Some (mut body) = response.body {
		if state.config.throttle_upload {
			// Spawn another task to throttle the chunks: it forwards at
			// most one chunk per second through a capacity-1 channel,
			// and the POST body streams from the receiving end.
			
			let (tx, rx) = mpsc::channel (1);
			
			tokio::spawn (async move {
				while let Some (chunk) = body.recv ().await {
					let len = chunk.as_ref ().map (Vec::len).ok ();
					// If the receiver is gone (POST aborted), this errors
					// and the task ends.
					tx.send (chunk).await?;
					
					if let Some (_len) = len {
						// debug! ("Throttling {} byte chunk", len);
					}
					
					tokio::time::sleep (Duration::from_millis (1000)).await;
				}
				
				Ok::<_, anyhow::Error> (())
			});
			
			resp_req = resp_req.body (reqwest::Body::wrap_stream (ReceiverStream::new (rx)));
		}
		else {
			// No throttling: stream the file server's body channel directly.
			resp_req = resp_req.body (reqwest::Body::wrap_stream (ReceiverStream::new (body)));
		}
	}
	
	let req = resp_req.build ().map_err (ServerError::Step5Responding)?;
	
	debug! ("{:?}", req.headers ());
	
	// Execute the POST that carries our response back to the relay.
	match state.client.execute (req).await {
		Ok (r) => {
			let status = r.status ();
			let text = r.text ().await.map_err (ServerError::Step7AfterResponse)?;
			debug! ("{:?} {:?}", status, text);
		},
		Err (e) => {
			if e.is_request () {
				warn! ("Error while POSTing response. Client probably hung up.");
			}
			
			// Logged but not propagated: the request was already handled,
			// only delivery failed.
			error! ("Err: {:?}", e);
		},
	}
	
	Ok::<(), ServerError> (())
}
|
|
|
|
async fn handle_req_resp (
|
|
state: &Arc <State>,
|
|
req_resp: reqwest::Response
|
|
) -> Result <(), ServerError>
|
|
{
|
|
//println! ("Step 1");
|
|
|
|
let body = req_resp.bytes ().await.map_err (ServerError::CantCollectWrappedRequests)?;
|
|
let wrapped_reqs: Vec <http_serde::WrappedRequest> = match rmp_serde::from_read_ref (&body)
|
|
{
|
|
Ok (x) => x,
|
|
Err (e) => {
|
|
error! ("Can't parse wrapped requests: {:?}", e);
|
|
return Err (ServerError::CantParseWrappedRequests (e));
|
|
},
|
|
};
|
|
|
|
debug! ("Unwrapped {} requests", wrapped_reqs.len ());
|
|
|
|
for wrapped_req in wrapped_reqs {
|
|
let state = state.clone ();
|
|
|
|
// These have to detach, so we won't be able to catch the join errors.
|
|
|
|
tokio::spawn (async move {
|
|
handle_one_req (&state, wrapped_req).await
|
|
});
|
|
}
|
|
|
|
Ok (())
|
|
}
|
|
|
|
/// Config for ptth_server and the file server module
///
/// This is a complete config.
/// The bin frontend is allowed to load an incomplete config from
/// the TOML file, fill it out with command-line options, and put
/// the completed config in this struct.

#[derive (Clone)]
pub struct ConfigFile {
	/// A name that uniquely identifies this server on the relay.
	/// May be human-readable.
	pub name: String,
	
	/// Secret API key used to authenticate the server with the relay
	pub api_key: String,
	
	/// URL of the PTTH relay server that ptth_server should connect to
	pub relay_url: String,
	
	/// Directory that the file server module will expose to clients
	/// over the relay. If None, the current working dir is used.
	pub file_server_root: Option <PathBuf>,
	
	/// For debugging. When true, response bodies are forwarded at
	/// roughly one chunk per second.
	pub throttle_upload: bool,
}
|
|
|
|
impl ConfigFile {
|
|
#[must_use]
|
|
pub fn new (name: String, api_key: String, relay_url: String) -> Self {
|
|
Self {
|
|
name,
|
|
api_key,
|
|
relay_url,
|
|
file_server_root: None,
|
|
throttle_upload: false,
|
|
}
|
|
}
|
|
|
|
#[must_use]
|
|
pub fn tripcode (&self) -> String {
|
|
base64::encode (blake3::hash (self.api_key.as_bytes ()).as_bytes ())
|
|
}
|
|
}
|
|
|
|
/// Config for ptth_server itself
///
/// Built by `run_server` from the relevant fields of a `ConfigFile`.

#[derive (Default)]
pub struct Config {
	/// URL of the PTTH relay server that ptth_server should connect to
	pub relay_url: String,
	
	/// For debugging. When true, response bodies are forwarded at
	/// roughly one chunk per second.
	pub throttle_upload: bool,
}
|
|
|
|
/// Runs a PTTH file server
|
|
|
|
pub async fn run_server (
|
|
config_file: ConfigFile,
|
|
shutdown_oneshot: oneshot::Receiver <()>,
|
|
hidden_path: Option <PathBuf>,
|
|
asset_root: Option <PathBuf>
|
|
)
|
|
-> Result <(), ServerError>
|
|
{
|
|
use std::{
|
|
convert::TryInto,
|
|
};
|
|
|
|
use arc_swap::ArcSwap;
|
|
|
|
let asset_root = asset_root.unwrap_or_else (PathBuf::new);
|
|
|
|
info! ("Server name is {}", config_file.name);
|
|
info! ("Tripcode is {}", config_file.tripcode ());
|
|
|
|
let mut headers = reqwest::header::HeaderMap::new ();
|
|
headers.insert ("X-ApiKey", config_file.api_key.try_into ().map_err (ServerError::ApiKeyInvalid)?);
|
|
|
|
let client = Client::builder ()
|
|
.default_headers (headers)
|
|
.connect_timeout (Duration::from_secs (30))
|
|
.build ().map_err (ServerError::CantBuildHttpClient)?;
|
|
|
|
let metrics_interval = Arc::new (ArcSwap::default ());
|
|
|
|
let interval_writer = Arc::clone (&metrics_interval);
|
|
tokio::spawn (async move {
|
|
file_server::metrics::Interval::monitor (interval_writer).await;
|
|
});
|
|
|
|
let state = Arc::new (State {
|
|
file_server: file_server::FileServer::new (
|
|
config_file.file_server_root,
|
|
&asset_root,
|
|
config_file.name,
|
|
metrics_interval,
|
|
hidden_path,
|
|
)?,
|
|
config: Config {
|
|
relay_url: config_file.relay_url,
|
|
throttle_upload: config_file.throttle_upload,
|
|
},
|
|
client,
|
|
});
|
|
|
|
run_server_loop (state, shutdown_oneshot).await
|
|
}
|
|
|
|
/// The server's main loop: long-poll the relay for wrapped requests,
/// dispatch them, repeat until `shutdown_oneshot` fires.
///
/// Uses exponential backoff (`delay * 2 + 500` ms, capped at 30 s)
/// after non-timeout errors; a clean iteration resets the delay to 0.
/// Client-side timeouts and relay 204s are the long poll expiring
/// normally and do not increase the backoff.
async fn run_server_loop (
	state: Arc <State>,
	shutdown_oneshot: oneshot::Receiver <()>,
) -> Result <(), ServerError> {
	use http::status::StatusCode;
	
	let mut backoff_delay = 0;
	// Fused so it can be polled again safely after select! passes over it.
	let mut shutdown_oneshot = shutdown_oneshot.fuse ();
	
	loop {
		// TODO: Extract loop body to function?
		
		// Wait out the backoff delay, but wake immediately on shutdown.
		if backoff_delay > 0 {
			let sleep = tokio::time::sleep (Duration::from_millis (backoff_delay));
			tokio::pin! (sleep);
			
			tokio::select! {
				_ = &mut sleep => {},
				_ = &mut shutdown_oneshot => {
					info! ("Received graceful shutdown");
					break;
				},
			}
		}
		
		debug! ("http_listen");
		
		// Long-poll the relay for the next batch of wrapped requests.
		let req_req = state.client.get (&format! ("{}/http_listen/{}", state.config.relay_url, state.file_server.metrics_startup.server_name))
		.timeout (Duration::from_secs (30))
		.send ();
		
		// The delay we'd use if this iteration errors: double plus 500 ms,
		// capped at 30 seconds.
		let err_backoff_delay = std::cmp::min (30_000, backoff_delay * 2 + 500);
		
		// Race the long poll against shutdown.
		let req_req = futures::select! {
			r = req_req.fuse () => r,
			_ = shutdown_oneshot => {
				info! ("Received graceful shutdown");
				break;
			},
		};
		
		let req_resp = match req_req {
			Err (e) => {
				if e.is_timeout () {
					// Expected when the long poll expires; no backoff.
					error! ("Client-side timeout. Is an overly-aggressive firewall closing long-lived connections? Is the network flakey?");
				}
				else {
					error! ("Err: {:?}", e);
					if backoff_delay != err_backoff_delay {
						error! ("Non-timeout issue, increasing backoff_delay");
						backoff_delay = err_backoff_delay;
					}
				}
				continue;
			},
			Ok (x) => x,
		};
		
		if req_resp.status () == StatusCode::NO_CONTENT {
			// The relay's long poll expired with nothing for us.
			debug! ("http_listen long poll timed out on the server, good.");
			continue;
		}
		else if req_resp.status () != StatusCode::OK {
			// Unexpected status: log the body and back off.
			error! ("{}", req_resp.status ());
			let body = req_resp.bytes ().await.map_err (ServerError::Step3CollectBody)?;
			let body = String::from_utf8 (body.to_vec ()).map_err (ServerError::Step3ErrorResponseNotUtf8)?;
			error! ("{}", body);
			if backoff_delay != err_backoff_delay {
				error! ("Non-timeout issue, increasing backoff_delay");
				backoff_delay = err_backoff_delay;
			}
			continue;
		}
		
		// Unpack the requests, spawn them into new tasks, then loop back
		// around.
		
		if handle_req_resp (&state, req_resp).await.is_err () {
			backoff_delay = err_backoff_delay;
			continue;
		}
		
		// Successful iteration: reset the backoff.
		if backoff_delay != 0 {
			debug! ("backoff_delay = 0");
			backoff_delay = 0;
		}
	}
	
	info! ("Exiting");
	
	Ok (())
}
|
|
|
|
#[cfg (test)]
mod tests {
	use super::*;
	
	/// Pins the tripcode algorithm (base64 of BLAKE3 of the API key)
	/// against a known-good value.
	#[test]
	fn tripcode_algo () {
		let name = "TestName";
		let api_key = "PlaypenCausalPlatformCommodeImproveCatalyze";
		
		let config = ConfigFile::new (name.into (), api_key.into (), String::new ());
		
		let expected = "A9rPwZyY89Ag4TJjMoyYA2NeGOm99Je6rq1s0rg8PfY=";
		assert_eq! (config.tripcode (), expected);
	}
}
|