now the code is pretty and it just shows the camera stream on screen

which is cool
main
_ 2023-09-10 23:10:54 -05:00
parent f87d3fc00e
commit ef2e803c43
4 changed files with 311 additions and 737 deletions

View File

@ -22,8 +22,8 @@ impl Capture
pub fn new () -> Result <Self, Error> pub fn new () -> Result <Self, Error>
{ {
let x = Device::open ("/dev/video0")?; let x = Device::open ("/dev/video0")?;
dbg! (x.formats (linuxvideo::BufType::VIDEO_CAPTURE).collect::<Vec <_>> ());
let x = x.video_capture (PixFormat::new (1920, 1080, PixelFormat::MJPG))?; let x = x.video_capture (PixFormat::new (1280, 720, PixelFormat::MJPG))?;
x.set_frame_interval(linuxvideo::Fract::new(1, 30)).unwrap (); x.set_frame_interval(linuxvideo::Fract::new(1, 30)).unwrap ();
dbg! (x.format ()); dbg! (x.format ());
let size_image = usize::try_from (x.format ().size_image ()).unwrap (); let size_image = usize::try_from (x.format ().size_image ()).unwrap ();
@ -54,7 +54,7 @@ impl Capture
{ {
let bytesused = usize::try_from (view.bytesused ()).unwrap (); let bytesused = usize::try_from (view.bytesused ()).unwrap ();
let input = &view [0..bytesused]; let input = &view [0..bytesused];
&mut output [0..bytesused].copy_from_slice (&input); let _ = &mut output [0..bytesused].copy_from_slice (&input);
Ok (input.len ()) Ok (input.len ())
})?) })?)
} }

View File

