🐛 bug: give up on env var, it wasn't working
parent
9ab3b42e32
commit
91a29abb39
|
@ -145,6 +145,7 @@ pub mod file {
|
||||||
pub news_url: Option <String>,
|
pub news_url: Option <String>,
|
||||||
pub hide_audit_log: Option <bool>,
|
pub hide_audit_log: Option <bool>,
|
||||||
pub webhook_url: Option <String>,
|
pub webhook_url: Option <String>,
|
||||||
|
pub webhook_interval_s: Option <u32>,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -160,6 +161,7 @@ pub struct Config {
|
||||||
pub news_url: Option <String>,
|
pub news_url: Option <String>,
|
||||||
pub hide_audit_log: bool,
|
pub hide_audit_log: bool,
|
||||||
pub webhook_url: Option <String>,
|
pub webhook_url: Option <String>,
|
||||||
|
pub webhook_interval_s: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Config {
|
impl Default for Config {
|
||||||
|
@ -173,6 +175,7 @@ impl Default for Config {
|
||||||
news_url: None,
|
news_url: None,
|
||||||
hide_audit_log: false,
|
hide_audit_log: false,
|
||||||
webhook_url: None,
|
webhook_url: None,
|
||||||
|
webhook_interval_s: 7200,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -203,6 +206,7 @@ impl TryFrom <file::Config> for Config {
|
||||||
news_url: f.news_url,
|
news_url: f.news_url,
|
||||||
hide_audit_log: f.hide_audit_log.unwrap_or (false),
|
hide_audit_log: f.hide_audit_log.unwrap_or (false),
|
||||||
webhook_url: f.webhook_url,
|
webhook_url: f.webhook_url,
|
||||||
|
webhook_interval_s: f.webhook_interval_s.unwrap_or (7200),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -589,6 +589,10 @@ async fn handle_all (
|
||||||
use routing::Route::*;
|
use routing::Route::*;
|
||||||
|
|
||||||
let state = &*state;
|
let state = &*state;
|
||||||
|
{
|
||||||
|
let mut counters = state.webhook_counters.write ().await;
|
||||||
|
counters.requests_total += 1;
|
||||||
|
}
|
||||||
|
|
||||||
// The path is cloned here, so it's okay to consume the request
|
// The path is cloned here, so it's okay to consume the request
|
||||||
// later.
|
// later.
|
||||||
|
@ -878,38 +882,10 @@ pub async fn run_relay (
|
||||||
|
|
||||||
state.audit_log.push (AuditEvent::new (AuditData::RelayStart)).await;
|
state.audit_log.push (AuditEvent::new (AuditData::RelayStart)).await;
|
||||||
|
|
||||||
let webhook_task = {
|
{
|
||||||
let state = state.clone ();
|
let state = state.clone ();
|
||||||
|
tokio::spawn (webhook_task (state));
|
||||||
tokio::spawn (async move {
|
}
|
||||||
let client = reqwest::Client::default ();
|
|
||||||
|
|
||||||
let mut interval = tokio::time::interval (std::time::Duration::from_secs (7200));
|
|
||||||
interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip);
|
|
||||||
|
|
||||||
let mut tick_seq = 0;
|
|
||||||
|
|
||||||
loop {
|
|
||||||
interval.tick ().await;
|
|
||||||
|
|
||||||
let webhook_url = {
|
|
||||||
let config = state.config.read ().await;
|
|
||||||
config.webhook_url.clone ()
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some (webhook_url) = webhook_url.as_ref () {
|
|
||||||
let now = chrono::Utc::now ();
|
|
||||||
|
|
||||||
let j = serde_json::json! ({
|
|
||||||
"text": format! ("PTTH relay sent test webhook message {} at {:?}", tick_seq, now),
|
|
||||||
}).to_string ();
|
|
||||||
|
|
||||||
client.post (webhook_url).body (j).send ().await.ok ();
|
|
||||||
tick_seq += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
};
|
|
||||||
|
|
||||||
trace! ("Serving relay on {:?}", addr);
|
trace! ("Serving relay on {:?}", addr);
|
||||||
|
|
||||||
|
@ -943,10 +919,67 @@ pub async fn run_relay (
|
||||||
debug! ("Performed all cleanup");
|
debug! ("Performed all cleanup");
|
||||||
}).await?;
|
}).await?;
|
||||||
|
|
||||||
webhook_task.abort ();
|
|
||||||
|
|
||||||
Ok (())
|
Ok (())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn webhook_task (state: Arc <Relay>) {
|
||||||
|
use crate::relay_state::MonitoringCounters;
|
||||||
|
|
||||||
|
let client = reqwest::Client::default ();
|
||||||
|
|
||||||
|
let webhook_interval_s = {
|
||||||
|
let config = state.config.read ().await;
|
||||||
|
config.webhook_interval_s
|
||||||
|
};
|
||||||
|
dbg! (webhook_interval_s);
|
||||||
|
|
||||||
|
let mut interval = tokio::time::interval (std::time::Duration::from_secs (webhook_interval_s.into ()));
|
||||||
|
interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip);
|
||||||
|
|
||||||
|
let mut tick_seq = 1;
|
||||||
|
let mut last_counters_reported = (MonitoringCounters::default (), Utc::now (), 0);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
interval.tick ().await;
|
||||||
|
|
||||||
|
let webhook_url = {
|
||||||
|
let config = state.config.read ().await;
|
||||||
|
config.webhook_url.clone ()
|
||||||
|
};
|
||||||
|
|
||||||
|
let webhook_url = match webhook_url {
|
||||||
|
Some (x) => x,
|
||||||
|
None => {
|
||||||
|
continue;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
let now = Utc::now ();
|
||||||
|
|
||||||
|
let counters = {
|
||||||
|
state.webhook_counters.read ().await.clone ()
|
||||||
|
};
|
||||||
|
|
||||||
|
let requests_total_diff = counters.requests_total - last_counters_reported.0.requests_total;
|
||||||
|
|
||||||
|
let j = serde_json::json! ({
|
||||||
|
"text": format! ("From tick {} to {}: Handled {} requests", last_counters_reported.2, tick_seq, requests_total_diff),
|
||||||
|
}).to_string ();
|
||||||
|
|
||||||
|
match client.post (webhook_url).body (j).send ().await {
|
||||||
|
Ok (resp) => {
|
||||||
|
if resp.status () == StatusCode::OK {
|
||||||
|
last_counters_reported = (counters, now, tick_seq);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
dbg! (resp.status ());
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err (e) => { dbg! (e); },
|
||||||
|
}
|
||||||
|
tick_seq += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg (test)]
|
#[cfg (test)]
|
||||||
mod tests;
|
mod tests;
|
||||||
|
|
|
@ -48,8 +48,7 @@ async fn main () -> Result <(), Box <dyn Error>> {
|
||||||
}
|
}
|
||||||
|
|
||||||
let config_path = PathBuf::from ("config/ptth_relay.toml");
|
let config_path = PathBuf::from ("config/ptth_relay.toml");
|
||||||
let mut config = Config::from_file (&config_path).await?;
|
let config = Config::from_file (&config_path).await?;
|
||||||
config.webhook_url = std::env::var ("WEBHOOK_URL").ok ().or (config.webhook_url);
|
|
||||||
|
|
||||||
let (shutdown_rx, forced_shutdown) = ptth_core::graceful_shutdown::init_with_force ();
|
let (shutdown_rx, forced_shutdown) = ptth_core::graceful_shutdown::init_with_force ();
|
||||||
|
|
||||||
|
|
|
@ -101,6 +101,16 @@ pub struct Relay {
|
||||||
/// Memory backend for audit logging
|
/// Memory backend for audit logging
|
||||||
// TODO: Add file / database / network server logging backend
|
// TODO: Add file / database / network server logging backend
|
||||||
pub (crate) audit_log: BoundedVec <AuditEvent>,
|
pub (crate) audit_log: BoundedVec <AuditEvent>,
|
||||||
|
|
||||||
|
/// Counters for webhook reporting
|
||||||
|
pub (crate) webhook_counters: RwLock <MonitoringCounters>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive (Clone, Default)]
|
||||||
|
pub (crate) struct MonitoringCounters {
|
||||||
|
pub (crate) requests_total: u64,
|
||||||
|
pub (crate) requests_by_scraper_api: HashMap <String, u64>,
|
||||||
|
pub (crate) requests_by_email: HashMap <String, u64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive (Clone)]
|
#[derive (Clone)]
|
||||||
|
@ -204,6 +214,7 @@ impl TryFrom <Config> for Relay {
|
||||||
shutdown_watch_rx,
|
shutdown_watch_rx,
|
||||||
unregistered_servers: BoundedVec::new (20),
|
unregistered_servers: BoundedVec::new (20),
|
||||||
audit_log: BoundedVec::new (256),
|
audit_log: BoundedVec::new (256),
|
||||||
|
webhook_counters: Default::default (),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue