ptth/crates/ptth_server/src/file_server/metrics.rs

214 lines
4.3 KiB
Rust

use std::{
sync::Arc,
time::{Duration, Instant},
};
use arc_swap::ArcSwap;
use chrono::{DateTime, Utc};
use tracing::{
debug, error, trace,
};
use ulid::Ulid;
// Metrics are named for when they're updated:
// - Startup (Once, when the server state is initialized)
// - Interval (About once per minute)
// - Events (When a request is processed)
/// Metrics gathered once, when the server state is initialized.
#[derive (Debug, serde::Serialize)]
pub struct Startup {
    /// Machine ID read from `/etc/machine-id` (the systemd / D-Bus machine ID),
    /// or `None` if it couldn't be read.
    pub machine_id: Option <String>,
    /// Git version that ptth_server was built from.
    /// Unimplemented: `Startup::new` always sets this to `None`.
    pub git_version: Option <String>,
    /// User-assigned, human-readable name for this server.
    /// Must be unique within a relay.
    pub server_name: String,
    /// Random ULID generated once when the process starts.
    /// Serialized as the ULID's canonical string form (Crockford base32, not base64).
    /// It's a fallback for detecting outages without relying on any clocks:
    /// a different ID means the process was restarted.
    #[serde (serialize_with = "serialize_ulid")]
    pub instance_id: Ulid,
    /// System time (UTC) when these metrics were gathered.
    pub startup_utc: DateTime <Utc>,
}
/// A periodic snapshot of this process's resource usage (taken about once per minute).
#[derive (serde::Serialize, Debug)]
pub struct Interval {
    /// Monotonic time of the sample, used to measure elapsed time between
    /// snapshots. Not serialized.
    #[serde (skip)]
    pub at: Instant,
    /// Wall-clock (UTC) time of the sample.
    pub utc: DateTime <Utc>,
    /// Resident set size of the process, in MiB.
    pub rss_mib: u64,
    /// Raw CPU-usage snapshot from `heim`; the monitor subtracts two snapshots
    /// to obtain a usage percentage. Not serialized.
    #[serde (skip)]
    pub cpu_usage: heim::process::CpuUsage,
}
/// Placeholder for metrics updated whenever a request is processed; it has no fields yet.
pub struct Events {}
impl Startup {
#[must_use]
pub fn new (server_name: String) -> Self
{
let x = Self {
machine_id: get_machine_id (),
git_version: None,
server_name,
instance_id: ulid::Ulid::new (),
startup_utc: Utc::now (),
};
debug! ("metrics at startup: {:?}", x);
x
}
}
/// Serde `serialize_with` helper: emits a [`Ulid`] as a string, using the text
/// produced by its `to_string` (i.e. its `Display` form).
fn serialize_ulid <S: serde::Serializer> (t: &Ulid, s: S)
-> Result <S::Ok, S::Error>
{
    s.serialize_str (t.to_string ().as_str ())
}
impl Interval {
    /// Takes one resource-usage sample of the current process.
    ///
    /// CPU usage and memory are queried concurrently. `at` (monotonic) and
    /// `utc` (wall clock) record when the sample was completed.
    ///
    /// # Errors
    ///
    /// Fails if `heim` cannot look up the current process or read its CPU
    /// usage or memory; the error is converted into `super::FileServerError`.
    pub async fn new () -> Result <Self, super::FileServerError> {
        use tokio::join;
        use heim::process;
        use uom::si::information::mebibyte;

        let our_process = process::current ().await?;
        let cpu_usage = our_process.cpu_usage ();
        // The two queries are independent, so run them concurrently.
        let (cpu_usage, mem) = join! {
            cpu_usage,
            our_process.memory ()
        };
        let cpu_usage = cpu_usage?;
        let mem = mem?;
        let rss_mib = mem.rss ().get::<mebibyte> ();

        Ok (Self {
            at: Instant::now (),
            utc: Utc::now (),
            rss_mib,
            cpu_usage,
        })
    }