@ -1,298 +0,0 @@
use std::fmt;
use thiserror::Error;
#[derive (Default)]
pub struct Controller
{
/// Pipeline for pulling data from the network, decoding it,
/// and displaying it.
rx_pipe: RxPipeline,
/// Pipeline for pulling data from capture devices, encoding it,
/// and sending it over the network.
tx_pipe: TxPipeline,
}
#[derive (Debug)]
pub struct ControlEvent
{
pub rx: Option <RxPipelineEvent>,
pub tx: Option <TxPipelineEvent>,
}
impl Controller
{
pub fn handle_capture_frame (&mut self, buf_raw: BufRaw)
-> Result <(), ControlError>
{
self.tx_pipe.handle_capture_frame (buf_raw)
}
pub fn handle_encoder_finished (&mut self)
{
self.tx_pipe.encoder_is_busy = false;
}
pub fn handle_encoded_packet (&mut self, buf_enc: Option <BufEncoded>) -> Result <(), ControlError>
{
self.tx_pipe.handle_encoded_packet (buf_enc)
}
pub fn handle_transmitted (&mut self, x: bool)
{
self.tx_pipe.handle_transmitted (x)
}
pub fn poll (&mut self) -> ControlEvent
{
let rx = self.rx_pipe.poll ();
let tx = self.tx_pipe.poll ();
ControlEvent {
rx,
tx,
}
}
}
#[derive (Default)]
struct RxPipeline
{
buf_enc: Option <BufEncoded>,
buf_raw: Option <BufRaw>,
}
#[derive (Debug)]
pub enum RxPipelineEvent
{
DisplayRaw (BufRaw),
Decode (BufEncoded),
Receive,
}
#[derive (Debug, Error)]
enum RxPipeError
{
#[error ("already have encoded data")]
AlreadyHaveEncodedData,
#[error ("already have raw data")]
AlreadyHaveRawData,
}
impl RxPipeline
{
fn handle_encoder_polled (&mut self, has_data: bool)
{
}
fn handle_raw_frame (&mut self, buf_raw: BufRaw) -> Result <(), RxPipeError>
{
if self.has_raw_data ()
{
return Err (RxPipeError::AlreadyHaveRawData);
}
self.buf_raw = Some (buf_raw);
Ok (())
}
fn poll (&mut self) -> Option <RxPipelineEvent>
{
if let Some (buf_raw) = self.buf_raw.take ()
{
return Some (RxPipelineEvent::DisplayRaw (buf_raw));
}
if ! self.has_raw_data ()
{
if let Some (buf_enc) = self.buf_enc.take ()
{
return Some (RxPipelineEvent::Decode (buf_enc));
}
}
if ! self.has_encoded_data ()
{
return Some (RxPipelineEvent::Receive);
}
None
}
/// True if we have a buffer of received data, ready
/// to be decoded.
fn has_encoded_data (&self) -> bool
{
self.buf_enc.is_some ()
}
fn has_raw_data (&self) -> bool
{
self.buf_raw.is_some ()
}
}
#[derive (Default)]
struct TxPipeline
{
buf_enc: Option <BufEncoded>,
buf_raw: Option <BufRaw>,
capture_is_busy: bool,
encoder_has_data: bool,
encoder_is_busy: bool,
network_congested: bool,
transmitter_is_busy: bool,
}
#[derive (Debug)]
pub enum TxPipelineEvent
{
/// Capture a raw frame and pass it to handle_capture_frame
Capture,
/// Poll the encoder. If it returns a packet, pass it to
/// handle_encoded_packet.
PollEncoder,
/// Encode this raw frame and pass the encoded frame to
/// handle_encoded_packet
Encode (BufRaw),
/// Transmit this encoded frame to the network
Transmit (BufEncoded),
}
impl TxPipeline
{
fn handle_capture_frame (&mut self, buf_raw: BufRaw)
-> Result <(), ControlError>
{
self.capture_is_busy = false;
if self.has_raw_frame ()
{
return Err (ControlError::AlreadyHaveRawData);
}
self.buf_raw = Some (buf_raw);
Ok (())
}
fn handle_encoded_packet (&mut self, buf_enc: Option <BufEncoded>) -> Result <(), ControlError>
{
self.encoder_has_data = buf_enc.is_some ();
let buf_enc = match buf_enc
{
None => return Ok (()),
Some (x) => x,
};
self.encoder_is_busy = false;
if self.has_encoded_packet ()
{
return Err (ControlError::AlreadyHaveEncodedData);
}
self.buf_enc = Some (buf_enc);
Ok (())
}
fn handle_transmitted (&mut self, busy: bool)
{
self.network_congested = busy;
self.transmitter_is_busy = false;
}
fn has_encoded_packet (&self) -> bool
{
self.buf_enc.is_some ()
}
fn has_raw_frame (&self) -> bool
{
self.buf_raw.is_some ()
}
fn poll (&mut self) -> Option <TxPipelineEvent>
{
if ! self.network_congested && ! self.transmitter_is_busy
{
if let Some (buf_enc) = self.buf_enc.take ()
{
self.transmitter_is_busy = true;
return Some (TxPipelineEvent::Transmit (buf_enc));
}
}
if ! self.has_encoded_packet () && ! self.encoder_is_busy
{
if self.encoder_has_data
{
return Some (TxPipelineEvent::PollEncoder);
}
if let Some (buf_raw) = self.buf_raw.take ()
{
self.encoder_has_data = true;
self.encoder_is_busy = true;
return Some (TxPipelineEvent::Encode (buf_raw));
}
}
if ! self.has_raw_frame ()
{
if ! self.capture_is_busy
{
self.capture_is_busy = true;
return Some (TxPipelineEvent::Capture);
}
}
None
}
}
pub struct BufEncoded
{
pub buf: Vec <u8>
}
impl fmt::Debug for BufEncoded {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f
.debug_struct("BufEncoded")
.finish()
}
}
pub struct BufRaw
{
pub buf: Vec <u8>
}
impl fmt::Debug for BufRaw {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f
.debug_struct("BufRaw")
.finish()
}
}
#[derive (Debug, Error)]
pub enum ControlError
{
#[error ("already have encoded data")]
AlreadyHaveEncodedData,
#[error ("already have raw data")]
AlreadyHaveRawData,
}

