five_five_five/src/driver.rs

278 lines
8.0 KiB
Rust

/// This is where all the I/O and task spawning and joining happens
use std::
{
time::{Duration, Instant},
};
use eframe::egui;
use tokio::{
fs,
io::{
AsyncWriteExt,
BufWriter,
Result,
},
sync::mpsc,
};
use crate::
{
capture::Capture,
controller::
{
Controller,
MsgFromController,
JpegFrame,
RgbaFrame,
},
};
pub enum MsgToDriver
{
GuiNeedsRgbaFrame,
GuiDebugCapture (bool),
DecodedJpegToRgba (RgbaFrame),
NetworkWriteFinished,
GotCapture ((Capture, JpegFrame)),
}
pub enum MsgToGui
{
NewRgbaFrame ((RgbaFrame, Instant)),
}
pub struct Driver
{
send: mpsc::Sender <MsgToDriver>,
recv: mpsc::Receiver <MsgToDriver>,
gui_ctx: egui::Context,
send_to_gui: mpsc::Sender <MsgToGui>,
capture: Option <Capture>,
ctl: Controller,
debug_logger: DebugLogger,
}
pub fn sleep_ms (ms: u64)
{
std::thread::sleep (std::time::Duration::from_millis (ms));
}
impl Driver
{
pub fn new (
send: mpsc::Sender <MsgToDriver>,
recv: mpsc::Receiver <MsgToDriver>,
gui_ctx: egui::Context,
send_to_gui: mpsc::Sender <MsgToGui>,
) -> Self
{
let now = Instant::now ();
Self {
send,
recv,
gui_ctx,
send_to_gui,
capture: Some (Capture::new ().unwrap ()),
ctl: Controller::new (now),
debug_logger: DebugLogger::new (now),
}
}
pub async fn run (&mut self)
{
loop
{
self.debug_logger.log_state (&self.ctl).await.unwrap ();
match self.recv.recv().await.unwrap ()
{
MsgToDriver::GuiNeedsRgbaFrame =>
{
self.ctl.handle_gui_needs_frame ();
},
MsgToDriver::GuiDebugCapture(x) => self.debug_logger.set_enabled (x).await.unwrap (),
MsgToDriver::DecodedJpegToRgba (frame) =>
{
self.ctl.handle_rgba_frame (frame);
},
MsgToDriver::NetworkWriteFinished =>
{
self.ctl.handle_network_write_finished ();
},
MsgToDriver::GotCapture ((capture, jpeg)) =>
{
self.ctl.handle_capture (jpeg);
self.capture = Some (capture);
},
}
while let Some (msg) = self.ctl.poll ()
{
match msg
{
MsgFromController::RepaintGui (rgba_frame) =>
{
self.send_to_gui.send (MsgToGui::NewRgbaFrame ((rgba_frame, Instant::now ()))).await.ok ();
self.gui_ctx.request_repaint();
},
MsgFromController::StartJpegDecoder (jpeg_frame) =>
{
let send = self.send.clone ();
tokio::task::spawn_blocking (move ||
{
// sleep_ms (500);
let mut decoder = zune_jpeg::JpegDecoder::new_with_options (&jpeg_frame.data, zune_core::options::DecoderOptions::new_fast().jpeg_set_out_colorspace(zune_core::colorspace::ColorSpace::RGBA));
decoder.decode_headers().unwrap ();
let mut rgba = vec![0u8;decoder.output_buffer_size().unwrap ()];
decoder.decode_into(&mut rgba).unwrap ();
let rgba = RgbaFrame
{
data: rgba,
capture_time: jpeg_frame.capture_time,
};
send.blocking_send (MsgToDriver::DecodedJpegToRgba (rgba)).ok ();
});
},
MsgFromController::StartNetworkSend (jpeg) =>
{
let send = self.send.clone ();
tokio::spawn (async move
{
sleep_ms (50);
send.send (MsgToDriver::NetworkWriteFinished).await.ok ();
});
},
MsgFromController::StartCapture =>
{
let mut capture = self.capture.take ().unwrap ();
let send = self.send.clone ();
tokio::task::spawn_blocking (move ||
{
let mut data = vec! [0u8; capture.size_image()];
let len = capture.wait_for_frame(&mut data).unwrap ();
data.resize (len, 0u8);
let frame = JpegFrame
{
data,
capture_time: Instant::now (),
};
// sleep_ms (500);
send.blocking_send (MsgToDriver::GotCapture ((capture, frame))).ok ();
});
}
}
}
}
}
}
struct DebugLogger {
inner: Option <DebugLoggerInner>,
start_time: Instant,
}
impl DebugLogger {
pub fn new (start_time: Instant) -> Self {
Self {
inner: None,
start_time,
}
}
pub async fn log_state (&mut self, ctl: &Controller) -> Result <()> {
if let Some (inner) = &mut self.inner {
inner.log_state(ctl).await?;
}
Ok (())
}
pub async fn set_enabled (&mut self, x: bool) -> Result <()> {
match (&mut self.inner, x) {
(None, true) => {
let f = fs::File::create ("untracked/debug.jsonl").await?;
let f = BufWriter::new (f);
self.inner = Some (DebugLoggerInner {
overhead: Default::default (),
log_ops: 0,
f,
start_time: self.start_time,
});
},
(Some (inner), false) => {
inner.log_overhead ().await?;
inner.f.flush ().await?;
self.inner = None;
},
_ => (),
}
Ok (())
}
}
struct DebugLoggerInner {
// Time spent inside the logger
overhead: Duration,
// Log operations performed
log_ops: u32,
// Log output file
f: BufWriter <fs::File>,
// An arbitrary start time to count milliseconds from
start_time: Instant,
}
impl DebugLoggerInner {
pub async fn log_overhead (&mut self) -> Result <()> {
self.log_map (|me| {
serde_json::json! ({
"overhead": {
"milliseconds": me.overhead.as_millis (),
"ops": me.log_ops,
},
})
}).await
}
pub async fn log_state (&mut self, ctl: &Controller) -> Result <()> {
self.log_map (|me| {
serde_json::json! ({
"state": ctl.to_json (me.start_time),
})
}).await
}
async fn log_map <F: FnOnce (&Self) -> serde_json::Value> (&mut self, f: F) -> Result <()> {
let start = Instant::now ();
let time = (start - self.start_time).as_millis ();
let j: serde_json::Value = f (&*self);
let line = serde_json::json! ([time, j]);
self.log_ops += 1;
self.f.write_all (format! ("{line}\n").as_bytes ()).await?;
let stop = Instant::now ();
self.overhead += stop - start;
Ok (())
}
}