From 687cffdf906b059006c32967aae54f0618dab386 Mon Sep 17 00:00:00 2001 From: _ <> Date: Sun, 29 Nov 2020 18:37:33 +0000 Subject: [PATCH] :recycle: Fixing clippy lints --- crates/ptth_relay/src/config.rs | 12 +-- crates/ptth_relay/src/errors.rs | 42 +++++++++ crates/ptth_relay/src/lib.rs | 147 +++++++++++++++++++------------- 3 files changed, 135 insertions(+), 66 deletions(-) diff --git a/crates/ptth_relay/src/config.rs b/crates/ptth_relay/src/config.rs index 471c69d..9ea5d83 100644 --- a/crates/ptth_relay/src/config.rs +++ b/crates/ptth_relay/src/config.rs @@ -1,19 +1,21 @@ +// False positive with itertools::process_results +#![allow (clippy::redundant_closure)] + use std::{ - collections::*, + collections::HashMap, convert::{TryFrom, TryInto}, iter::FromIterator, path::Path, }; -use serde::Deserialize; - use crate::errors::ConfigError; // Stuff we need to load from the config file and use to // set up the HTTP server pub mod file { - use super::*; + use std::collections::HashMap; + use serde::Deserialize; #[derive (Deserialize)] pub struct Server { @@ -76,7 +78,7 @@ impl Config { let mut f = tokio::fs::File::open (path).await?; - let mut buffer = vec! [0u8; 4096]; + let mut buffer = vec! [0_u8; 4096]; let bytes_read = f.read (&mut buffer).await?; buffer.truncate (bytes_read); diff --git a/crates/ptth_relay/src/errors.rs b/crates/ptth_relay/src/errors.rs index 28d58c3..b0d52dc 100644 --- a/crates/ptth_relay/src/errors.rs +++ b/crates/ptth_relay/src/errors.rs @@ -29,8 +29,50 @@ pub enum ShuttingDownError { ShuttingDown, } +#[derive (Error, Debug)] +pub enum HandleHttpResponseError { + #[error ("HTTP error")] + Http (#[from] http::Error), + + #[error ("Missing PTTH magic header")] + MissingPtthMagicHeader, + + #[error ("PTTH magic header is not base64")] + PtthMagicHeaderNotBase64 (base64::DecodeError), + + #[error ("PTTH magic header could not be decoded as MessagePack")] + PtthMagicHeaderNotMsgPack (rmp_serde::decode::Error), + + #[error ("Couldn't tell server something")] + LostServer, + + #[error ("Relaying task panicked")] + RelayingTaskPanicked (#[from] tokio::task::JoinError), +} + +#[derive (Error, Debug)] +pub enum RequestError { + #[error ("HTTP error")] + Http (#[from] http::Error), + + #[error ("MessagePack encode error")] + MsgPack (#[from] rmp_serde::encode::Error), + + #[error ("Handlebars rendering error")] + Handlebars (#[from] handlebars::RenderError), + + #[error ("Error handling HTTP response")] + HandleHttpResponse (#[from] HandleHttpResponseError), + + #[error ("Error is mysterious!")] + Mysterious, +} + #[derive (Error, Debug)] pub enum RelayError { #[error ("Handlebars template file error")] TemplateFile (#[from] handlebars::TemplateFileError), + + #[error ("Hyper error")] + Hyper (#[from] hyper::Error), } diff --git a/crates/ptth_relay/src/lib.rs b/crates/ptth_relay/src/lib.rs index 73bb72a..16c23b7 100644 --- a/crates/ptth_relay/src/lib.rs +++ b/crates/ptth_relay/src/lib.rs @@ -1,8 +1,23 @@ +#![warn (clippy::pedantic)] + +// I'm not sure if I like this one +#![allow (clippy::enum_glob_use)] + +// I don't see the point in documenting the errors outside of where the +// error type is defined. +#![allow (clippy::missing_errors_doc)] + +// I don't see the point of writing the type twice if I'm initializing a struct +// and the type is already in the struct definition. +#![allow (clippy::default_trait_access)] + +// False positive on futures::select! macro +#![allow (clippy::mut_mut)] + use std::{ borrow::Cow, - error::Error, - collections::*, - convert::{Infallible, TryFrom}, + collections::HashMap, + convert::TryFrom, iter::FromIterator, net::SocketAddr, path::{Path, PathBuf}, @@ -12,6 +27,11 @@ use std::{ time::Duration, }; +use chrono::{ + DateTime, + SecondsFormat, + Utc +}; use dashmap::DashMap; use futures::{ FutureExt, @@ -91,12 +111,6 @@ enum RequestRendezvous { type ResponseRendezvous = oneshot::Sender >; -use chrono::{ - DateTime, - SecondsFormat, - Utc -}; - #[derive (Clone)] pub struct ServerStatus { last_seen: DateTime , @@ -174,9 +188,11 @@ async fn handle_http_listen ( watcher_code: String, api_key: &[u8], ) --> Result , http::Error> +-> Result , RequestError> { - let trip_error = error_reply (StatusCode::UNAUTHORIZED, "Bad X-ApiKey"); + use RequestRendezvous::*; + + let trip_error = || Ok (error_reply (StatusCode::UNAUTHORIZED, "Bad X-ApiKey")?); let expected_tripcode = { let config = state.config.read ().await; @@ -184,16 +200,16 @@ async fn handle_http_listen ( match config.servers.get (&watcher_code) { None => { error! ("Denied http_listen for non-existent server name {}", watcher_code); - return trip_error; + return trip_error (); }, - Some (x) => (*x).tripcode.clone (), + Some (x) => (*x).tripcode, } }; let actual_tripcode = blake3::hash (api_key); if expected_tripcode != actual_tripcode { error! ("Denied http_listen for bad tripcode {}", base64::encode (actual_tripcode.as_bytes ())); - return trip_error; + return trip_error (); } // End of early returns @@ -206,8 +222,6 @@ async fn handle_http_listen ( status.last_seen = Utc::now (); } - use RequestRendezvous::*; - let (tx, rx) = oneshot::channel (); { @@ -220,7 +234,7 @@ async fn handle_http_listen ( // handle them immediately debug! ("Sending {} parked requests to server {}", v.len (), watcher_code); - return ok_reply (rmp_serde::to_vec (&v).unwrap ()); + return Ok (ok_reply (rmp_serde::to_vec (&v)?)?); } } @@ -234,14 +248,14 @@ async fn handle_http_listen ( x = rx.fuse () => match x { Ok (Ok (one_req)) => { debug! ("Unparking server {}", watcher_code); - ok_reply (rmp_serde::to_vec (&vec! [one_req]).unwrap ()) + Ok (ok_reply (rmp_serde::to_vec (&vec! [one_req])?)?) }, - Ok (Err (ShuttingDownError::ShuttingDown)) => error_reply (StatusCode::SERVICE_UNAVAILABLE, "Server is shutting down, try again soon"), - Err (_) => error_reply (StatusCode::INTERNAL_SERVER_ERROR, "Server error"), + Ok (Err (ShuttingDownError::ShuttingDown)) => Ok (error_reply (StatusCode::SERVICE_UNAVAILABLE, "Server is shutting down, try again soon")?), + Err (_) => Ok (error_reply (StatusCode::INTERNAL_SERVER_ERROR, "Server error")?), }, _ = delay_for (Duration::from_secs (30)).fuse () => { debug! ("Timed out http_listen for server {}", watcher_code); - return error_reply (StatusCode::NO_CONTENT, "No requests now, long-poll again") + return Ok (error_reply (StatusCode::NO_CONTENT, "No requests now, long-poll again")?) } } } @@ -253,26 +267,32 @@ async fn handle_http_response ( state: Arc , req_id: String, ) --> Result , http::Error> +-> Result , HandleHttpResponseError> { - let (parts, mut body) = req.into_parts (); - let resp_parts: http_serde::ResponseParts = rmp_serde::from_read_ref (&base64::decode (parts.headers.get (ptth_core::PTTH_MAGIC_HEADER).unwrap ()).unwrap ()).unwrap (); - - // Intercept the body packets here so we can check when the stream - // ends or errors out - #[derive (Debug)] enum BodyFinishedReason { StreamFinished, ClientDisconnected, } use BodyFinishedReason::*; + use HandleHttpResponseError::*; + + let (parts, mut body) = req.into_parts (); + + let magic_header = parts.headers.get (ptth_core::PTTH_MAGIC_HEADER).ok_or (MissingPtthMagicHeader)?; + + let magic_header = base64::decode (magic_header).map_err (PtthMagicHeaderNotBase64)?; + + let resp_parts: http_serde::ResponseParts = rmp_serde::from_read_ref (&magic_header).map_err (PtthMagicHeaderNotMsgPack)?; + + // Intercept the body packets here so we can check when the stream + // ends or errors out let (mut body_tx, body_rx) = mpsc::channel (2); let (body_finished_tx, body_finished_rx) = oneshot::channel (); let mut shutdown_watch_rx = state.shutdown_watch_rx.clone (); - spawn (async move { + let relay_task = spawn (async move { if shutdown_watch_rx.recv ().await == Some (false) { loop { let item = body.next ().await; @@ -285,7 +305,7 @@ async fn handle_http_response ( futures::select! { x = body_tx.send (item).fuse () => if let Err (_) = x { info! ("Body closed while relaying. (Client hung up?)"); - body_finished_tx.send (ClientDisconnected).unwrap (); + body_finished_tx.send (ClientDisconnected).map_err (|_| LostServer).unwrap (); break; }, _ = shutdown_watch_rx.recv ().fuse () => { @@ -296,7 +316,7 @@ async fn handle_http_response ( } else { debug! ("Finished relaying bytes"); - body_finished_tx.send (StreamFinished).unwrap (); + body_finished_tx.send (StreamFinished).map_err (|_| LostServer).unwrap (); break; } } @@ -313,7 +333,7 @@ async fn handle_http_response ( match response_rendezvous.remove (&req_id) { None => { error! ("Server tried to respond to non-existent request"); - return error_reply (StatusCode::BAD_REQUEST, "Request ID not found in response_rendezvous"); + return Ok (error_reply (StatusCode::BAD_REQUEST, "Request ID not found in response_rendezvous")?); }, Some ((_, x)) => x, } @@ -323,20 +343,22 @@ async fn handle_http_response ( if tx.send (Ok ((resp_parts, body))).is_err () { let msg = "Failed to connect to client"; error! (msg); - return error_reply (StatusCode::BAD_GATEWAY, msg); + return Ok (error_reply (StatusCode::BAD_GATEWAY, msg)?); } + relay_task.await?; + debug! ("Connected server to client for streaming."); match body_finished_rx.await { Ok (StreamFinished) => { - error_reply (StatusCode::OK, "StreamFinished") + Ok (error_reply (StatusCode::OK, "StreamFinished")?) }, Ok (ClientDisconnected) => { - error_reply (StatusCode::OK, "ClientDisconnected") + Ok (error_reply (StatusCode::OK, "ClientDisconnected")?) }, Err (e) => { debug! ("body_finished_rx {}", e); - error_reply (StatusCode::OK, "body_finished_rx Err") + Ok (error_reply (StatusCode::OK, "body_finished_rx Err")?) }, } } @@ -361,7 +383,7 @@ async fn handle_http_request ( let req = match http_serde::RequestParts::from_hyper (req.method, uri, req.headers) { Ok (x) => x, - _ => return error_reply (StatusCode::BAD_REQUEST, "Bad request"), + Err (_) => return error_reply (StatusCode::BAD_REQUEST, "Bad request"), }; let (tx, rx) = oneshot::channel (); @@ -375,6 +397,8 @@ async fn handle_http_request ( trace! ("Created request {}", req_id); { + use RequestRendezvous::*; + let mut request_rendezvous = state.request_rendezvous.lock ().await; let wrapped = http_serde::WrappedRequest { @@ -382,8 +406,6 @@ async fn handle_http_request ( req, }; - use RequestRendezvous::*; - let new_rendezvous = match request_rendezvous.remove (&watcher_code) { Some (ParkedClients (mut v)) => { debug! ("Parking request {} ({} already queued)", req_id, v.len ()); @@ -439,7 +461,7 @@ async fn handle_http_request ( let mut resp = Response::builder () .status (hyper::StatusCode::from (parts.status_code)); - for (k, v) in parts.headers.into_iter () { + for (k, v) in parts.headers { resp = resp.header (&k, v); } @@ -567,17 +589,17 @@ async fn handle_server_list_internal (state: &Arc ) async fn handle_server_list ( state: Arc -) -> Result , http::Error> +) -> Result , RequestError> { let page = handle_server_list_internal (&state).await; - let s = state.handlebars.render ("relay_server_list", &page).unwrap (); - ok_reply (s) + let s = state.handlebars.render ("relay_server_list", &page)?; + Ok (ok_reply (s)?) } #[instrument (level = "trace", skip (req, state))] async fn handle_all (req: Request , state: Arc ) --> Result , http::Error> +-> Result , RequestError> { let path = req.uri ().path (); //println! ("{}", path); @@ -592,44 +614,47 @@ async fn handle_all (req: Request , state: Arc ) return if let Some (request_code) = prefix_match ("/7ZSFUKGV/http_response/", path) { let request_code = request_code.into (); - handle_http_response (req, state, request_code).await + Ok (handle_http_response (req, state, request_code).await?) } else { - error_reply (StatusCode::BAD_REQUEST, "Can't POST this") + Ok (error_reply (StatusCode::BAD_REQUEST, "Can't POST this")?) }; } if let Some (listen_code) = prefix_match ("/7ZSFUKGV/http_listen/", path) { let api_key = match api_key { - None => return error_reply (StatusCode::UNAUTHORIZED, "Can't register as server without an API key"), + None => return Ok (error_reply (StatusCode::UNAUTHORIZED, "Can't register as server without an API key")?), Some (x) => x, }; handle_http_listen (state, listen_code.into (), api_key.as_bytes ()).await } else if let Some (rest) = prefix_match ("/frontend/servers/", path) { if rest == "" { - handle_server_list (state).await + Ok (handle_server_list (state).await?) } else if let Some (idx) = rest.find ('/') { let listen_code = String::from (&rest [0..idx]); let path = String::from (&rest [idx..]); let (parts, _) = req.into_parts (); - handle_http_request (parts, path, state, listen_code).await + Ok (handle_http_request (parts, path, state, listen_code).await?) } else { - error_reply (StatusCode::BAD_REQUEST, "Bad URI format") + Ok (error_reply (StatusCode::BAD_REQUEST, "Bad URI format")?) } } else if path == "/" { - let s = state.handlebars.render ("relay_root", &()).unwrap (); - ok_reply (s) + let s = state.handlebars.render ("relay_root", &())?; + Ok (ok_reply (s)?) } else if path == "/frontend/relay_up_check" { - error_reply (StatusCode::OK, "Relay is up") + Ok (error_reply (StatusCode::OK, "Relay is up")?) + } + else if path == "/frontend/test_mysterious_error" { + Err (RequestError::Mysterious) } else { - error_reply (StatusCode::OK, "Hi") + Ok (error_reply (StatusCode::OK, "Hi")?) } } @@ -641,10 +666,10 @@ pub fn load_templates (asset_root: &Path) let asset_root = asset_root.join ("handlebars/relay"); - for (k, v) in vec! [ + for (k, v) in &[ ("relay_server_list", "relay_server_list.html"), ("relay_root", "relay_root.html"), - ].into_iter () { + ] { handlebars.register_template_file (k, &asset_root.join (v))?; } @@ -670,7 +695,7 @@ pub async fn run_relay ( shutdown_oneshot: oneshot::Receiver <()>, config_reload_path: Option ) --> Result <(), Box > +-> Result <(), RelayError> { let addr = SocketAddr::from (( [0, 0, 0, 0], @@ -693,7 +718,7 @@ pub async fn run_relay ( let state = state.clone (); async { - Ok::<_, Infallible> (service_fn (move |req| { + Ok::<_, RequestError> (service_fn (move |req| { let state = state.clone (); handle_all (req, state) @@ -705,18 +730,18 @@ pub async fn run_relay ( .serve (make_svc); server.with_graceful_shutdown (async { + use ShuttingDownError::ShuttingDown; + shutdown_oneshot.await.ok (); state.shutdown_watch_tx.broadcast (true).unwrap (); - use ShuttingDownError::ShuttingDown; - let mut response_rendezvous = state.response_rendezvous.write ().await; let mut swapped = DashMap::default (); std::mem::swap (&mut swapped, &mut response_rendezvous); - for (_, sender) in swapped.into_iter () { + for (_, sender) in swapped { sender.send (Err (ShuttingDown)).ok (); }