    /// Samples [`Interval`] metrics every 60 seconds, forever.
    ///
    /// Each successful sample is published through `interval_writer`, replacing
    /// the previous one. A failed sample is logged at error level and skipped.
    ///
    /// Several rolling windows (1, 5, 10, 60 and 1440 samples long) are also
    /// tracked. Whenever one elapses, the CPU usage between that window's
    /// previous baseline sample and the current one is logged at trace level.
    ///
    /// This future never completes; presumably it is meant to be spawned as a
    /// background task.
    pub async fn monitor (interval_writer: Arc <ArcSwap <Option <Interval>>>)
    {
        use uom::si::ratio::percent;

        let mut interval = tokio::time::interval (Duration::from_secs (60));

        /// One rolling window over the stream of samples.
        struct Window {
            /// Number of samples between this window's CPU-usage reports.
            window_length: u64,
            /// Sample counter value at which this window next reports.
            next_interval: u64,
            /// Baseline sample taken at this window's previous report, if any.
            last_metrics: Arc <Option <Interval>>,
        }

        impl Window {
            fn new (window_length: u64) -> Self {
                Self {
                    window_length,
                    next_interval: 0,
                    last_metrics: Arc::new (None),
                }
            }

            /// If this window is due at sample `counter`, logs the CPU usage
            /// since the baseline and makes `new_metrics` the new baseline.
            fn update (
                &mut self,
                counter: u64,
                new_metrics: &Arc <Option <Interval>>
            ) {
                if counter < self.next_interval {
                    return;
                }
                // No log on a window's first report: there is no baseline yet.
                if let (Some (old), Some (new)) = (&*self.last_metrics, &**new_metrics) {
                    let diff = new.cpu_usage.clone () - old.cpu_usage.clone ();
                    trace! (
                        "CPU usage over {} s: {}%",
                        (new.at - old.at).as_secs (),
                        diff.get::<percent> (),
                    );
                }
                self.next_interval += self.window_length;
                self.last_metrics = Arc::clone (new_metrics);
            }
        }

        let mut windows: Vec <Window> = [1, 5, 10, 60, 1440]
            .iter ()
            .copied ()
            .map (Window::new)
            .collect ();

        // Counts successful samples only: a failed sample `continue`s past the
        // increment below, which stretches every window by one tick.
        let mut counter = 0_u64;

        loop {
            interval.tick ().await;

            let sample = match Interval::new ().await {
                Ok (x) => x,
                Err (e) => {
                    error! ("Failed to update interval metrics: {:?}", e);
                    continue;
                },
            };
            let sample = Arc::new (Some (sample));

            for window in &mut windows {
                window.update (counter, &sample);
            }
            interval_writer.store (sample);
            counter += 1;
        }
    }
}
/// Reads this machine's ID from `/etc/machine-id`.
///
/// Returns `None` if the file can't be opened or read, isn't valid UTF-8, or
/// contains nothing but whitespace.
fn get_machine_id () -> Option <String> {
    read_machine_id (std::path::Path::new ("/etc/machine-id"))
}

/// Reads a machine ID from `path`. At most the first 1024 bytes are read and
/// decoded as UTF-8, and trailing whitespace (such as the final newline) is
/// removed.
///
/// Returns `None` on any I/O or decoding error, or if the trimmed ID is empty.
fn read_machine_id (path: &std::path::Path) -> Option <String> {
    use std::{
        fs::File,
        io::Read,
    };

    // Generous upper bound on how much to read; machine-id(5) describes a
    // 32-hex-digit ID followed by a newline.
    const MAX_LEN: u64 = 1024;

    let mut contents = String::new ();
    // `take` + `read_to_string` keep reading until EOF or the cap. A single
    // `Read::read` call may legally return a short read and truncate the ID.
    File::open (path).ok ()?
        .take (MAX_LEN)
        .read_to_string (&mut contents)
        .ok ()?;

    let id = contents.trim_end ();
    if id.is_empty () {
        None
    } else {
        Some (id.to_string ())
    }
}
#[cfg (test)]
mod tests {
    use super::*;

    /// Two `Startup`s built with the same server name must still get distinct,
    /// freshly generated instance IDs.
    #[test]
    fn ulid_null () {
        let name = "bogus";
        let first = Startup::new (name.to_string ());
        let second = Startup::new (name.to_string ());
        assert_ne! (first.instance_id, second.instance_id);
    }
}