diff --git a/.gitignore b/.gitignore index ea8c4bf..38fa69c 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ +/deps /target diff --git a/Cargo.lock b/Cargo.lock index 0502722..54ae0a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,6 +8,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "cfg-if" version = "1.0.0" @@ -44,7 +50,7 @@ dependencies = [ "autocfg", "cfg-if", "crossbeam-utils", - "memoffset", + "memoffset 0.9.0", "scopeguard", ] @@ -67,6 +73,7 @@ checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" name = "five_five_five" version = "0.1.0" dependencies = [ + "linuxvideo", "rayon", "thiserror", ] @@ -83,6 +90,30 @@ version = "0.2.147" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" +[[package]] +name = "linuxvideo" +version = "0.3.1" +dependencies = [ + "bitflags", + "log", + "nix", +] + +[[package]] +name = "log" +version = "0.4.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" + +[[package]] +name = "memoffset" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4" +dependencies = [ + "autocfg", +] + [[package]] name = "memoffset" version = "0.9.0" @@ -92,6 +123,19 @@ dependencies = [ "autocfg", ] +[[package]] +name = "nix" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b" +dependencies = [ + "bitflags", + "cfg-if", + "libc", + "memoffset 0.7.1", + "pin-utils", +] + [[package]] name = "num_cpus" version = "1.16.0" @@ -102,6 +146,12 @@ dependencies = [ "libc", ] +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "proc-macro2" version = "1.0.66" diff --git a/Cargo.toml b/Cargo.toml index eed44ae..f58f45f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,5 +6,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +linuxvideo = { path = "deps/LinuxVideo" } rayon = "1.7.0" thiserror = "1.0.48" diff --git a/src/capture.rs b/src/capture.rs new file mode 100644 index 0000000..9574175 --- /dev/null +++ b/src/capture.rs @@ -0,0 +1,69 @@ +/// Wraps non-thread-safe capture device like a v4l webcam + +use linuxvideo:: +{ + format:: + { + PixFormat, + PixelFormat, + }, + Device, + stream::ReadStream, +}; + +pub struct Capture +{ + size_image: usize, + stream: ReadStream, +} + +impl Capture +{ + pub fn new () -> Result + { + let x = Device::open ("/dev/video0")?; + dbg! (x.formats (linuxvideo::BufType::VIDEO_CAPTURE).collect::> ()); + let x = x.video_capture (PixFormat::new (u32::MAX, u32::MAX, PixelFormat::MJPG))?; + dbg! (x.format ()); + let size_image = usize::try_from (x.format ().size_image ()).unwrap (); + let stream = x.into_stream ()?; + + Ok (Self + { + size_image, + stream, + }) + } + + pub fn size_image (&self) -> usize + { + self.size_image + } + + /// Blocks until the capture device gets us a frame + + pub fn wait_for_frame (&mut self, output: &mut [u8]) -> Result + { + if output.len () < self.size_image () + { + return Err (Error::OutputBufferTooSmall); + } + + Ok (self.stream.dequeue (|view| + { + let bytesused = usize::try_from (view.bytesused ()).unwrap (); + let input = &view [0..bytesused]; + &mut output [0..bytesused].copy_from_slice (&input); + Ok (input.len ()) + })?) + } +} + +#[derive (Debug, thiserror::Error)] +pub enum Error +{ + #[error ("I/O")] + Io (#[from] std::io::Error), + #[error ("output buffer too small")] + OutputBufferTooSmall, +} \ No newline at end of file diff --git a/src/controller.rs b/src/controller.rs index b62c28a..a949d38 100644 --- a/src/controller.rs +++ b/src/controller.rs @@ -41,9 +41,9 @@ impl Controller self.tx_pipe.handle_encoded_packet (buf_enc) } - pub fn handle_network_busy (&mut self, x: bool) + pub fn handle_transmitted (&mut self, x: bool) { - self.tx_pipe.handle_network_busy (x) + self.tx_pipe.handle_transmitted (x) } pub fn poll (&mut self) -> ControlEvent @@ -147,7 +147,8 @@ struct TxPipeline capture_is_busy: bool, encoder_has_data: bool, encoder_is_busy: bool, - network_busy: bool, + network_congested: bool, + transmitter_is_busy: bool, } #[derive (Debug)] @@ -206,9 +207,10 @@ impl TxPipeline Ok (()) } - fn handle_network_busy (&mut self, busy: bool) + fn handle_transmitted (&mut self, busy: bool) { - self.network_busy = busy; + self.network_congested = busy; + self.transmitter_is_busy = false; } fn has_encoded_packet (&self) -> bool @@ -223,10 +225,11 @@ impl TxPipeline fn poll (&mut self) -> Option { - if ! self.network_busy + if ! self.network_congested && ! self.transmitter_is_busy { if let Some (buf_enc) = self.buf_enc.take () { + self.transmitter_is_busy = true; return Some (TxPipelineEvent::Transmit (buf_enc)); } } diff --git a/src/main.rs b/src/main.rs index bdbd184..df079c6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,17 +10,44 @@ use std::{ }; use rayon::ThreadPool; -use thiserror::Error; use crate::controller::*; +mod capture; mod controller; -fn main() -> Result <(), TaskError> +fn main() -> Result <(), Error> { - let pool = rayon::ThreadPoolBuilder::new ().build ().unwrap (); - let mut driver = Driver::new (&pool); - driver.main ()?; + let mut args = std::env::args (); + let _exe_name = args.next (); + match args.next ().as_deref () + { + None => + { + let pool = rayon::ThreadPoolBuilder::new ().build ().unwrap (); + let mut driver = Driver::new (&pool)?; + driver.main ()?; + }, + Some ("capture") => + { + use std::io::Write; + let mut cap = capture::Capture::new ()?; + let mut buf = vec! [0u8; cap.size_image ()]; + let mut bytesused = 0; + + for _ in 0..5 + { + let rc = cap.wait_for_frame (&mut buf); + bytesused = rc.unwrap (); + dbg! (bytesused); + } + + let mut f = std::fs::File::create ("data.jpeg").unwrap (); + f.write_all (&buf [0..bytesused]).unwrap (); + }, + Some (_) => eprintln! ("Unknown subcommand"), + } + Ok (()) } @@ -50,14 +77,14 @@ struct Driver <'a> ctl: Controller, - capture: Task , + capture: Task , encoder: Task , transmitter: Task , } impl <'a> Driver <'_> { - fn new (pool: &'a ThreadPool) -> Driver <'a> + fn new (pool: &'a ThreadPool) -> Result , Error> { let (send, recv) = mpsc::sync_channel (8); @@ -67,22 +94,22 @@ impl <'a> Driver <'_> send, }; - Driver + Ok (Driver { thread_sys, recv, ctl: Default::default (), - capture: Capture::default ().into (), + capture: capture::Capture::new ()?.into (), encoder: Encoder::default ().into (), transmitter: Transmitter::default ().into (), - } + }) } fn main (&mut self) -> Result <(), TaskError> { - for _ in 0..10 + for _ in 0..50 { let ev = self.ctl.poll (); @@ -132,6 +159,9 @@ impl <'a> Driver <'_> match event { TxPipelineEvent::Capture => CaptureWork::dispatch (self, ())?, + TxPipelineEvent::Encode (buf_raw) => EncodeWork::dispatch (self, buf_raw)?, + TxPipelineEvent::Transmit (buf_enc) => TransmitWork::dispatch (self, buf_enc)?, + TxPipelineEvent::PollEncoder => { let encoder = self.encoder.try_inner_mut ()?; @@ -147,9 +177,7 @@ impl <'a> Driver <'_> { self.ctl.handle_encoded_packet (None).unwrap (); } - } - TxPipelineEvent::Encode (buf_raw) => EncodeWork::dispatch (self, buf_raw)?, - TxPipelineEvent::Transmit (buf_enc) => TransmitWork::dispatch (self, buf_enc)?, + }, } Ok (()) @@ -164,7 +192,7 @@ enum Task Running (R), } -#[derive (Debug, Error)] +#[derive (Debug, thiserror::Error)] enum TaskError { #[error ("tried to start already-running task")] @@ -245,24 +273,6 @@ impl fmt::Debug for TaskOutput { } } -/// Wraps non-thread-safe capture device like a v4l webcam - -#[derive (Default)] -struct Capture -{ - -} - -impl Capture -{ - /// Blocks until the capture device gets us a frame - - fn wait_for_frame (&mut self, _buf: &mut [u8]) -> bool - { - false - } -} - /// Wraps non-thread-safe encoder state like from ffmpeg #[derive (Default)] @@ -352,7 +362,7 @@ impl Transmitter struct CaptureWork { - cap: Capture, + cap: capture::Capture, buf_raw: BufRaw, } @@ -364,7 +374,7 @@ impl CaptureWork let mut cap = driver.capture.start (metadata)?; driver.thread_sys.dispatch (move || { - let dur_capture = Duration::from_millis (30); + let dur_capture = Duration::from_millis (1000); thread::sleep (dur_capture); let buf_raw = BufRaw { @@ -441,7 +451,16 @@ impl TransmitWork fn finish (self, driver: &mut Driver) -> Result <(), TaskError> { driver.transmitter.stop (self.tx)?; - driver.ctl.handle_network_busy (self.busy); + driver.ctl.handle_transmitted (self.busy); Ok (()) } } + +#[derive (Debug, thiserror::Error)] +enum Error +{ + #[error ("capture")] + Capture (#[from] capture::Error), + #[error ("task")] + Task (#[from] TaskError), +}