ptth/crates/ptth_server/src/lib.rs

665 lines
16 KiB
Rust

//! # PTTH Server
//!
//! The PTTH server is an HTTP server that can serve files from
//! behind a firewall, because it only makes outgoing HTTP connections
//! to a PTTH relay.
//!
//! ```text
//! View from outside the PTTH tunnel:
//!
//! * HTTP client
//! |
//! | HTTP(S) requests
//! V
//! * ptth_relay
//! ^
//! | HTTP(S) requests
//! |
//! * ptth_server
//!
//! View from inside the PTTH tunnel:
//!
//! * HTTP client
//! |
//! | HTTP(S) requests
//! V
//! * ptth_relay
//! |
//! | HTTP(S) requests
//! V
//! * ptth_server
//! ```
#![warn (clippy::pedantic)]
// I don't see the point in documenting the errors outside of where the
// error type is defined.
#![allow (clippy::missing_errors_doc)]
// False positive on futures::select! macro
#![allow (clippy::mut_mut)]
pub mod errors;
/// In-process file server module with byte range and ETag support
pub mod file_server;
/// Load and de-serialize structures from TOML, with a size limit
/// and checking permissions (On Unix)
pub mod load_toml;
pub mod prelude;
use std::{
collections::*,
future::Future,
path::PathBuf,
sync::Arc,
time::Duration,
};
use futures::FutureExt;
use reqwest::Client;
use tokio::{
sync::{
mpsc,
oneshot,
},
};
use tokio_stream::wrappers::ReceiverStream;
use ptth_core::{
http_serde,
};
// use crate::key_validity::BlakeHashWrapper;
use errors::ServerError;
use prelude::*;
pub struct State {
// file_server: file_server::FileServer,
config: Config,
client: Client,
}
// Unwrap a request from PTTH format and pass it into file_server.
// When file_server responds, wrap it back up and stream it to the relay.
async fn handle_one_req (
state: &State,
req_id: String,
response: http_serde::Response,
) -> Result <(), ServerError>
{
let url = format! ("{}/http_response/{}", state.config.relay_url, req_id);
let mut resp_req = state.client
.post (&url)
.header (ptth_core::PTTH_MAGIC_HEADER, base64::encode (rmp_serde::to_vec (&response.parts).map_err (ServerError::MessagePackEncodeResponse)?))
.header ("X-PTTH-SERVER-NAME", &state.config.name);
if response.parts.status_code != ptth_core::http_serde::StatusCode::NotModified {
if let Some (length) = response.content_length {
resp_req = resp_req.header ("Content-Length", length.to_string ());
}
}
if let Some (mut body) = response.body {
if state.config.throttle_upload {
// Spawn another task to throttle the chunks
let (tx, rx) = mpsc::channel (1);
tokio::spawn (async move {
while let Some (chunk) = body.recv ().await {
let len = chunk.as_ref ().map (Vec::len).ok ();
tx.send (chunk).await?;
if let Some (_len) = len {
// debug! ("Throttling {} byte chunk", len);
}
tokio::time::sleep (Duration::from_millis (1000)).await;
}
Ok::<_, anyhow::Error> (())
});
resp_req = resp_req.body (reqwest::Body::wrap_stream (ReceiverStream::new (rx)));
}
else {
resp_req = resp_req.body (reqwest::Body::wrap_stream (ReceiverStream::new (body)));
}
}
let req = resp_req.build ().map_err (ServerError::Step5Responding)?;
trace! ("{:?}", req.headers ());
//println! ("Step 6");
match state.client.execute (req).await {
Ok (r) => {
let status = r.status ();
let text = r.text ().await.map_err (ServerError::Step7AfterResponse)?;
debug! ("http_response {} {:?} {:?}", req_id, status, text);
},
Err (e) => {
if e.is_request () {
warn! ("Error while POSTing response. Client probably hung up.");
}
error! ("Err: {:?}", e);
},
}
Ok::<(), ServerError> (())
}
async fn handle_requests <F, H, SH> (
state: &Arc <State>,
wrapped_reqs: Vec <http_serde::WrappedRequest>,
spawn_handler: &mut SH,
) -> Result <(), ServerError>
where
F: Send + Future <Output = anyhow::Result <http_serde::Response>>,
H: Send + 'static + FnOnce (http_serde::RequestParts) -> F,
SH: Send + FnMut () -> H
{
//println! ("Step 1");
for wrapped_req in wrapped_reqs {
let state = Arc::clone (&state);
let handler = spawn_handler ();
// These have to detach, so we won't be able to catch the join errors.
tokio::spawn (async move {
let (req_id, parts) = (wrapped_req.id, wrapped_req.req);
info! ("Req {} {}", req_id, parts.uri);
let f = handler (parts);
let response = f.await?;
let output = handle_one_req (&state, req_id.clone (), response).await;
debug! ("Req {} task exiting", req_id);
output
});
}
Ok (())
}
/// Config for `ptth_server` and the file server module
///
/// This is a complete config.
/// The bin frontend is allowed to load an incomplete config from
/// the TOML file, fill it out with command-line options, and put
/// the completed config in this struct.
#[derive (Clone)]
pub struct ConfigFile {
/// A name that uniquely identifies this server on the relay.
/// May be human-readable.
pub name: String,
/// Secret API key used to authenticate the server with the relay
pub api_key: String,
/// URL of the PTTH relay server that ptth_server should connect to
pub relay_url: String,
/// Directory that the file server module will expose to clients
/// over the relay, under `/files`. If None, the current working dir is used.
pub file_server_root: PathBuf,
/// The file server module will expose these directories to clients under
/// `/dirs`. If symlinks can't be used (like on Windows), this allows PTTH
/// to serve multiple directories easily.
pub file_server_roots: BTreeMap <String, PathBuf>,
/// For debugging.
pub throttle_upload: bool,
pub client_keys: HashSet <String>,
pub allow_any_client: bool,
pub index_directories: bool,
}
impl ConfigFile {
#[must_use]
pub fn new (name: String, api_key: String, relay_url: String) -> Self {
Self {
name,
api_key,
relay_url,
file_server_root: PathBuf::from ("."),
file_server_roots: Default::default (),
throttle_upload: false,
client_keys: Default::default (),
allow_any_client: true,
index_directories: true,
}
}
#[must_use]
pub fn tripcode (&self) -> String {
base64::encode (blake3::hash (self.api_key.as_bytes ()).as_bytes ())
}
}
/// Config for `ptth_server` itself
#[derive (Default)]
pub struct Config {
/// The name of our `ptth_server` instance
pub name: String,
/// URL of the PTTH relay server that ptth_server should connect to
pub relay_url: String,
/// For debugging.
pub throttle_upload: bool,
}
pub struct Builder {
config_file: ConfigFile,
hidden_path: Option <PathBuf>,
asset_root: Option <PathBuf>,
}
impl Builder {
pub fn new (
name: String,
relay_url: String,
) -> Self {
let config_file = ConfigFile {
name,
api_key: ptth_core::gen_key (),
relay_url,
file_server_root: PathBuf::from ("."),
file_server_roots: Default::default (),
throttle_upload: false,
client_keys: Default::default (),
allow_any_client: true,
index_directories: true,
};
Self {
config_file,
hidden_path: None,
asset_root: None,
}
}
pub fn build (self) -> Result <State, ServerError>
{
State::new (
self.config_file,
self.hidden_path,
self.asset_root
)
}
pub fn api_key (mut self, key: String) -> Self {
self.config_file.api_key = key;
self
}
}
/// Runs a `ptth_server` instance with the built-in file server module
pub async fn run_server (
config_file: ConfigFile,
shutdown_oneshot: oneshot::Receiver <()>,
hidden_path: Option <PathBuf>,
asset_root: Option <PathBuf>,
hit_counter: Option <mpsc::Sender <()>>,
)
-> Result <(), ServerError>
{
let metrics_interval = Arc::new (arc_swap::ArcSwap::default ());
let interval_writer = Arc::clone (&metrics_interval);
tokio::spawn (async move {
file_server::metrics::Interval::monitor (interval_writer).await;
});
let file_server = file_server::FileServer::new (
file_server::Config {
file_server_root: config_file.file_server_root.clone (),
file_server_roots: config_file.file_server_roots.clone (),
},
&asset_root.clone ().unwrap_or_else (|| PathBuf::from (".")),
config_file.name.clone (),
metrics_interval,
hidden_path.clone (),
)?;
let file_server = Arc::new (file_server);
let state = Arc::new (State::new (
config_file,
hidden_path,
asset_root,
)?);
let file_server_2 = Arc::clone (&file_server);
let mut spawn_handler = || {
let file_server = Arc::clone (&file_server);
let hit_counter = hit_counter.clone ();
|req: http_serde::RequestParts| async move {
if let Some (hit_tx) = &hit_counter {
eprintln! ("hit_tx.send");
hit_tx.send (()).await;
}
Ok (file_server.serve_all (req.method, &req.uri, &req.headers).await?)
}
};
State::run (
&state,
shutdown_oneshot,
&mut spawn_handler,
).await
}
impl State {
pub fn new (
config_file: ConfigFile,
hidden_path: Option <PathBuf>,
asset_root: Option <PathBuf>
)
-> Result <Self, ServerError>
{
use std::convert::TryInto;
let asset_root = asset_root.unwrap_or_else (PathBuf::new);
info! ("Server name is {}", config_file.name);
info! ("Tripcode is {}", config_file.tripcode ());
let mut headers = reqwest::header::HeaderMap::new ();
headers.insert ("X-ApiKey", config_file.api_key.try_into ().map_err (ServerError::ApiKeyInvalid)?);
// Cookie 01FYZ3W64SM6KYNP48J6EWSCEF
// Try to keep the Clients similar here
let client = Client::builder ()
.default_headers (headers)
.timeout (Duration::from_secs (7 * 86400))
.connect_timeout (Duration::from_secs (30))
.build ().map_err (ServerError::CantBuildHttpClient)?;
let state = State {
config: Config {
name: config_file.name,
relay_url: config_file.relay_url,
throttle_upload: config_file.throttle_upload,
},
client,
};
Ok (state)
}
async fn http_listen (
state: &Arc <Self>,
) -> Result <Vec <http_serde::WrappedRequest>, ServerError>
{
use http::status::StatusCode;
let req_resp = state.client.get (&format! ("{}/http_listen/{}", state.config.relay_url, state.config.name))
.timeout (Duration::from_secs (30))
.send ().await.map_err (ServerError::Step3Response)?;
if req_resp.status () == StatusCode::NO_CONTENT {
return Ok (Vec::new ());
}
if req_resp.status () != StatusCode::OK {
error! ("{}", req_resp.status ());
let body = req_resp.bytes ().await.map_err (ServerError::Step3CollectBody)?;
let body = String::from_utf8 (body.to_vec ()).map_err (ServerError::Step3ErrorResponseNotUtf8)?;
error! ("{}", body);
return Err (ServerError::Step3Unknown);
}
let body = req_resp.bytes ().await.map_err (ServerError::CantCollectWrappedRequests)?;
let wrapped_reqs: Vec <http_serde::WrappedRequest> = rmp_serde::from_read_ref (&body)
.map_err (ServerError::CantParseWrappedRequests)?;
Ok (wrapped_reqs)
}
pub async fn run <F, H, SH> (
state: &Arc <Self>,
shutdown_oneshot: oneshot::Receiver <()>,
spawn_handler: &mut SH,
) -> Result <(), ServerError>
where
F: Send + Future <Output = anyhow::Result <http_serde::Response>>,
H: Send + 'static + FnOnce (http_serde::RequestParts) -> F,
SH: Send + FnMut () -> H
{
let mut backoff_delay = 0;
let mut shutdown_oneshot = shutdown_oneshot.fuse ();
for i in 0u64.. {
// TODO: Extract loop body to function?
if backoff_delay > 0 {
let sleep = tokio::time::sleep (Duration::from_millis (backoff_delay));
tokio::pin! (sleep);
tokio::select! {
_ = &mut sleep => {},
_ = &mut shutdown_oneshot => {
info! ("Received graceful shutdown");
break;
},
}
}
trace! ("http_listen {}...", i);
let http_listen_fut = Self::http_listen (state);
let http_listen = futures::select! {
r = http_listen_fut.fuse () => r,
_ = shutdown_oneshot => {
info! ("Received graceful shutdown");
break;
},
};
let err_backoff_delay = std::cmp::min (30_000, backoff_delay * 2 + 500);
let reqs = match http_listen {
Err (e) => {
backoff_delay = err_backoff_delay;
error! ("http_listen {} error, backing off... {:?}", i, e);
continue;
},
Ok (x) => x,
};
trace! ("http_listen {} unwrapped {} requests", i, reqs.len ());
// Unpack the requests, spawn them into new tasks, then loop back
// around.
if handle_requests (
&state,
reqs,
spawn_handler,
).await.is_err () {
backoff_delay = err_backoff_delay;
continue;
}
if backoff_delay != 0 {
debug! ("backoff_delay = 0");
backoff_delay = 0;
}
}
info! ("Exiting");
Ok (())
}
}
pub mod executable {
use std::{
collections::*,
path::{Path, PathBuf},
};
use structopt::StructOpt;
use super::{
load_toml,
prelude::*,
};
pub async fn main (args: &[OsString]) -> anyhow::Result <()> {
let opt = Opt::from_iter (args);
let asset_root = opt.asset_root;
let path = opt.config_path.clone ().unwrap_or_else (|| PathBuf::from ("./config/ptth_server.toml"));
let config_file: ConfigFile = if opt.auto_gen_key {
// If we're in autonomous mode, try harder to fix things
match load_toml::load (&path) {
Err (_) => {
gen_and_save_key (&path)?;
load_toml::load (&path)?
},
Ok (x) => x,
}
}
else {
match load_toml::load (&path) {
Err (super::errors::LoadTomlError::Io (_)) => bail! ("API key not provided in config file and auto-gen-key not provided"),
Ok (x) => x,
Err (e) => return Err (e.into ()),
}
};
let file_server_roots = config_file.file_server_roots
.unwrap_or_else (|| Default::default ());
// `git grep JRY5NXZU` # duplicated code?
let config_file = super::ConfigFile {
name: opt.name.or (config_file.name).ok_or (anyhow::anyhow! ("`name` must be provided in command line or config file"))?,
api_key: config_file.api_key,
relay_url: opt.relay_url.or (config_file.relay_url).ok_or (anyhow::anyhow! ("`--relay-url` must be provided in command line or `relay_url` in config file"))?,
file_server_root: opt.file_server_root.or (config_file.file_server_root).unwrap_or_else (PathBuf::new),
file_server_roots,
throttle_upload: opt.throttle_upload,
allow_any_client: true,
client_keys: Default::default (),
index_directories: config_file.index_directories.unwrap_or (true),
};
if opt.print_tripcode {
println! (r#"name = "{}""#, config_file.name);
println! (r#"tripcode = "{}""#, config_file.tripcode ());
return Ok (());
}
super::run_server (
config_file,
ptth_core::graceful_shutdown::init (),
Some (path),
asset_root,
None,
).await?;
Ok (())
}
#[derive (Debug, StructOpt)]
struct Opt {
#[structopt (long)]
auto_gen_key: bool,
#[structopt (long)]
throttle_upload: bool,
#[structopt (long)]
file_server_root: Option <PathBuf>,
#[structopt (long)]
asset_root: Option <PathBuf>,
#[structopt (long)]
config_path: Option <PathBuf>,
#[structopt (long)]
name: Option <String>,
#[structopt (long)]
print_tripcode: bool,
#[structopt (long)]
relay_url: Option <String>,
}
#[derive (Default, serde::Deserialize)]
struct ConfigFile {
pub name: Option <String>,
pub api_key: String,
pub relay_url: Option <String>,
pub file_server_root: Option <PathBuf>,
pub file_server_roots: Option <BTreeMap <String, PathBuf>>,
pub index_directories: Option <bool>,
}
fn gen_and_save_key (path: &Path) -> anyhow::Result <()> {
use std::fs::File;
let api_key = ptth_core::gen_key ();
let mut f = File::create (path).with_context (|| format! ("Can't create config file `{:?}`", path))?;
#[cfg (unix)]
{
use std::os::unix::fs::PermissionsExt;
let metadata = f.metadata ()?;
let mut permissions = metadata.permissions ();
permissions.set_mode (0o600);
f.set_permissions (permissions)?;
}
#[cfg (not (unix))]
{
tracing::warn! ("Error VR6VW5QT: API keys aren't protected from clients on non-Unix OSes yet");
}
f.write_all (format! ("api_key = \"{}\"\n", api_key).as_bytes ())?;
Ok (())
}
}
#[cfg (test)]
mod tests {
use super::*;
#[test]
fn tripcode_algo () {
let config = ConfigFile::new (
"TestName".into (),
"PlaypenCausalPlatformCommodeImproveCatalyze".into (),
"".into (),
);
assert_eq! (config.tripcode (), "A9rPwZyY89Ag4TJjMoyYA2NeGOm99Je6rq1s0rg8PfY=".to_string ());
}
}