From 91a29abb397754228ae87bdfa3ffd333279f353e Mon Sep 17 00:00:00 2001 From: "(on company time)" <_@_> Date: Fri, 16 Dec 2022 11:23:54 -0600 Subject: [PATCH] :bug: bug: give up on env var, it wasn't working --- crates/ptth_relay/src/config.rs | 4 ++ crates/ptth_relay/src/lib.rs | 99 ++++++++++++++++++---------- crates/ptth_relay/src/main.rs | 3 +- crates/ptth_relay/src/relay_state.rs | 11 ++++ 4 files changed, 82 insertions(+), 35 deletions(-) diff --git a/crates/ptth_relay/src/config.rs b/crates/ptth_relay/src/config.rs index 68c2c5b..92d4573 100644 --- a/crates/ptth_relay/src/config.rs +++ b/crates/ptth_relay/src/config.rs @@ -145,6 +145,7 @@ pub mod file { pub news_url: Option , pub hide_audit_log: Option , pub webhook_url: Option , + pub webhook_interval_s: Option , } } @@ -160,6 +161,7 @@ pub struct Config { pub news_url: Option , pub hide_audit_log: bool, pub webhook_url: Option , + pub webhook_interval_s: u32, } impl Default for Config { @@ -173,6 +175,7 @@ impl Default for Config { news_url: None, hide_audit_log: false, webhook_url: None, + webhook_interval_s: 7200, } } } @@ -203,6 +206,7 @@ impl TryFrom for Config { news_url: f.news_url, hide_audit_log: f.hide_audit_log.unwrap_or (false), webhook_url: f.webhook_url, + webhook_interval_s: f.webhook_interval_s.unwrap_or (7200), }) } } diff --git a/crates/ptth_relay/src/lib.rs b/crates/ptth_relay/src/lib.rs index fad5833..f900657 100644 --- a/crates/ptth_relay/src/lib.rs +++ b/crates/ptth_relay/src/lib.rs @@ -589,6 +589,10 @@ async fn handle_all ( use routing::Route::*; let state = &*state; + { + let mut counters = state.webhook_counters.write ().await; + counters.requests_total += 1; + } // The path is cloned here, so it's okay to consume the request // later. @@ -878,38 +882,10 @@ pub async fn run_relay ( state.audit_log.push (AuditEvent::new (AuditData::RelayStart)).await; - let webhook_task = { + { let state = state.clone (); - - tokio::spawn (async move { - let client = reqwest::Client::default (); - - let mut interval = tokio::time::interval (std::time::Duration::from_secs (7200)); - interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip); - - let mut tick_seq = 0; - - loop { - interval.tick ().await; - - let webhook_url = { - let config = state.config.read ().await; - config.webhook_url.clone () - }; - - if let Some (webhook_url) = webhook_url.as_ref () { - let now = chrono::Utc::now (); - - let j = serde_json::json! ({ - "text": format! ("PTTH relay sent test webhook message {} at {:?}", tick_seq, now), - }).to_string (); - - client.post (webhook_url).body (j).send ().await.ok (); - tick_seq += 1; - } - } - }) - }; + tokio::spawn (webhook_task (state)); + } trace! ("Serving relay on {:?}", addr); @@ -943,10 +919,67 @@ pub async fn run_relay ( debug! ("Performed all cleanup"); }).await?; - webhook_task.abort (); - Ok (()) } +async fn webhook_task (state: Arc ) { + use crate::relay_state::MonitoringCounters; + + let client = reqwest::Client::default (); + + let webhook_interval_s = { + let config = state.config.read ().await; + config.webhook_interval_s + }; + dbg! (webhook_interval_s); + + let mut interval = tokio::time::interval (std::time::Duration::from_secs (webhook_interval_s.into ())); + interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip); + + let mut tick_seq = 1; + let mut last_counters_reported = (MonitoringCounters::default (), Utc::now (), 0); + + loop { + interval.tick ().await; + + let webhook_url = { + let config = state.config.read ().await; + config.webhook_url.clone () + }; + + let webhook_url = match webhook_url { + Some (x) => x, + None => { + continue; + }, + }; + + let now = Utc::now (); + + let counters = { + state.webhook_counters.read ().await.clone () + }; + + let requests_total_diff = counters.requests_total - last_counters_reported.0.requests_total; + + let j = serde_json::json! ({ + "text": format! ("From tick {} to {}: Handled {} requests", last_counters_reported.2, tick_seq, requests_total_diff), + }).to_string (); + + match client.post (webhook_url).body (j).send ().await { + Ok (resp) => { + if resp.status () == StatusCode::OK { + last_counters_reported = (counters, now, tick_seq); + } + else { + dbg! (resp.status ()); + } + }, + Err (e) => { dbg! (e); }, + } + tick_seq += 1; + } +} + #[cfg (test)] mod tests; diff --git a/crates/ptth_relay/src/main.rs b/crates/ptth_relay/src/main.rs index 2bdd35a..7dc7f4a 100644 --- a/crates/ptth_relay/src/main.rs +++ b/crates/ptth_relay/src/main.rs @@ -48,8 +48,7 @@ async fn main () -> Result <(), Box > { } let config_path = PathBuf::from ("config/ptth_relay.toml"); - let mut config = Config::from_file (&config_path).await?; - config.webhook_url = std::env::var ("WEBHOOK_URL").ok ().or (config.webhook_url); + let config = Config::from_file (&config_path).await?; let (shutdown_rx, forced_shutdown) = ptth_core::graceful_shutdown::init_with_force (); diff --git a/crates/ptth_relay/src/relay_state.rs b/crates/ptth_relay/src/relay_state.rs index 4ad3bcb..4aadc0b 100644 --- a/crates/ptth_relay/src/relay_state.rs +++ b/crates/ptth_relay/src/relay_state.rs @@ -101,6 +101,16 @@ pub struct Relay { /// Memory backend for audit logging // TODO: Add file / database / network server logging backend pub (crate) audit_log: BoundedVec , + + /// Counters for webhook reporting + pub (crate) webhook_counters: RwLock , +} + +#[derive (Clone, Default)] +pub (crate) struct MonitoringCounters { + pub (crate) requests_total: u64, + pub (crate) requests_by_scraper_api: HashMap , + pub (crate) requests_by_email: HashMap , } #[derive (Clone)] @@ -204,6 +214,7 @@ impl TryFrom for Relay { shutdown_watch_rx, unregistered_servers: BoundedVec::new (20), audit_log: BoundedVec::new (256), + webhook_counters: Default::default (), }) } }