use std::{ fmt, marker::Send, sync::mpsc, thread, time::{ Duration, Instant, }, }; use rayon::ThreadPool; use crate::controller::*; mod capture; mod controller; fn main() -> Result <(), Error> { let mut args = std::env::args (); let _exe_name = args.next (); match args.next ().as_deref () { None => { let pool = rayon::ThreadPoolBuilder::new ().build ().unwrap (); let mut driver = Driver::new (&pool)?; driver.main ()?; }, Some ("capture") => { use std::io::Write; let mut cap = capture::Capture::new ()?; let mut buf = vec! [0u8; cap.size_image ()]; let mut bytesused = 0; for _ in 0..5 { let rc = cap.wait_for_frame (&mut buf); bytesused = rc.unwrap (); dbg! (bytesused); } let mut f = std::fs::File::create ("data.jpeg").unwrap (); f.write_all (&buf [0..bytesused]).unwrap (); }, Some (_) => eprintln! ("Unknown subcommand"), } Ok (()) } struct ThreadSystem <'a> { pool: &'a ThreadPool, send: mpsc::SyncSender , } impl <'a> ThreadSystem <'a> { fn dispatch TaskOutput + Send + 'static> (&self, work: F) { let send = self.send.clone (); self.pool.spawn (move || { let output = work (); send.send (output).ok (); }); } } struct Driver <'a> { thread_sys: ThreadSystem <'a>, recv: mpsc::Receiver , ctl: Controller, capture: Task , encoder: Task , transmitter: Task , } impl <'a> Driver <'_> { fn new (pool: &'a ThreadPool) -> Result , Error> { let (send, recv) = mpsc::sync_channel (8); let thread_sys = ThreadSystem { pool, send, }; Ok (Driver { thread_sys, recv, ctl: Default::default (), capture: capture::Capture::new ()?.into (), encoder: Encoder::default ().into (), transmitter: Transmitter::default ().into (), }) } fn main (&mut self) -> Result <(), TaskError> { for _ in 0..50 { let ev = self.ctl.poll (); if let Some (event) = ev.tx { dbg! (&event); self.handle_tx_event (event)?; match self.recv.try_recv () { Ok (output) => self.handle_task_output (output)?, Err (_) => (), } } else { match self.recv.recv () { Ok (output) => self.handle_task_output (output)?, Err (_) => (), } } } Ok (()) } fn handle_task_output (&mut self, output: TaskOutput) -> Result <(), TaskError> { dbg! (&output); match output { TaskOutput::TxCapture (work) => work.finish (self)?, TaskOutput::TxEncode (work) => work.finish (self)?, TaskOutput::TxTransmit (work) => work.finish (self)?, } Ok (()) } fn handle_tx_event (&mut self, event: TxPipelineEvent) -> Result <(), TaskError> { let thread_sys = &self.thread_sys; let pool = &self.thread_sys.pool; let send = self.thread_sys.send.clone (); match event { TxPipelineEvent::Capture => CaptureWork::dispatch (self, ())?, TxPipelineEvent::Encode (buf_raw) => EncodeWork::dispatch (self, buf_raw)?, TxPipelineEvent::Transmit (buf_enc) => TransmitWork::dispatch (self, buf_enc)?, TxPipelineEvent::PollEncoder => { let encoder = self.encoder.try_inner_mut ()?; let mut buf = vec! []; let may_have_data = encoder.poll_encoded (&mut buf); if may_have_data { self.ctl.handle_encoded_packet (Some (BufEncoded {buf})).unwrap (); } else { self.ctl.handle_encoded_packet (None).unwrap (); } }, } Ok (()) } } /// This is probably just a fancy Cell or something. enum Task { Stopped (S), Running (R), } #[derive (Debug, thiserror::Error)] enum TaskError { #[error ("tried to start already-running task")] AlreadyRunning, #[error ("tried to stop already-stopped task")] AlreadyStopped, #[error ("cannot access a task's inner object while it's running")] CantAccessWhileRunning, } impl From for Task { fn from (x: S) -> Self { Self::Stopped (x) } } impl Task { fn try_inner (&self) -> Result <&S, TaskError> { match self { Self::Running (_) => Err (TaskError::CantAccessWhileRunning), Self::Stopped (x) => Ok (x), } } fn try_inner_mut (&mut self) -> Result <&mut S, TaskError> { match self { Self::Running (_) => Err (TaskError::CantAccessWhileRunning), Self::Stopped (x) => Ok (x), } } fn start (&mut self, x: R) -> Result { match std::mem::replace (self, Self::Running (x)) { Self::Running (_) => Err (TaskError::AlreadyRunning), Self::Stopped (x) => Ok (x), } } fn stop (&mut self, x: S) -> Result { match std::mem::replace (self, Self::Stopped (x)) { Self::Stopped (_) => Err (TaskError::AlreadyStopped), Self::Running (x) => Ok (x), } } } struct EncoderTaskMetadata { } enum TaskOutput { TxCapture (CaptureWork), TxEncode (EncodeWork), TxTransmit (TransmitWork), } impl fmt::Debug for TaskOutput { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::TxCapture (_) => f.debug_tuple ("TaskOutput::TxCapture").finish (), Self::TxEncode (_) => f.debug_tuple ("TaskOutput::TxEncode").finish (), Self::TxTransmit (_) => f.debug_tuple ("TaskOutput::TxTransmit").finish (), } } } /// Wraps non-thread-safe encoder state like from ffmpeg #[derive (Default)] struct Encoder { /// AVCodecContext ctx: (), /// Encoded AVPacket pkt_enc: Option <()>, /// Bogus debugging thing internal_buffer: usize, } impl Encoder { /// If any encoded bytes are ready, and the caller's buffer /// is big enough, copy encoded bytes into the caller's buffer, /// consuming them from internal buffers. /// /// Won't block fn poll_encoded (&mut self, _buf_enc: &mut [u8]) -> bool { if let Some (_pkt) = self.pkt_enc.take () { // Do copy return true; } if self.internal_buffer >= 2 { self.internal_buffer -= 2; // Do copy return true; } false } /// Insert raw data. If the internal buffers are full, the new /// data is rejected. /// Call poll_encoded before calling this, to drain internal /// buffers if needed. /// /// Blocks if the internals perform encoding fn try_handle_insert_data (&mut self, _buf_raw: &[u8]) -> Result <(), ControlError> { if self.pkt_enc.is_some () { return Err (ControlError::AlreadyHaveRawData); } if self.internal_buffer >= 3 { return Err (ControlError::AlreadyHaveRawData); } self.internal_buffer += 3; Ok (()) } } /// Wraps a network transmitter like a TCP send stream or a QUIC /// outgoing uni stream #[derive (Default)] struct Transmitter { } impl Transmitter { /// Tries to transmit data over the network. /// If Transmitter sees congestion, it may reject the transmission /// until a later time. fn try_send (&mut self, _now: Instant, _buf: &[u8]) -> bool { false } } struct CaptureWork { cap: capture::Capture, buf_raw: BufRaw, } impl CaptureWork { fn dispatch (driver: &mut Driver, _input: ()) -> Result <(), TaskError> { let metadata = (); let mut cap = driver.capture.start (metadata)?; driver.thread_sys.dispatch (move || { let dur_capture = Duration::from_millis (1000); thread::sleep (dur_capture); let buf_raw = BufRaw { buf: vec![], }; TaskOutput::TxCapture (Self {cap, buf_raw}) }); Ok (()) } fn finish (self, driver: &mut Driver) -> Result <(), TaskError> { driver.capture.stop (self.cap)?; driver.ctl.handle_capture_frame (self.buf_raw).unwrap (); Ok (()) } } struct EncodeWork { encoder: Encoder, } impl EncodeWork { fn dispatch (driver: &mut Driver, buf_raw: BufRaw) -> Result <(), TaskError> { let mut encoder = driver.encoder.start (EncoderTaskMetadata {})?; driver.thread_sys.dispatch (move || { let dur_encode = Duration::from_millis (20); thread::sleep (dur_encode); encoder.try_handle_insert_data (&buf_raw.buf).unwrap (); TaskOutput::TxEncode (Self {encoder}) }); Ok (()) } fn finish (self, driver: &mut Driver) -> Result <(), TaskError> { let _metadata = driver.encoder.stop (self.encoder)?; driver.ctl.handle_encoder_finished (); Ok (()) } } struct TransmitWork { tx: Transmitter, busy: bool, } impl TransmitWork { fn dispatch (driver: &mut Driver, buf_enc: BufEncoded) -> Result <(), TaskError> { let metadata = (); let mut tx = driver.transmitter.start (metadata)?; driver.thread_sys.dispatch (move || { let dur_transmit = Duration::from_millis (200); thread::sleep (dur_transmit); let busy = tx.try_send (Instant::now (), &buf_enc.buf); TaskOutput::TxTransmit (Self { tx, busy, }) }); Ok (()) } fn finish (self, driver: &mut Driver) -> Result <(), TaskError> { driver.transmitter.stop (self.tx)?; driver.ctl.handle_transmitted (self.busy); Ok (()) } } #[derive (Debug, thiserror::Error)] enum Error { #[error ("capture")] Capture (#[from] capture::Error), #[error ("task")] Task (#[from] TaskError), }