👕 refactor

main
_ 2023-09-09 11:05:19 -05:00
parent d93b3036bb
commit 28daa28d90
1 changed files with 110 additions and 54 deletions

View File

@ -3,7 +3,10 @@ use std::{
marker::Send, marker::Send,
sync::mpsc, sync::mpsc,
thread, thread,
time::Duration, time::{
Duration,
Instant,
},
}; };
use rayon::ThreadPool; use rayon::ThreadPool;
@ -112,21 +115,9 @@ impl <'a> Driver <'_>
dbg! (&output); dbg! (&output);
match output match output
{ {
TaskOutput::TxCapture ((cap, buf_raw)) => TaskOutput::TxCapture (work) => work.finish (self)?,
{ TaskOutput::TxEncode (work) => work.finish (self)?,
self.capture.stop (cap)?; TaskOutput::TxTransmit (work) => work.finish (self)?,
self.ctl.handle_capture_frame (buf_raw).unwrap ();
},
TaskOutput::TxEncode (enc) =>
{
let _metadata = self.encoder.stop (enc)?;
self.ctl.handle_encoder_finished ();
},
TaskOutput::TxTransmit ((tx, busy)) =>
{
self.transmitter.stop (tx)?;
self.ctl.handle_network_busy (busy);
}
} }
Ok (()) Ok (())
@ -134,28 +125,13 @@ impl <'a> Driver <'_>
fn handle_tx_event (&mut self, event: TxPipelineEvent) -> Result <(), TaskError> fn handle_tx_event (&mut self, event: TxPipelineEvent) -> Result <(), TaskError>
{ {
let dur_capture = Duration::from_millis (30);
let dur_encode = Duration::from_millis (20);
let dur_transmit = Duration::from_millis (200);
let thread_sys = &self.thread_sys; let thread_sys = &self.thread_sys;
let pool = &self.thread_sys.pool; let pool = &self.thread_sys.pool;
let send = self.thread_sys.send.clone (); let send = self.thread_sys.send.clone ();
match event match event
{ {
TxPipelineEvent::Capture => { TxPipelineEvent::Capture => CaptureWork::dispatch (self, ())?,
let cap = self.capture.start (())?;
thread_sys.dispatch (move ||
{
thread::sleep (dur_capture);
let buf_raw = BufRaw
{
buf: vec![],
};
TaskOutput::TxCapture ((cap, buf_raw))
});
},
TxPipelineEvent::PollEncoder => TxPipelineEvent::PollEncoder =>
{ {
let encoder = self.encoder.try_inner_mut ()?; let encoder = self.encoder.try_inner_mut ()?;
@ -172,24 +148,8 @@ impl <'a> Driver <'_>
self.ctl.handle_encoded_packet (None).unwrap (); self.ctl.handle_encoded_packet (None).unwrap ();
} }
} }
TxPipelineEvent::Encode (buf_raw) => TxPipelineEvent::Encode (buf_raw) => EncodeWork::dispatch (self, buf_raw)?,
{ TxPipelineEvent::Transmit (buf_enc) => TransmitWork::dispatch (self, buf_enc)?,
let mut encoder = self.encoder.start (EncoderTaskMetadata {})?;
thread_sys.dispatch (move ||
{
thread::sleep (dur_encode);
encoder.try_handle_insert_data (&buf_raw.buf).unwrap ();
TaskOutput::TxEncode (encoder)
});
},
TxPipelineEvent::Transmit (_buf_enc) => {
let tx = self.transmitter.start (())?;
thread_sys.dispatch (move ||
{
thread::sleep (dur_transmit);
TaskOutput::TxTransmit ((tx, false))
});
},
} }
Ok (()) Ok (())
@ -269,9 +229,9 @@ struct EncoderTaskMetadata
enum TaskOutput enum TaskOutput
{ {
TxCapture ((Capture, BufRaw)), TxCapture (CaptureWork),
TxEncode (Encoder), TxEncode (EncodeWork),
TxTransmit ((Transmitter, bool)), TxTransmit (TransmitWork),
} }
impl fmt::Debug for TaskOutput { impl fmt::Debug for TaskOutput {
@ -384,8 +344,104 @@ impl Transmitter
/// If Transmitter sees congestion, it may reject the transmission /// If Transmitter sees congestion, it may reject the transmission
/// until a later time. /// until a later time.
fn try_send (&mut self, _now: std::time::Instant, _buf: &[u8]) -> bool fn try_send (&mut self, _now: Instant, _buf: &[u8]) -> bool
{ {
false false
} }
} }
struct CaptureWork
{
cap: Capture,
buf_raw: BufRaw,
}
impl CaptureWork
{
fn dispatch (driver: &mut Driver, _input: ()) -> Result <(), TaskError>
{
let metadata = ();
let mut cap = driver.capture.start (metadata)?;
driver.thread_sys.dispatch (move ||
{
let dur_capture = Duration::from_millis (30);
thread::sleep (dur_capture);
let buf_raw = BufRaw
{
buf: vec![],
};
TaskOutput::TxCapture (Self {cap, buf_raw})
});
Ok (())
}
fn finish (self, driver: &mut Driver) -> Result <(), TaskError>
{
driver.capture.stop (self.cap)?;
driver.ctl.handle_capture_frame (self.buf_raw).unwrap ();
Ok (())
}
}
struct EncodeWork
{
encoder: Encoder,
}
impl EncodeWork
{
fn dispatch (driver: &mut Driver, buf_raw: BufRaw) -> Result <(), TaskError>
{
let mut encoder = driver.encoder.start (EncoderTaskMetadata {})?;
driver.thread_sys.dispatch (move ||
{
let dur_encode = Duration::from_millis (20);
thread::sleep (dur_encode);
encoder.try_handle_insert_data (&buf_raw.buf).unwrap ();
TaskOutput::TxEncode (Self {encoder})
});
Ok (())
}
fn finish (self, driver: &mut Driver) -> Result <(), TaskError>
{
let _metadata = driver.encoder.stop (self.encoder)?;
driver.ctl.handle_encoder_finished ();
Ok (())
}
}
struct TransmitWork
{
tx: Transmitter,
busy: bool,
}
impl TransmitWork
{
fn dispatch (driver: &mut Driver, buf_enc: BufEncoded) -> Result <(), TaskError>
{
let metadata = ();
let mut tx = driver.transmitter.start (metadata)?;
driver.thread_sys.dispatch (move ||
{
let dur_transmit = Duration::from_millis (200);
thread::sleep (dur_transmit);
let busy = tx.try_send (Instant::now (), &buf_enc.buf);
TaskOutput::TxTransmit (Self
{
tx,
busy,
})
});
Ok (())
}
fn finish (self, driver: &mut Driver) -> Result <(), TaskError>
{
driver.transmitter.stop (self.tx)?;
driver.ctl.handle_network_busy (self.busy);
Ok (())
}
}