View File

@ -12,10 +12,8 @@ use std::{
use eframe::egui; use eframe::egui;
use rayon::ThreadPool; use rayon::ThreadPool;
use crate::controller::*;
mod capture; mod capture;
mod controller; mod task;
fn main() -> Result <(), Error> fn main() -> Result <(), Error>
{ {
@ -23,12 +21,7 @@ fn main() -> Result <(), Error>
let _exe_name = args.next (); let _exe_name = args.next ();
match args.next ().as_deref () match args.next ().as_deref ()
{ {
None => None => main_egui (),
{
let pool = rayon::ThreadPoolBuilder::new ().build ().unwrap ();
let mut driver = Driver::new (&pool)?;
driver.main ()?;
},
Some ("capture") => Some ("capture") =>
{ {
use std::io::Write; use std::io::Write;
@ -38,7 +31,7 @@ fn main() -> Result <(), Error>
for _ in 0..30 for _ in 0..30
{ {
cap.wait_for_frame (&mut buf); cap.wait_for_frame (&mut buf).unwrap ();
} }
let start = Instant::now (); let start = Instant::now ();
@ -67,7 +60,6 @@ fn main() -> Result <(), Error>
zune_core::options::DecoderOptions::new_fast ().jpeg_set_out_colorspace (zune_core::colorspace::ColorSpace::YCbCr), zune_core::options::DecoderOptions::new_fast ().jpeg_set_out_colorspace (zune_core::colorspace::ColorSpace::YCbCr),
); );
decoder.decode_headers ().unwrap (); decoder.decode_headers ().unwrap ();
dbg! (decoder.get_output_colorspace ());
pixels.resize (decoder.output_buffer_size ().unwrap (), 0); pixels.resize (decoder.output_buffer_size ().unwrap (), 0);
decoder.decode_into (&mut pixels).unwrap (); decoder.decode_into (&mut pixels).unwrap ();
@ -100,7 +92,7 @@ fn main() -> Result <(), Error>
let mpf_ycbcr = (stop - start).as_millis () as f32 / 30.0f32; let mpf_ycbcr = (stop - start).as_millis () as f32 / 30.0f32;
dbg! (mpf_ycbcr); dbg! (mpf_ycbcr);
std::fs::write ("raw.data", &pixels); std::fs::write ("raw.data", &pixels).unwrap ();
let start = Instant::now (); let start = Instant::now ();
let mut pixels = vec! []; let mut pixels = vec! [];
@ -120,27 +112,50 @@ fn main() -> Result <(), Error>
let mpf_rgb = (stop - start).as_millis () as f32 / 30.0f32; let mpf_rgb = (stop - start).as_millis () as f32 / 30.0f32;
dbg! (mpf_rgb); dbg! (mpf_rgb);
}, },
Some ("egui") => main_egui (),
Some (_) => eprintln! ("Unknown subcommand"), Some (_) => eprintln! ("Unknown subcommand"),
} }
Ok (()) Ok (())
} }
struct MyEguiApp {
struct App {
img: egui::ColorImage, img: egui::ColorImage,
texture: Option <egui::TextureHandle>, texture: Option <egui::TextureHandle>,
capture: capture::Capture, gui_frames: u64,
gui_frames: usize, camera_frames: u64,
camera_frames: usize, gui_fps: u64,
camera_fps: u64,
latency: Duration,
send_to_ctl: mpsc::SyncSender <MsgToController>,
recv_at_gui: mpsc::Receiver <MsgToGui>,
driver_thread: thread::JoinHandle <()>,
requesting_frames: bool,
next_stat_refresh: Instant,
} }
impl MyEguiApp { impl App {
fn new(_cc: &eframe::CreationContext<'_>) -> Self { fn new (
// Customize egui here with cc.egui_ctx.set_fonts and cc.egui_ctx.set_visuals. cc: &eframe::CreationContext<'_>,
// Restore app state using cc.storage (requires the "persistence" feature). ) -> Self
// Use the cc.gl (a glow::Context) to create graphics shaders and buffers that you can use {
// for e.g. egui::PaintCallback. let (send_to_ctl, recv_at_ctl) = mpsc::sync_channel (8);
let (send_to_gui, recv_at_gui) = mpsc::sync_channel (8);
let gui_ctx = cc.egui_ctx.clone ();
let mut driver = Driver::new
(
send_to_ctl.clone (),
recv_at_ctl,
gui_ctx,
send_to_gui,
);
let driver_thread = thread::spawn (move || driver.run ());
let img = egui::ColorImage::new ([1280,720], egui::Color32::TEMPORARY_COLOR); let img = egui::ColorImage::new ([1280,720], egui::Color32::TEMPORARY_COLOR);
@ -148,14 +163,21 @@ impl MyEguiApp {
{ {
img, img,
texture: None, texture: None,
capture: capture::Capture::new ().unwrap (),
gui_frames: 0, gui_frames: 0,
camera_frames: 0, camera_frames: 0,
gui_fps: 0,
camera_fps: 0,
latency: Duration::from_millis(0),
send_to_ctl,
recv_at_gui,
driver_thread,
requesting_frames: true,
next_stat_refresh: Instant::now (),
} }
} }
} }
impl eframe::App for MyEguiApp { impl eframe::App for App {
fn update(&mut self, ctx: &egui::Context, _frame: &mut eframe::Frame) { fn update(&mut self, ctx: &egui::Context, _frame: &mut eframe::Frame) {
egui::CentralPanel::default().show(ctx, |ui| { egui::CentralPanel::default().show(ctx, |ui| {
let texture = self.texture.get_or_insert_with(|| let texture = self.texture.get_or_insert_with(||
@ -163,20 +185,37 @@ impl eframe::App for MyEguiApp {
ui.ctx ().load_texture ("my_image", self.img.clone (), Default::default()) ui.ctx ().load_texture ("my_image", self.img.clone (), Default::default())
}); });
match self.capture.will_block () if self.requesting_frames
{ {
Ok (false) => self.send_to_ctl.send (MsgToController::GuiNeedsRgbaFrame).unwrap ();
}
while let Ok (msg) = self.recv_at_gui.try_recv()
{
match msg
{ {
self.capture.wait_for_frame(self.img.as_raw_mut()).unwrap (); MsgToGui::NewRgbaFrame ((data, gen)) =>
texture.set (self.img.clone (), Default::default ()); {
self.camera_frames += 1; self.latency = Instant::now () - gen;
}, self.camera_frames += 1;
_ => (), texture.set (egui::ColorImage::from_rgba_premultiplied([1280,720], &data), Default::default ());
},
}
} }
let texture: &egui::TextureHandle = texture; let texture: &egui::TextureHandle = texture;
ui.heading (format! ("{0}, {1}", self.camera_frames, self.gui_frames)); let now = Instant::now ();
if now >= self.next_stat_refresh
{
self.gui_fps = self.gui_frames;
self.camera_fps = self.camera_frames;
self.gui_frames = 0;
self.camera_frames = 0;
self.next_stat_refresh += Duration::from_secs(1);
}
ui.heading (format! ("{0}, {1}, {2}", self.camera_fps, self.gui_fps, self.latency.as_millis ()));
let available = ui.available_size(); let available = ui.available_size();
let tex_size = egui::Vec2::new (texture.size()[0] as f32, texture.size()[1] as f32); let tex_size = egui::Vec2::new (texture.size()[0] as f32, texture.size()[1] as f32);
let scaled_width = available.y * tex_size.x / tex_size.y; let scaled_width = available.y * tex_size.x / tex_size.y;
@ -193,417 +232,176 @@ impl eframe::App for MyEguiApp {
ui.image (texture, size); ui.image (texture, size);
self.gui_frames += 1; self.gui_frames += 1;
ui.checkbox(&mut self.requesting_frames, "Run");
}); });
}
}
enum MsgToController
{
GuiNeedsRgbaFrame,
GotCapture ((capture::Capture, JpegFrame)),
DecodedJpegToRgba ((Vec <u8>, Instant)),
}
enum MsgToGui
{
NewRgbaFrame ((Vec <u8>, Instant)),
}
enum MsgFromController
{
Idle,
}
#[derive (Clone)]
struct JpegFrame
{
data: Vec <u8>,
time: Instant,
}
struct Driver
{
send: mpsc::SyncSender <MsgToController>,
recv: mpsc::Receiver <MsgToController>,
gui_ctx: egui::Context,
send_to_gui: mpsc::SyncSender <MsgToGui>,
gui_rgba_gen: Instant,
gui_needs_frame: bool,
capture: task::Task <capture::Capture, ()>,
jpeg_decoder: task::Task <(), Instant>,
jpeg_needs_frame: bool,
rgba_frame: (Vec <u8>, Instant),
jpeg_frame: JpegFrame,
}
impl Driver
{
fn new (
send: mpsc::SyncSender <MsgToController>,
recv: mpsc::Receiver <MsgToController>,
gui_ctx: egui::Context,
send_to_gui: mpsc::SyncSender <MsgToGui>,
) -> Self
{
let now = Instant::now ();
ctx.request_repaint(); Self {
send,
recv,
gui_ctx,
send_to_gui,
gui_needs_frame: false,
gui_rgba_gen: now,
capture: capture::Capture::new ().unwrap ().into (),
jpeg_decoder: ().into (),
jpeg_needs_frame: false,
rgba_frame: (Default::default (), now),
jpeg_frame: JpegFrame
{
data: Default::default (),
time: now,
},
}
}
fn run (&mut self)
{
let pool = rayon::ThreadPoolBuilder::new().build().unwrap ();
loop
{
match self.recv.recv().unwrap ()
{
MsgToController::DecodedJpegToRgba ((data, gen)) =>
{
self.jpeg_decoder.stop (()).unwrap ();
self.rgba_frame = (data, gen);
},
MsgToController::GotCapture ((capture, jpeg)) =>
{
self.capture.stop (capture).unwrap ();
self.jpeg_frame = jpeg;
},
MsgToController::GuiNeedsRgbaFrame =>
{
self.gui_needs_frame = true;
},
}
if self.gui_needs_frame && self.rgba_frame.1 > self.gui_rgba_gen
{
self.send_to_gui.send (MsgToGui::NewRgbaFrame (self.rgba_frame.clone ())).unwrap ();
self.gui_needs_frame = false;
self.gui_rgba_gen = self.rgba_frame.1;
self.gui_ctx.request_repaint();
}
if
self.gui_needs_frame &&
! self.jpeg_decoder.is_running () &&
self.jpeg_frame.time > self.rgba_frame.1
{
let send = self.send.clone ();
let jpeg_frame = self.jpeg_frame.clone ();
self.jpeg_decoder.start(jpeg_frame.time).unwrap ();
self.jpeg_needs_frame = false;
pool.spawn (move ||
{
let mut decoder = zune_jpeg::JpegDecoder::new_with_options (&jpeg_frame.data, zune_core::options::DecoderOptions::new_fast().jpeg_set_out_colorspace(zune_core::colorspace::ColorSpace::RGBA));
decoder.decode_headers().unwrap ();
let mut rgba = vec![0u8;decoder.output_buffer_size().unwrap ()];
decoder.decode_into(&mut rgba).unwrap ();
send.send (MsgToController::DecodedJpegToRgba ((rgba, jpeg_frame.time))).unwrap ();
});
}
if self.gui_needs_frame && self.jpeg_frame.time == self.rgba_frame.1
{
self.jpeg_needs_frame = true;
}
if self.jpeg_needs_frame && ! self.capture.is_running()
{
let mut capture = self.capture.start (()).unwrap ();
let send = self.send.clone ();
pool.spawn (move ||
{
let mut data = vec! [0u8; capture.size_image()];
capture.wait_for_frame(&mut data).unwrap ();
let frame = JpegFrame
{
data,
time: Instant::now (),
};
send.send (MsgToController::GotCapture ((capture, frame))).unwrap ();
});
}
}
} }
} }
fn main_egui () fn main_egui ()
{ {
let native_options = eframe::NativeOptions::default(); let native_options = eframe::NativeOptions::default();
eframe::run_native("My egui App", native_options, Box::new(|cc| Box::new(MyEguiApp::new(cc)))).unwrap ();
}
struct ThreadSystem <'a>
{
pool: &'a ThreadPool,
send: mpsc::SyncSender <TaskOutput>,
}
impl <'a> ThreadSystem <'a>
{
fn dispatch <F: FnOnce () -> TaskOutput + Send + 'static> (&self, work: F)
{
let send = self.send.clone ();
self.pool.spawn (move ||
{
let output = work ();
send.send (output).ok ();
});
}
}
struct Driver <'a>
{
thread_sys: ThreadSystem <'a>,
recv: mpsc::Receiver <TaskOutput>,
ctl: Controller, eframe::run_native("Five Five Five", native_options, Box::new(|cc| Box::new(App::new(cc)))).unwrap ();
capture: Task <capture::Capture, ()>,
encoder: Task <Encoder, EncoderTaskMetadata>,
transmitter: Task <Transmitter, ()>,
}
impl <'a> Driver <'_>
{
fn new (pool: &'a ThreadPool) -> Result <Driver <'a>, Error>
{
let (send, recv) = mpsc::sync_channel (8);
let thread_sys = ThreadSystem
{
pool,
send,
};
Ok (Driver
{
thread_sys,
recv,
ctl: Default::default (),
capture: capture::Capture::new ()?.into (),
encoder: Encoder::default ().into (),
transmitter: Transmitter::default ().into (),
})
}
fn main (&mut self) -> Result <(), TaskError>
{
for _ in 0..50
{
let ev = self.ctl.poll ();
if let Some (event) = ev.tx
{
dbg! (&event);
self.handle_tx_event (event)?;
match self.recv.try_recv ()
{
Ok (output) => self.handle_task_output (output)?,
Err (_) => (),
}
}
else
{
match self.recv.recv ()
{
Ok (output) => self.handle_task_output (output)?,
Err (_) => (),
}
}
}
Ok (())
}
fn handle_task_output (&mut self, output: TaskOutput) -> Result <(), TaskError>
{
dbg! (&output);
match output
{
TaskOutput::TxCapture (work) => work.finish (self)?,
TaskOutput::TxEncode (work) => work.finish (self)?,
TaskOutput::TxTransmit (work) => work.finish (self)?,
}
Ok (())
}
fn handle_tx_event (&mut self, event: TxPipelineEvent) -> Result <(), TaskError>
{
match event
{
TxPipelineEvent::Capture => CaptureWork::dispatch (self, ())?,
TxPipelineEvent::Encode (buf_raw) => EncodeWork::dispatch (self, buf_raw)?,
TxPipelineEvent::Transmit (buf_enc) => TransmitWork::dispatch (self, buf_enc)?,
TxPipelineEvent::PollEncoder =>
{
let encoder = self.encoder.try_inner_mut ()?;
let mut buf = vec! [];
let may_have_data = encoder.poll_encoded (&mut buf);
if may_have_data
{
self.ctl.handle_encoded_packet (Some (BufEncoded {buf})).unwrap ();
}
else
{
self.ctl.handle_encoded_packet (None).unwrap ();
}
},
}
Ok (())
}
}
/// This is probably just a fancy Cell or something.
enum Task <S, R>
{
Stopped (S),
Running (R),
}
#[derive (Debug, thiserror::Error)]
enum TaskError
{
#[error ("tried to start already-running task")]
AlreadyRunning,
#[error ("tried to stop already-stopped task")]
AlreadyStopped,
#[error ("cannot access a task's inner object while it's running")]
CantAccessWhileRunning,
}
impl <S, R> From <S> for Task <S, R>
{
fn from (x: S) -> Self
{
Self::Stopped (x)
}
}
impl <S, R> Task <S, R>
{
fn try_inner (&self) -> Result <&S, TaskError>
{
match self
{
Self::Running (_) => Err (TaskError::CantAccessWhileRunning),
Self::Stopped (x) => Ok (x),
}
}
fn try_inner_mut (&mut self) -> Result <&mut S, TaskError>
{
match self
{
Self::Running (_) => Err (TaskError::CantAccessWhileRunning),
Self::Stopped (x) => Ok (x),
}
}
fn start (&mut self, x: R) -> Result <S, TaskError>
{
match std::mem::replace (self, Self::Running (x))
{
Self::Running (_) => Err (TaskError::AlreadyRunning),
Self::Stopped (x) => Ok (x),
}
}
fn stop (&mut self, x: S) -> Result <R, TaskError>
{
match std::mem::replace (self, Self::Stopped (x))
{
Self::Stopped (_) => Err (TaskError::AlreadyStopped),
Self::Running (x) => Ok (x),
}
}
}
struct EncoderTaskMetadata
{
}
enum TaskOutput
{
TxCapture (CaptureWork),
TxEncode (EncodeWork),
TxTransmit (TransmitWork),
}
impl fmt::Debug for TaskOutput {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self
{
Self::TxCapture (_) => f.debug_tuple ("TaskOutput::TxCapture").finish (),
Self::TxEncode (_) => f.debug_tuple ("TaskOutput::TxEncode").finish (),
Self::TxTransmit (_) => f.debug_tuple ("TaskOutput::TxTransmit").finish (),
}
}
}
/// Wraps non-thread-safe encoder state like from ffmpeg
#[derive (Default)]
struct Encoder
{
/// AVCodecContext
ctx: (),
/// Encoded AVPacket
pkt_enc: Option <()>,
/// Bogus debugging thing
internal_buffer: usize,
}
impl Encoder
{
/// If any encoded bytes are ready, and the caller's buffer
/// is big enough, copy encoded bytes into the caller's buffer,
/// consuming them from internal buffers.
///
/// Won't block
fn poll_encoded (&mut self, _buf_enc: &mut [u8]) -> bool
{
if let Some (_pkt) = self.pkt_enc.take ()
{
// Do copy
return true;
}
if self.internal_buffer >= 2
{
self.internal_buffer -= 2;
// Do copy
return true;
}
false
}
/// Insert raw data. If the internal buffers are full, the new
/// data is rejected.
/// Call poll_encoded before calling this, to drain internal
/// buffers if needed.
///
/// Blocks if the internals perform encoding
fn try_handle_insert_data (&mut self, _buf_raw: &[u8]) -> Result <(), ControlError>
{
if self.pkt_enc.is_some ()
{
return Err (ControlError::AlreadyHaveRawData);
}
if self.internal_buffer >= 3
{
return Err (ControlError::AlreadyHaveRawData);
}
self.internal_buffer += 3;
Ok (())
}
}
/// Wraps a network transmitter like a TCP send stream or a QUIC
/// outgoing uni stream
#[derive (Default)]
struct Transmitter
{
}
impl Transmitter
{
/// Tries to transmit data over the network.
/// If Transmitter sees congestion, it may reject the transmission
/// until a later time.
fn try_send (&mut self, _now: Instant, _buf: &[u8]) -> bool
{
false
}
}
struct CaptureWork
{
cap: capture::Capture,
buf_raw: BufRaw,
}
impl CaptureWork
{
fn dispatch (driver: &mut Driver, _input: ()) -> Result <(), TaskError>
{
let metadata = ();
let cap = driver.capture.start (metadata)?;
driver.thread_sys.dispatch (move ||
{
let dur_capture = Duration::from_millis (1000);
thread::sleep (dur_capture);
let buf_raw = BufRaw
{
buf: vec![],
};
TaskOutput::TxCapture (Self {cap, buf_raw})
});
Ok (())
}
fn finish (self, driver: &mut Driver) -> Result <(), TaskError>
{
driver.capture.stop (self.cap)?;
driver.ctl.handle_capture_frame (self.buf_raw).unwrap ();
Ok (())
}
}
struct EncodeWork
{
encoder: Encoder,
}
impl EncodeWork
{
fn dispatch (driver: &mut Driver, buf_raw: BufRaw) -> Result <(), TaskError>
{
let mut encoder = driver.encoder.start (EncoderTaskMetadata {})?;
driver.thread_sys.dispatch (move ||
{
let dur_encode = Duration::from_millis (20);
thread::sleep (dur_encode);
encoder.try_handle_insert_data (&buf_raw.buf).unwrap ();
TaskOutput::TxEncode (Self {encoder})
});
Ok (())
}
fn finish (self, driver: &mut Driver) -> Result <(), TaskError>
{
let _metadata = driver.encoder.stop (self.encoder)?;
driver.ctl.handle_encoder_finished ();
Ok (())
}
}
struct TransmitWork
{
tx: Transmitter,
busy: bool,
}
impl TransmitWork
{
fn dispatch (driver: &mut Driver, buf_enc: BufEncoded) -> Result <(), TaskError>
{
let metadata = ();
let mut tx = driver.transmitter.start (metadata)?;
driver.thread_sys.dispatch (move ||
{
let dur_transmit = Duration::from_millis (200);
thread::sleep (dur_transmit);
let busy = tx.try_send (Instant::now (), &buf_enc.buf);
TaskOutput::TxTransmit (Self
{
tx,
busy,
})
});
Ok (())
}
fn finish (self, driver: &mut Driver) -> Result <(), TaskError>
{
driver.transmitter.stop (self.tx)?;
driver.ctl.handle_transmitted (self.busy);
Ok (())
}
} }
#[derive (Debug, thiserror::Error)] #[derive (Debug, thiserror::Error)]
@ -612,5 +410,5 @@ enum Error
#[error ("capture")] #[error ("capture")]
Capture (#[from] capture::Error), Capture (#[from] capture::Error),
#[error ("task")] #[error ("task")]
Task (#[from] TaskError), Task (#[from] task::Error),
} }

