use std::{ fmt, sync::mpsc, thread, time::Duration, }; use rayon::ThreadPool; use crate::controller::*; mod controller; fn main() { let pool = rayon::ThreadPoolBuilder::new ().build ().unwrap (); let mut driver = Driver::new (&pool); driver.main (); } struct Driver <'a> { pool: &'a ThreadPool, send: mpsc::SyncSender , recv: mpsc::Receiver , ctl: Controller, capture: Option , encoder: EncoderHandle, transmitter: Option , } impl <'a> Driver <'_> { fn new (pool: &'a ThreadPool) -> Driver <'a> { let (send, recv) = mpsc::sync_channel (8); Driver { pool, send, recv, ctl: Default::default (), capture: Some (Default::default ()), encoder: EncoderHandle::Stopped (Default::default ()), transmitter: Some (Default::default ()), } } fn main (&mut self) { for _ in 0..10 { let ev = self.ctl.poll (); if let Some (event) = ev.tx { dbg! (&event); self.handle_tx_event (event); match self.recv.try_recv () { Ok (output) => self.handle_task_output (output), Err (_) => (), } } else { match self.recv.recv () { Ok (output) => self.handle_task_output (output), Err (_) => (), } } } } fn handle_task_output (&mut self, output: TaskOutput) { dbg! (&output); match output { TaskOutput::TxCapture ((cap, buf_raw)) => { let _old = match std::mem::replace (&mut self.capture, Some (cap)) { None => (), _ => panic! ("tried to finish an already finished capture"), }; self.ctl.handle_capture_frame (buf_raw).unwrap (); }, TaskOutput::TxEncode (enc) => { let _metadata = match std::mem::replace (&mut self.encoder, EncoderHandle::Stopped (enc)) { EncoderHandle::Running (_) => (), _ => panic! ("tried to finish an already finished encode"), }; self.ctl.handle_encoder_finished (); }, TaskOutput::TxTransmit ((tx, busy)) => { let _old = match std::mem::replace (&mut self.transmitter, Some (tx)) { None => (), _ => panic! ("tried to finish an already finished transmit"), }; self.ctl.handle_network_busy (busy); } } } fn handle_tx_event (&mut self, event: TxPipelineEvent) { let dur_capture = Duration::from_millis (30); let dur_encode = Duration::from_millis (20); let dur_transmit = Duration::from_millis (200); let send = self.send.clone (); match event { TxPipelineEvent::Capture => { let cap = match std::mem::replace (&mut self.capture, None) { Some (x) => x, _ => { dbg! ("capture is already running"); return; }, }; self.pool.spawn (move || { thread::sleep (dur_capture); let buf_raw = BufRaw { buf: vec![], }; send.send (TaskOutput::TxCapture ((cap, buf_raw))).unwrap (); }); }, TxPipelineEvent::PollEncoder => { let encoder = match &mut self.encoder { EncoderHandle::Running (_) => panic! ("tried to poll encoder while it was running"), EncoderHandle::Stopped (x) => x, }; let mut buf = vec! []; let may_have_data = encoder.poll_encoded (&mut buf); if may_have_data { self.ctl.handle_encoded_packet (Some (BufEncoded {buf})).unwrap (); } else { self.ctl.handle_encoded_packet (None).unwrap (); } } TxPipelineEvent::Encode (buf_raw) => { let metadata = EncoderTaskMetadata {}; let mut encoder = match std::mem::replace (&mut self.encoder, EncoderHandle::Running (metadata)) { EncoderHandle::Stopped (x) => x, _ => panic! ("tried to run the same encoder twice"), }; self.pool.spawn (move || { thread::sleep (dur_encode); encoder.try_handle_insert_data (&buf_raw.buf).unwrap (); send.send (TaskOutput::TxEncode (encoder)).unwrap (); }); }, TxPipelineEvent::Transmit (_buf_enc) => { let tx = match std::mem::replace (&mut self.transmitter, None) { Some (x) => x, _ => panic! ("tried to run the same transmitter twice"), }; self.pool.spawn (move || { thread::sleep (dur_transmit); send.send (TaskOutput::TxTransmit ((tx, false))).unwrap (); }); }, } } } enum EncoderHandle { Stopped (Encoder), Running (EncoderTaskMetadata), } struct EncoderTaskMetadata { } enum TaskOutput { TxCapture ((Capture, BufRaw)), TxEncode (Encoder), TxTransmit ((Transmitter, bool)), } impl fmt::Debug for TaskOutput { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::TxCapture (_) => f.debug_tuple ("TaskOutput::TxCapture").finish (), Self::TxEncode (_) => f.debug_tuple ("TaskOutput::TxEncode").finish (), Self::TxTransmit (_) => f.debug_tuple ("TaskOutput::TxTransmit").finish (), } } } /// Wraps non-thread-safe capture device like a v4l webcam #[derive (Default)] struct Capture { } impl Capture { /// Blocks until the capture device gets us a frame fn wait_for_frame (&mut self, _buf: &mut [u8]) -> bool { false } } /// Wraps non-thread-safe encoder state like from ffmpeg #[derive (Default)] struct Encoder { /// AVCodecContext ctx: (), /// Encoded AVPacket pkt_enc: Option <()>, /// Bogus debugging thing internal_buffer: usize, } impl Encoder { /// If any encoded bytes are ready, and the caller's buffer /// is big enough, copy encoded bytes into the caller's buffer, /// consuming them from internal buffers. /// /// Won't block fn poll_encoded (&mut self, _buf_enc: &mut [u8]) -> bool { if let Some (_pkt) = self.pkt_enc.take () { // Do copy return true; } if self.internal_buffer >= 2 { self.internal_buffer -= 2; // Do copy return true; } false } /// Insert raw data. If the internal buffers are full, the new /// data is rejected. /// Call poll_encoded before calling this, to drain internal /// buffers if needed. /// /// Blocks if the internals perform encoding fn try_handle_insert_data (&mut self, _buf_raw: &[u8]) -> Result <(), ControlError> { if self.pkt_enc.is_some () { return Err (ControlError::AlreadyHaveRawData); } if self.internal_buffer >= 3 { return Err (ControlError::AlreadyHaveRawData); } self.internal_buffer += 3; Ok (()) } } /// Wraps a network transmitter like a TCP send stream or a QUIC /// outgoing uni stream #[derive (Default)] struct Transmitter { } impl Transmitter { /// Tries to transmit data over the network. /// If Transmitter sees congestion, it may reject the transmission /// until a later time. fn try_send (&mut self, _now: std::time::Instant, _buf: &[u8]) -> bool { false } }