👕 refactor: port to Tokio

main
_ 2023-09-14 21:35:48 -05:00
parent f02c8b8449
commit 44a24d0d2d
5 changed files with 144 additions and 39 deletions

88
Cargo.lock generated
View File

@ -87,6 +87,15 @@ dependencies = [
"winit", "winit",
] ]
[[package]]
name = "addr2line"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb"
dependencies = [
"gimli",
]
[[package]] [[package]]
name = "adler" name = "adler"
version = "1.0.2" version = "1.0.2"
@ -229,7 +238,7 @@ dependencies = [
"polling", "polling",
"rustix 0.37.23", "rustix 0.37.23",
"slab", "slab",
"socket2", "socket2 0.4.9",
"waker-fn", "waker-fn",
] ]
@ -333,6 +342,21 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "backtrace"
version = "0.3.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837"
dependencies = [
"addr2line",
"cc",
"cfg-if",
"libc",
"miniz_oxide",
"object",
"rustc-demangle",
]
[[package]] [[package]]
name = "bitflags" name = "bitflags"
version = "1.3.2" version = "1.3.2"
@ -913,6 +937,7 @@ dependencies = [
"linuxvideo", "linuxvideo",
"rayon", "rayon",
"thiserror", "thiserror",
"tokio",
"zune-core", "zune-core",
"zune-jpeg", "zune-jpeg",
] ]
@ -1037,6 +1062,12 @@ dependencies = [
"wasi", "wasi",
] ]
[[package]]
name = "gimli"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0"
[[package]] [[package]]
name = "gl_generator" name = "gl_generator"
version = "0.14.0" version = "0.14.0"
@ -1622,6 +1653,15 @@ dependencies = [
"objc", "objc",
] ]
[[package]]
name = "object"
version = "0.32.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cf5f9dd3933bd50a9e1f149ec995f39ae2c496d31fd772c1fd45ebc27e902b0"
dependencies = [
"memchr",
]
[[package]] [[package]]
name = "once_cell" name = "once_cell"
version = "1.18.0" version = "1.18.0"
@ -1874,6 +1914,12 @@ version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da"
[[package]]
name = "rustc-demangle"
version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
[[package]] [[package]]
name = "rustix" name = "rustix"
version = "0.37.23" version = "0.37.23"
@ -2065,6 +2111,16 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "socket2"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e"
dependencies = [
"libc",
"windows-sys 0.48.0",
]
[[package]] [[package]]
name = "static_assertions" name = "static_assertions"
version = "1.1.0" version = "1.1.0"
@ -2178,6 +2234,36 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9"
dependencies = [
"backtrace",
"bytes",
"libc",
"mio",
"num_cpus",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2 0.5.4",
"tokio-macros",
"windows-sys 0.48.0",
]
[[package]]
name = "tokio-macros"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.31",
]
[[package]] [[package]]
name = "toml_datetime" name = "toml_datetime"
version = "0.6.3" version = "0.6.3"

View File

@ -10,6 +10,7 @@ eframe = "0.22.0"
linuxvideo = { path = "deps/LinuxVideo" } linuxvideo = { path = "deps/LinuxVideo" }
rayon = "1.7.0" rayon = "1.7.0"
thiserror = "1.0.48" thiserror = "1.0.48"
tokio = { version = "1.32.0", features = ["full"] }
zune-core = { path = "deps/zune-image/zune-core" } zune-core = { path = "deps/zune-image/zune-core" }
zune-jpeg = { path = "deps/zune-image/zune-jpeg", default-features = false, features = ["std"] } zune-jpeg = { path = "deps/zune-image/zune-jpeg", default-features = false, features = ["std"] }

View File