74
src/task.rs Normal file
View File

@ -0,0 +1,74 @@
/// This is probably just a fancy Cell or something.
pub enum Task <S, R>
{
Stopped (S),
Running (R),
}
#[derive (Debug, thiserror::Error)]
pub enum Error
{
#[error ("tried to start already-running task")]
AlreadyRunning,
#[error ("tried to stop already-stopped task")]
AlreadyStopped,
#[error ("cannot access a task's inner object while it's running")]
CantAccessWhileRunning,
}
impl <S, R> From <S> for Task <S, R>
{
fn from (x: S) -> Self
{
Self::Stopped (x)
}
}
impl <S, R> Task <S, R>
{
pub fn is_running (&self) -> bool
{
match self
{
Self::Running (_) => true,
Self::Stopped (_) => false,
}
}
pub fn try_inner (&self) -> Result <&S, Error>
{
match self
{
Self::Running (_) => Err (Error::CantAccessWhileRunning),
Self::Stopped (x) => Ok (x),
}
}
pub fn try_inner_mut (&mut self) -> Result <&mut S, Error>
{
match self
{
Self::Running (_) => Err (Error::CantAccessWhileRunning),
Self::Stopped (x) => Ok (x),
}
}
pub fn start (&mut self, x: R) -> Result <S, Error>
{
match std::mem::replace (self, Self::Running (x))
{
Self::Running (_) => Err (Error::AlreadyRunning),
Self::Stopped (x) => Ok (x),
}
}
pub fn stop (&mut self, x: S) -> Result <R, Error>
{
match std::mem::replace (self, Self::Stopped (x))
{
Self::Stopped (_) => Err (Error::AlreadyStopped),
Self::Running (x) => Ok (x),
}
}
}