// Static file server that can plug into the PTTH reverse server
// I'm not sure if I like this one

#![allow (clippy::enum_glob_use)]

use std::{
	cmp::min,
	collections::HashMap,
	convert::{Infallible, TryFrom},
	io::SeekFrom,
	path::{
		Path,
		PathBuf,
	},
	sync::Arc,
};

use arc_swap::ArcSwap;
use handlebars::Handlebars;
use serde::Serialize;
use tokio::{
	fs::{
		DirEntry,
		File,
		ReadDir,
	},
	io::{
		AsyncReadExt,
		AsyncSeekExt,
	},
	sync::mpsc::{
		channel,
	},
};
use tracing::instrument;

use ptth_core::{
	http_serde::{
		Method,
		Response,
		StatusCode,
	},
	prelude::*,
};

pub mod errors;
pub mod metrics;

mod html;
mod internal;
#[cfg(feature = "markdown")]
mod markdown;
mod range;

use errors::FileServerError;

#[derive (Default)]
pub struct Config {
	pub file_server_root: Option <PathBuf>,
}

pub struct FileServer {
	pub config: Config,
	pub handlebars: handlebars::Handlebars <'static>,
	pub metrics_startup: metrics::Startup,
	pub metrics_interval: Arc <ArcSwap <Option <metrics::Interval>>>,
	pub hidden_path: Option <PathBuf>,
}

impl FileServer {
	pub fn new (
		file_server_root: Option <PathBuf>,
		asset_root: &Path,
		name: String,
		metrics_interval: Arc <ArcSwap <Option <metrics::Interval>>>,
		hidden_path: Option <PathBuf>,
	) -> Result <Self, anyhow::Error>
	{
		Ok (Self {
			config: Config {
				file_server_root,
			},
			handlebars: load_templates (asset_root)?,
			metrics_startup: metrics::Startup::new (name),
			metrics_interval,
			hidden_path,
		})
	}
}

#[derive (Serialize)]
struct DirJson {
	entries: Vec <DirEntryJson>,
}

#[derive (Serialize)]
struct DirEntryJson {
	name: String,
	size: u64,
	is_dir: bool,
}

async fn read_dir_entry_json (entry: DirEntry) -> Option <DirEntryJson> {
	let name = entry.file_name ().into_string ().ok ()?;
	let metadata = entry.metadata ().await.ok ()?;

	let is_dir = metadata.is_dir ();
	let size = metadata.len ();

	Some (DirEntryJson {
		name,
		size,
		is_dir,
	})
}

async fn serve_dir_json (
	mut dir: ReadDir
) -> Result <Response, FileServerError>
{
	let mut entries = vec! [];

	while let Ok (Some (entry)) = dir.next_entry ().await {
		if let Some (entry) = read_dir_entry_json (entry).await {
			entries.push (entry);
		}
	}

	entries.sort_unstable_by (|a, b| a.name.cmp (&b.name));

	let dir = DirJson {
		entries,
	};

	let mut response = Response::default ();
	response.header ("content-type".to_string (), "application/json; charset=UTF-8".to_string ().into_bytes ());
	response.body_bytes (serde_json::to_string (&dir).unwrap ().into_bytes ());

	Ok (response)
}
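/// Serves a single open file, with optional byte-range and etag handling.
///
/// The etag is a hash of the file's metadata (see `get_file_etag`); if it
/// matches the client's `if-none-match` header, the body is skipped and a
/// 304 is returned. Otherwise the requested byte range is streamed through
/// an mpsc channel by a spawned `stream_file` task.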
#[instrument (level = "debug", skip (f))]
async fn serve_file (
	mut f: File,
	client_wants_body: bool,
	range: range::ValidParsed,
	if_none_match: Option <&[u8]>,
) -> Result <Response, FileServerError>
{
	// Tripping the etag through UTF-8 isn't the best way to encourage it to
	// be valid ASCII, but if I make it binary I might accidentally pass the
	// raw hash bytes as a header, which is not valid.

	let actual_etag = get_file_etag (&f).await.map (String::into_bytes);

	let input = ServeFileInput {
		if_none_match,
		actual_etag,
		client_wants_body,
		range_requested: range.range_requested,
	};

	let decision = serve_file_decision (&input);

	let (range, range_requested) = (range.range, range.range_requested);

	info! ("Serving range {}-{}", range.start, range.end);

	let content_length = range.end - range.start;

	let body = if decision.should_send_body {
		let seek = SeekFrom::Start (range.start);
		f.seek (seek).await?;

		let (tx, rx) = channel (1);
		tokio::spawn (async move {
			stream_file (f, content_length, tx).await;
		});

		Some (rx)
	}
	else {
		None
	};

	let mut response = Response::default ();
	response.status_code (decision.status_code);

	// The cache-related headers in HTTP have bad names. See here:
	// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control
	//
	// The intended semantics I'm using are:
	//
	// - etag - Some random hashed value that changes whenever the metadata
	//   (name, inode number, length, mtime) of a file changes. Also changes
	//   on a new server instance. Maybe.
	// - no-cache - Clients and the relay can store this, but should revalidate
	//   with the origin server (us), because only we can check whether the file
	//   changed on disk.
	// - max-age=0 - The file might change at any point during or after the
	//   request, so for proper invalidation, the client should immediately
	//   consider it stale.

	response.header ("cache-control".to_string (), b"no-cache,max-age=0".to_vec ());
	if let Some (etag) = input.actual_etag {
		response.header ("etag".to_string (), etag);
	}
	response.header (String::from ("accept-ranges"), b"bytes".to_vec ());

	if range_requested {
		response.header (String::from ("content-range"), format! ("bytes {}-{}/{}", range.start, range.end - 1, range.end).into_bytes ());
	}

	response.content_length = Some (content_length);

	if let Some (body) = body {
		response.body (body);
	}

	Ok (response)
}

#[derive (Debug)]
struct ServeFileInput <'a> {
	if_none_match: Option <&'a [u8]>,
	actual_etag: Option <Vec <u8>>,
	client_wants_body: bool,
	range_requested: bool,
}

#[derive (Debug, PartialEq)]
struct ServeFileOutput {
	status_code: StatusCode,
	should_send_body: bool,
}

fn serve_file_decision (input: &ServeFileInput) -> ServeFileOutput {
	if let (Some (if_none_match), Some (actual_etag)) = (&input.if_none_match, &input.actual_etag) {
		if actual_etag == if_none_match {
			return ServeFileOutput {
				status_code: StatusCode::NotModified,
				should_send_body: false,
			};
		}
	}

	if ! input.client_wants_body {
		return ServeFileOutput {
			status_code: StatusCode::NoContent,
			should_send_body: false,
		};
	}

	if input.range_requested {
		return ServeFileOutput {
			status_code: StatusCode::PartialContent,
			should_send_body: true,
		};
	}

	ServeFileOutput {
		status_code: StatusCode::Ok,
		should_send_body: true,
	}
}

async fn get_file_etag (f: &File) -> Option <String> {
	#[derive (Serialize)]
	struct CacheBreaker {
		len: u64,
		mtime: std::time::SystemTime,
	}

	let md = f.metadata ().await.ok ()?;

	let buf = rmp_serde::to_vec (&CacheBreaker {
		len: md.len (),
		mtime: md.modified ().ok ()?,
	}).ok ()?;

	let hash = blake3::hash (&buf);

	Some (hash.to_hex ().to_string ())
}

async fn stream_file (
	mut f: File,
	content_length: u64,
	tx: tokio::sync::mpsc::Sender <Result <Vec <u8>, Infallible>>,
)
{
	let mut bytes_sent = 0;
	let mut bytes_left = content_length;

	let mark_interval = 200_000;
	let mut next_mark = mark_interval;

	loop {
		let mut buffer = vec! [0_u8; 65_536];

		let bytes_read = f.read (&mut buffer).await.expect ("Couldn't read from file");
		if bytes_read == 0 {
			break;
		}

		buffer.truncate (bytes_read);

		let bytes_read_64 = u64::try_from (bytes_read).expect ("Couldn't fit usize into u64");
		let bytes_read_64 = min (bytes_left, bytes_read_64);

		if tx.send (Ok::<_, Infallible> (buffer)).await.is_err () {
			warn! ("Cancelling file stream (Sent {} out of {} bytes)", bytes_sent, content_length);
			break;
		}

		bytes_left -= bytes_read_64;
		if bytes_left == 0 {
			debug! ("Finished");
			break;
		}

		bytes_sent += bytes_read_64;
		while next_mark <= bytes_sent {
			trace! ("Sent {} bytes", next_mark);
			next_mark += mark_interval;
		}

		//delay_for (Duration::from_millis (50)).await;
	}
}

impl FileServer {
	/// Top-level request handler for the file server module.
	///
	/// Passes a request to the internal file server logic.
	/// Returns an HTTP response as HTML or JSON, depending on the request.
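	///
	/// A minimal, hypothetical call site; the `server` value, the header map,
	/// and the `Method::Get` variant are assumed here rather than taken from
	/// this file:
	///
	/// ```ignore
	/// let headers: HashMap <String, Vec <u8>> = HashMap::new ();
	/// let response = server.serve_all (Method::Get, "/files/", &headers).await?;
	/// ```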
	#[instrument (level = "debug", skip (self, headers))]
	pub async fn serve_all (
		&self,
		method: Method,
		uri: &str,
		headers: &HashMap <String, Vec <u8>>
	) -> Result <Response, FileServerError>
	{
		use internal::{
			OutputFormat,
			Response::*,
		};

		fn serve_error <S: Into <Vec <u8>>> (
			status_code: StatusCode,
			msg: S
		) -> Response {
			let mut resp = Response::default ();
			resp.status_code (status_code);
			resp.body_bytes (msg.into ());
			resp
		}

		let default_root = PathBuf::from ("./");

		let root: &std::path::Path = self.config.file_server_root
		.as_ref ()
		.unwrap_or (&default_root);

		Ok (match internal::serve_all (root, method, uri, headers, self.hidden_path.as_deref ()).await? {
			Favicon => serve_error (StatusCode::NotFound, "Not found\n"),
			Forbidden => serve_error (StatusCode::Forbidden, "403 Forbidden\n"),
			MethodNotAllowed => serve_error (StatusCode::MethodNotAllowed, "Unsupported method\n"),
			NotFound => serve_error (StatusCode::NotFound, "404 Not Found\nAre you missing a trailing slash?\n"),
			RangeNotSatisfiable (file_len) => {
				let mut resp = Response::default ();
				resp.status_code (StatusCode::RangeNotSatisfiable)
				.header ("content-range".to_string (), format! ("bytes */{}", file_len).into_bytes ());
				resp
			},
			Redirect (location) => {
				let mut resp = Response::default ();
				resp.status_code (StatusCode::TemporaryRedirect)
				.header ("location".to_string (), location.into_bytes ());
				resp.body_bytes (b"Redirecting...\n".to_vec ());
				resp
			},
			InvalidQuery => serve_error (StatusCode::BadRequest, "Query is invalid for this object\n"),
			Root => html::serve_root (self).await?,
			ServeDir (internal::ServeDirParams {
				path,
				dir,
				format,
			}) => match format {
				OutputFormat::Json => serve_dir_json (dir.into_inner ()).await?,
				OutputFormat::Html => html::serve_dir (&self.handlebars, &self.metrics_startup, path.to_string_lossy (), dir.into_inner ()).await?,
			},
			ServeFile (internal::ServeFileParams {
				file,
				send_body,
				range,
			}) => serve_file (file.into_inner (), send_body, range, headers.get ("if-none-match").map (|v| &v[..])).await?,
			MarkdownErr (e) => {
				#[cfg (feature = "markdown")]
				{
					use markdown::Error::*;
					let e = e.inner;
					let code = match &e {
						TooBig => StatusCode::InternalServerError,
						//NotMarkdown => serve_error (StatusCode::BadRequest, "File is not Markdown"),
						NotUtf8 => StatusCode::BadRequest,
					};
					return Ok (serve_error (code, e.to_string ()));
				}
				#[cfg (not (feature = "markdown"))]
				{
					let _e = e;
					serve_error (StatusCode::BadRequest, "Markdown feature is disabled")
				}
			},
			MarkdownPreview (s) => html::serve (s),
		})
	}
}

fn load_templates (
	asset_root: &Path
) -> Result <Handlebars <'static>, anyhow::Error>
{
	let mut handlebars = Handlebars::new ();
	handlebars.set_strict_mode (true);

	let asset_root = asset_root.join ("handlebars/server");

	for (k, v) in &[
		("file_server_dir", "file_server_dir.html"),
		("file_server_root", "file_server_root.html"),
	] {
		handlebars.register_template_file (k, asset_root.join (v))?;
	}

	Ok (handlebars)
}

fn pretty_print_bytes (b: u64) -> String {
	if b < 1024 {
		format! ("{} B", b)
	}
	else if (b + 512) < 1024 * 1024 {
		format! ("{} KiB", (b + 512) / 1024)
	}
	else if (b + 512 * 1024) < 1024 * 1024 * 1024 {
		format! ("{} MiB", (b + 512 * 1024) / 1024 / 1024)
	}
	else {
		format! ("{} GiB", (b + 512 * 1024 * 1024) / 1024 / 1024 / 1024)
	}
}

#[cfg (test)]
mod tests;
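// Illustrative unit tests for the pure helpers above. This is a sketch added
// alongside the external `tests` module; the module name and the example
// values are made up, but the expected outputs follow directly from
// `serve_file_decision` and `pretty_print_bytes` as written.
#[cfg (test)]
mod inline_examples {
	use super::*;

	#[test]
	fn serve_file_decision_examples () {
		// A matching `if-none-match` etag short-circuits everything else.
		let input = ServeFileInput {
			if_none_match: Some (&b"etag-1"[..]),
			actual_etag: Some (b"etag-1".to_vec ()),
			client_wants_body: true,
			range_requested: true,
		};
		assert_eq! (serve_file_decision (&input), ServeFileOutput {
			status_code: StatusCode::NotModified,
			should_send_body: false,
		});

		// If the client doesn't want a body, nothing is streamed.
		let input = ServeFileInput {
			if_none_match: None,
			actual_etag: Some (b"etag-1".to_vec ()),
			client_wants_body: false,
			range_requested: false,
		};
		assert_eq! (serve_file_decision (&input), ServeFileOutput {
			status_code: StatusCode::NoContent,
			should_send_body: false,
		});

		// A range request with no matching etag gets a partial-content body.
		let input = ServeFileInput {
			if_none_match: None,
			actual_etag: None,
			client_wants_body: true,
			range_requested: true,
		};
		assert_eq! (serve_file_decision (&input), ServeFileOutput {
			status_code: StatusCode::PartialContent,
			should_send_body: true,
		});
	}

	#[test]
	fn pretty_print_bytes_examples () {
		// Sizes are rounded to the nearest whole binary unit.
		assert_eq! (pretty_print_bytes (500), "500 B");
		assert_eq! (pretty_print_bytes (2_048), "2 KiB");
		assert_eq! (pretty_print_bytes (3 * 1024 * 1024), "3 MiB");
	}
}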