/// This is where all the I/O and task spawning and joining happens use std:: { time::{Duration, Instant}, }; use eframe::egui; use tokio::{ fs, io::{ AsyncWriteExt, BufWriter, Result, }, sync::mpsc, }; use crate:: { capture::Capture, controller:: { Controller, MsgFromController, JpegFrame, RgbaFrame, }, }; pub enum MsgToDriver { GuiNeedsRgbaFrame, GuiDebugCapture (bool), DecodedJpegToRgba (RgbaFrame), NetworkWriteFinished, GotCapture ((Capture, JpegFrame)), } pub enum MsgToGui { NewRgbaFrame ((RgbaFrame, Instant)), } pub struct Driver { send: mpsc::Sender , recv: mpsc::Receiver , gui_ctx: egui::Context, send_to_gui: mpsc::Sender , capture: Option , ctl: Controller, debug_logger: DebugLogger, } pub fn sleep_ms (ms: u64) { std::thread::sleep (std::time::Duration::from_millis (ms)); } impl Driver { pub fn new ( send: mpsc::Sender , recv: mpsc::Receiver , gui_ctx: egui::Context, send_to_gui: mpsc::Sender , ) -> Self { let now = Instant::now (); Self { send, recv, gui_ctx, send_to_gui, capture: Some (Capture::new ().unwrap ()), ctl: Controller::new (now), debug_logger: DebugLogger::new (now), } } pub async fn run (&mut self) { loop { self.debug_logger.log_state (&self.ctl).await.unwrap (); match self.recv.recv().await.unwrap () { MsgToDriver::GuiNeedsRgbaFrame => { self.ctl.handle_gui_needs_frame (); }, MsgToDriver::GuiDebugCapture(x) => self.debug_logger.set_enabled (x).await.unwrap (), MsgToDriver::DecodedJpegToRgba (frame) => { self.ctl.handle_rgba_frame (frame); }, MsgToDriver::NetworkWriteFinished => { self.ctl.handle_network_write_finished (); }, MsgToDriver::GotCapture ((capture, jpeg)) => { self.ctl.handle_capture (jpeg); self.capture = Some (capture); }, } while let Some (msg) = self.ctl.poll () { match msg { MsgFromController::RepaintGui (rgba_frame) => { self.send_to_gui.send (MsgToGui::NewRgbaFrame ((rgba_frame, Instant::now ()))).await.ok (); self.gui_ctx.request_repaint(); }, MsgFromController::StartJpegDecoder (jpeg_frame) => { let send = self.send.clone (); tokio::task::spawn_blocking (move || { // sleep_ms (500); let mut decoder = zune_jpeg::JpegDecoder::new_with_options (&jpeg_frame.data, zune_core::options::DecoderOptions::new_fast().jpeg_set_out_colorspace(zune_core::colorspace::ColorSpace::RGBA)); decoder.decode_headers().unwrap (); let mut rgba = vec![0u8;decoder.output_buffer_size().unwrap ()]; decoder.decode_into(&mut rgba).unwrap (); let rgba = RgbaFrame { data: rgba, capture_time: jpeg_frame.capture_time, }; send.blocking_send (MsgToDriver::DecodedJpegToRgba (rgba)).ok (); }); }, MsgFromController::StartNetworkSend (jpeg) => { let send = self.send.clone (); tokio::spawn (async move { sleep_ms (50); send.send (MsgToDriver::NetworkWriteFinished).await.ok (); }); }, MsgFromController::StartCapture => { let mut capture = self.capture.take ().unwrap (); let send = self.send.clone (); tokio::task::spawn_blocking (move || { let mut data = vec! [0u8; capture.size_image()]; let len = capture.wait_for_frame(&mut data).unwrap (); data.resize (len, 0u8); let frame = JpegFrame { data, capture_time: Instant::now (), }; // sleep_ms (500); send.blocking_send (MsgToDriver::GotCapture ((capture, frame))).ok (); }); } } } } } } struct DebugLogger { inner: Option , start_time: Instant, } impl DebugLogger { pub fn new (start_time: Instant) -> Self { Self { inner: None, start_time, } } pub async fn log_state (&mut self, ctl: &Controller) -> Result <()> { if let Some (inner) = &mut self.inner { inner.log_state(ctl).await?; } Ok (()) } pub async fn set_enabled (&mut self, x: bool) -> Result <()> { match (&mut self.inner, x) { (None, true) => { let f = fs::File::create ("untracked/debug.jsonl").await?; let f = BufWriter::new (f); self.inner = Some (DebugLoggerInner { overhead: Default::default (), log_ops: 0, f, start_time: self.start_time, }); }, (Some (inner), false) => { inner.log_overhead ().await?; inner.f.flush ().await?; self.inner = None; }, _ => (), } Ok (()) } } struct DebugLoggerInner { // Time spent inside the logger overhead: Duration, // Log operations performed log_ops: u32, // Log output file f: BufWriter , // An arbitrary start time to count milliseconds from start_time: Instant, } impl DebugLoggerInner { pub async fn log_overhead (&mut self) -> Result <()> { self.log_map (|me| { serde_json::json! ({ "overhead": { "milliseconds": me.overhead.as_millis (), "ops": me.log_ops, }, }) }).await } pub async fn log_state (&mut self, ctl: &Controller) -> Result <()> { self.log_map (|me| { serde_json::json! ({ "state": ctl.to_json (me.start_time), }) }).await } async fn log_map serde_json::Value> (&mut self, f: F) -> Result <()> { let start = Instant::now (); let time = (start - self.start_time).as_millis (); let j: serde_json::Value = f (&*self); let line = serde_json::json! ([time, j]); self.log_ops += 1; self.f.write_all (format! ("{line}\n").as_bytes ()).await?; let stop = Instant::now (); self.overhead += stop - start; Ok (()) } }