@ -1,3 +1,6 @@
/// No I/O, no blocking, no threads. One state machine that controls
/// everything.
use std:: use std::
{ {
time::Instant, time::Instant,

View File

@ -1,10 +1,12 @@
/// This is where all the I/O and task spawning and joining happens
use std:: use std::
{ {
sync::mpsc,
time::Instant, time::Instant,
}; };
use eframe::egui; use eframe::egui;
use tokio::sync::mpsc;
use crate:: use crate::
{ {
@ -33,11 +35,11 @@ pub enum MsgToGui
pub struct Driver pub struct Driver
{ {
send: mpsc::SyncSender <MsgToDriver>, send: mpsc::Sender <MsgToDriver>,
recv: mpsc::Receiver <MsgToDriver>, recv: mpsc::Receiver <MsgToDriver>,
gui_ctx: egui::Context, gui_ctx: egui::Context,
send_to_gui: mpsc::SyncSender <MsgToGui>, send_to_gui: mpsc::Sender <MsgToGui>,
capture: Option <Capture>, capture: Option <Capture>,
@ -52,10 +54,10 @@ pub fn sleep_ms (ms: u64)
impl Driver impl Driver
{ {
pub fn new ( pub fn new (
send: mpsc::SyncSender <MsgToDriver>, send: mpsc::Sender <MsgToDriver>,
recv: mpsc::Receiver <MsgToDriver>, recv: mpsc::Receiver <MsgToDriver>,
gui_ctx: egui::Context, gui_ctx: egui::Context,
send_to_gui: mpsc::SyncSender <MsgToGui>, send_to_gui: mpsc::Sender <MsgToGui>,
) -> Self ) -> Self
{ {
Self { Self {
@ -68,13 +70,11 @@ impl Driver
} }
} }
pub fn run (&mut self) pub async fn run (&mut self)
{ {
let pool = rayon::ThreadPoolBuilder::new().build().unwrap ();
loop loop
{ {
match self.recv.recv().unwrap () match self.recv.recv().await.unwrap ()
{ {
MsgToDriver::GuiNeedsRgbaFrame => MsgToDriver::GuiNeedsRgbaFrame =>
{ {
@ -101,13 +101,13 @@ impl Driver
{ {
MsgFromController::RepaintGui (rgba_frame) => MsgFromController::RepaintGui (rgba_frame) =>
{ {
self.send_to_gui.send (MsgToGui::NewRgbaFrame (rgba_frame)).unwrap (); self.send_to_gui.send (MsgToGui::NewRgbaFrame (rgba_frame)).await.ok ();
self.gui_ctx.request_repaint(); self.gui_ctx.request_repaint();
}, },
MsgFromController::StartJpegDecoder (jpeg_frame) => MsgFromController::StartJpegDecoder (jpeg_frame) =>
{ {
let send = self.send.clone (); let send = self.send.clone ();
pool.spawn (move || tokio::task::spawn_blocking (move ||
{ {
// sleep_ms (500); // sleep_ms (500);
let mut decoder = zune_jpeg::JpegDecoder::new_with_options (&jpeg_frame.data, zune_core::options::DecoderOptions::new_fast().jpeg_set_out_colorspace(zune_core::colorspace::ColorSpace::RGBA)); let mut decoder = zune_jpeg::JpegDecoder::new_with_options (&jpeg_frame.data, zune_core::options::DecoderOptions::new_fast().jpeg_set_out_colorspace(zune_core::colorspace::ColorSpace::RGBA));
@ -122,17 +122,17 @@ impl Driver
capture_time: jpeg_frame.capture_time, capture_time: jpeg_frame.capture_time,
}; };
send.send (MsgToDriver::DecodedJpegToRgba (rgba)).unwrap (); send.blocking_send (MsgToDriver::DecodedJpegToRgba (rgba)).ok ();
}); });
}, },
MsgFromController::StartNetworkSend (jpeg) => MsgFromController::StartNetworkSend (jpeg) =>
{ {
let send = self.send.clone (); let send = self.send.clone ();
pool.spawn (move || tokio::spawn (async move
{ {
sleep_ms (50); sleep_ms (50);
send.send (MsgToDriver::NetworkWriteFinished).unwrap (); send.send (MsgToDriver::NetworkWriteFinished).await.ok ();
}); });
}, },
MsgFromController::StartCapture => MsgFromController::StartCapture =>
@ -140,7 +140,7 @@ impl Driver
let mut capture = self.capture.take ().unwrap (); let mut capture = self.capture.take ().unwrap ();
let send = self.send.clone (); let send = self.send.clone ();
pool.spawn (move || tokio::task::spawn_blocking (move ||
{ {
let mut data = vec! [0u8; capture.size_image()]; let mut data = vec! [0u8; capture.size_image()];
capture.wait_for_frame(&mut data).unwrap (); capture.wait_for_frame(&mut data).unwrap ();
@ -153,7 +153,7 @@ impl Driver
// sleep_ms (500); // sleep_ms (500);
send.send (MsgToDriver::GotCapture ((capture, frame))).unwrap (); send.blocking_send (MsgToDriver::GotCapture ((capture, frame))).ok ();
}); });
} }
} }

View File

@ -1,6 +1,4 @@
use std::{ use std::{
sync::mpsc,
thread,
time::{ time::{
Duration, Duration,
Instant, Instant,
@ -9,6 +7,8 @@ use std::{
use eframe::egui; use eframe::egui;
use tokio::sync::mpsc;
mod capture; mod capture;
mod controller; mod controller;
mod driver; mod driver;
@ -65,9 +65,8 @@ struct App {
camera_fps: u64, camera_fps: u64,
latency: Duration, latency: Duration,
send_to_ctl: mpsc::SyncSender <MsgToDriver>, send_to_ctl: mpsc::Sender <MsgToDriver>,
recv_at_gui: mpsc::Receiver <MsgToGui>, recv_at_gui: mpsc::Receiver <MsgToGui>,
driver_thread: thread::JoinHandle <()>,
requesting_frames: bool, requesting_frames: bool,
@ -77,21 +76,13 @@ struct App {
impl App { impl App {
fn new ( fn new (
cc: &eframe::CreationContext<'_>, cc: &eframe::CreationContext<'_>,
send_to_ctl: mpsc::Sender <MsgToDriver>,
recv_at_gui: mpsc::Receiver <MsgToGui>,
send_gui_handle: tokio::sync::oneshot::Sender <egui::Context>,
) -> Self ) -> Self
{ {
let (send_to_ctl, recv_at_ctl) = mpsc::sync_channel (8);
let (send_to_gui, recv_at_gui) = mpsc::sync_channel (8);
let gui_ctx = cc.egui_ctx.clone (); let gui_ctx = cc.egui_ctx.clone ();
send_gui_handle.send (gui_ctx).unwrap ();
let mut driver = Driver::new
(
send_to_ctl.clone (),
recv_at_ctl,
gui_ctx,
send_to_gui,
);
let driver_thread = thread::spawn (move || driver.run ());
let img = egui::ColorImage::new ([1280,720], egui::Color32::TEMPORARY_COLOR); let img = egui::ColorImage::new ([1280,720], egui::Color32::TEMPORARY_COLOR);
@ -106,7 +97,6 @@ impl App {
latency: Duration::from_millis(0), latency: Duration::from_millis(0),
send_to_ctl, send_to_ctl,
recv_at_gui, recv_at_gui,
driver_thread,
requesting_frames: true, requesting_frames: true,
next_stat_refresh: Instant::now (), next_stat_refresh: Instant::now (),
} }
@ -123,7 +113,7 @@ impl eframe::App for App {
if self.requesting_frames if self.requesting_frames
{ {
self.send_to_ctl.send (MsgToDriver::GuiNeedsRgbaFrame).unwrap (); self.send_to_ctl.blocking_send (MsgToDriver::GuiNeedsRgbaFrame).unwrap ();
} }
while let Ok (msg) = self.recv_at_gui.try_recv() while let Ok (msg) = self.recv_at_gui.try_recv()
@ -135,7 +125,7 @@ impl eframe::App for App {
texture.set (egui::ColorImage::from_rgba_premultiplied([1280,720], &frame.data), Default::default ()); texture.set (egui::ColorImage::from_rgba_premultiplied([1280,720], &frame.data), Default::default ());
self.latency = Instant::now () - frame.capture_time; self.latency = Instant::now () - frame.capture_time;
self.camera_frames += 1; self.camera_frames += 1;
println! (" Camera"); // println! (" Camera");
}, },
} }
} }
@ -169,7 +159,7 @@ impl eframe::App for App {
ui.image (texture, size); ui.image (texture, size);
self.gui_frames += 1; self.gui_frames += 1;
println! ("GUI"); // println! ("GUI");
ui.checkbox(&mut self.requesting_frames, "Run"); ui.checkbox(&mut self.requesting_frames, "Run");
@ -180,9 +170,34 @@ impl eframe::App for App {
fn main_egui () fn main_egui ()
{ {
let native_options = eframe::NativeOptions::default(); let rt = tokio::runtime::Runtime::new ().unwrap ();
eframe::run_native("Five Five Five", native_options, Box::new(|cc| Box::new(App::new(cc)))).unwrap (); let (send_to_ctl, recv_at_ctl) = mpsc::channel (8);
let (send_to_gui, recv_at_gui) = mpsc::channel (8);
let (send_gui_handle, recv_gui_handle) = tokio::sync::oneshot::channel ();
let send_to_ctl_2 = send_to_ctl.clone ();
rt.spawn (async move {
let gui_ctx = recv_gui_handle.await.unwrap ();
let mut driver = Driver::new
(
send_to_ctl_2,
recv_at_ctl,
gui_ctx,
send_to_gui,
);
driver.run ().await
});
// GUIs are the most needy, picky, finicky piece of any program
// so they get spawned last, given the main thread, and babied.
//
// Not to mention that `run_native` is callback heck. I'm sure it's
// for a good reason, but still.
let native_options = eframe::NativeOptions::default();
eframe::run_native("Five Five Five", native_options, Box::new(|cc| Box::new(App::new(cc, send_to_ctl, recv_at_gui, send_gui_handle)))).unwrap ();
} }
#[derive (Debug, thiserror::Error)] #[derive (Debug, thiserror::Error)]