👕 refactor

main
_ 2023-09-09 04:24:50 -05:00
parent fdec833fd0
commit 1acd125fdd
1 changed files with 92 additions and 60 deletions

View File

@ -6,16 +6,18 @@ use std::{
};
use rayon::ThreadPool;
use thiserror::Error;
use crate::controller::*;
mod controller;
fn main()
fn main() -> Result <(), TaskError>
{
let pool = rayon::ThreadPoolBuilder::new ().build ().unwrap ();
let mut driver = Driver::new (&pool);
driver.main ();
driver.main ()?;
Ok (())
}
struct Driver <'a>
@ -26,9 +28,9 @@ struct Driver <'a>
ctl: Controller,
capture: Option <Capture>,
encoder: EncoderHandle,
transmitter: Option <Transmitter>,
capture: Task <Capture, ()>,
encoder: Task <Encoder, EncoderTaskMetadata>,
transmitter: Task <Transmitter, ()>,
}
impl <'a> Driver <'_>
@ -45,13 +47,13 @@ impl <'a> Driver <'_>
ctl: Default::default (),
capture: Some (Default::default ()),
encoder: EncoderHandle::Stopped (Default::default ()),
transmitter: Some (Default::default ()),
capture: Capture::default ().into (),
encoder: Encoder::default ().into (),
transmitter: Transmitter::default ().into (),
}
}
fn main (&mut self)
fn main (&mut self) -> Result <(), TaskError>
{
for _ in 0..10
{
@ -60,11 +62,11 @@ impl <'a> Driver <'_>
if let Some (event) = ev.tx
{
dbg! (&event);
self.handle_tx_event (event);
self.handle_tx_event (event)?;
match self.recv.try_recv ()
{
Ok (output) => self.handle_task_output (output),
Ok (output) => self.handle_task_output (output)?,
Err (_) => (),
}
}
@ -72,52 +74,41 @@ impl <'a> Driver <'_>
{
match self.recv.recv ()
{
Ok (output) => self.handle_task_output (output),
Ok (output) => self.handle_task_output (output)?,
Err (_) => (),
}
}
}
Ok (())
}
fn handle_task_output (&mut self, output: TaskOutput)
fn handle_task_output (&mut self, output: TaskOutput) -> Result <(), TaskError>
{
dbg! (&output);
match output
{
TaskOutput::TxCapture ((cap, buf_raw)) =>
{
let _old = match std::mem::replace (&mut self.capture, Some (cap))
{
None => (),
_ => panic! ("tried to finish an already finished capture"),
};
self.capture.stop (cap)?;
self.ctl.handle_capture_frame (buf_raw).unwrap ();
},
TaskOutput::TxEncode (enc) =>
{
let _metadata = match std::mem::replace (&mut self.encoder, EncoderHandle::Stopped (enc))
{
EncoderHandle::Running (_) => (),
_ => panic! ("tried to finish an already finished encode"),
};
let _metadata = self.encoder.stop (enc)?;
self.ctl.handle_encoder_finished ();
},
TaskOutput::TxTransmit ((tx, busy)) =>
{
let _old = match std::mem::replace (&mut self.transmitter, Some (tx))
{
None => (),
_ => panic! ("tried to finish an already finished transmit"),
};
self.transmitter.stop (tx)?;
self.ctl.handle_network_busy (busy);
}
}
Ok (())
}
fn handle_tx_event (&mut self, event: TxPipelineEvent)
fn handle_tx_event (&mut self, event: TxPipelineEvent) -> Result <(), TaskError>
{
let dur_capture = Duration::from_millis (30);
let dur_encode = Duration::from_millis (20);
@ -128,14 +119,7 @@ impl <'a> Driver <'_>
match event
{
TxPipelineEvent::Capture => {
let cap = match std::mem::replace (&mut self.capture, None)
{
Some (x) => x,
_ => {
dbg! ("capture is already running");
return;
},
};
let cap = self.capture.start (())?;
self.pool.spawn (move ||
{
@ -149,11 +133,7 @@ impl <'a> Driver <'_>
},
TxPipelineEvent::PollEncoder =>
{
let encoder = match &mut self.encoder
{
EncoderHandle::Running (_) => panic! ("tried to poll encoder while it was running"),
EncoderHandle::Stopped (x) => x,
};
let encoder = self.encoder.try_inner_mut ()?;
let mut buf = vec! [];
@ -169,12 +149,7 @@ impl <'a> Driver <'_>
}
TxPipelineEvent::Encode (buf_raw) =>
{
let metadata = EncoderTaskMetadata {};
let mut encoder = match std::mem::replace (&mut self.encoder, EncoderHandle::Running (metadata))
{
EncoderHandle::Stopped (x) => x,
_ => panic! ("tried to run the same encoder twice"),
};
let mut encoder = self.encoder.start (EncoderTaskMetadata {})?;
self.pool.spawn (move ||
{
@ -184,12 +159,7 @@ impl <'a> Driver <'_>
});
},
TxPipelineEvent::Transmit (_buf_enc) => {
let tx = match std::mem::replace (&mut self.transmitter, None)
{
Some (x) => x,
_ => panic! ("tried to run the same transmitter twice"),
};
let tx = self.transmitter.start (())?;
self.pool.spawn (move ||
{
thread::sleep (dur_transmit);
@ -197,13 +167,75 @@ impl <'a> Driver <'_>
});
},
}
Ok (())
}
}
enum EncoderHandle
/// This is probably just a fancy Cell or something.
enum Task <S, R>
{
Stopped (Encoder),
Running (EncoderTaskMetadata),
Stopped (S),
Running (R),
}
#[derive (Debug, Error)]
enum TaskError
{
#[error ("tried to start already-running task")]
AlreadyRunning,
#[error ("tried to stop already-stopped task")]
AlreadyStopped,
#[error ("cannot access a task's inner object while it's running")]
CantAccessWhileRunning,
}
impl <S, R> From <S> for Task <S, R>
{
fn from (x: S) -> Self
{
Self::Stopped (x)
}
}
impl <S, R> Task <S, R>
{
fn try_inner (&self) -> Result <&S, TaskError>
{
match self
{
Self::Running (_) => Err (TaskError::CantAccessWhileRunning),
Self::Stopped (x) => Ok (x),
}
}
fn try_inner_mut (&mut self) -> Result <&mut S, TaskError>
{
match self
{
Self::Running (_) => Err (TaskError::CantAccessWhileRunning),
Self::Stopped (x) => Ok (x),
}
}
fn start (&mut self, x: R) -> Result <S, TaskError>
{
match std::mem::replace (self, Self::Running (x))
{
Self::Running (_) => Err (TaskError::AlreadyRunning),
Self::Stopped (x) => Ok (x),
}
}
fn stop (&mut self, x: S) -> Result <R, TaskError>
{
match std::mem::replace (self, Self::Stopped (x))
{
Self::Stopped (_) => Err (TaskError::AlreadyStopped),
Self::Running (x) => Ok (x),
}
}
}
struct EncoderTaskMetadata