From 677a8c7ebd9948cd510331a85cb385019c7bb6e4 Mon Sep 17 00:00:00 2001 From: _ <_@_> Date: Sat, 9 Sep 2023 03:47:31 -0500 Subject: [PATCH] initial commit --- .gitignore | 1 + Cargo.lock | 186 ++++++++++++++++ Cargo.toml | 10 + src/main.rs | 599 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 796 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.lock create mode 100644 Cargo.toml create mode 100644 src/main.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..0502722 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,186 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "crossbeam-channel" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce6fd6f855243022dcecf8702fef0c297d4338e226845fe067f6341ad9fa0cef" +dependencies = [ + "cfg-if", + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae211234986c545741a7dc064309f67ee1e5ad243d0e48335adc0484d960bcc7" +dependencies = [ + "autocfg", + "cfg-if", + "crossbeam-utils", + "memoffset", + "scopeguard", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "either" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" + +[[package]] +name = "five_five_five" +version = "0.1.0" +dependencies = [ + "rayon", + "thiserror", +] + +[[package]] +name = "hermit-abi" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b" + +[[package]] +name = "libc" +version = "0.2.147" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" + +[[package]] +name = "memoffset" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a634b1c61a95585bd15607c6ab0c4e5b226e695ff2800ba0cdccddf208c406c" +dependencies = [ + "autocfg", +] + +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "proc-macro2" +version = "1.0.66" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rayon" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d2df5196e37bcc87abebc0053e20787d73847bb33134a69841207dd0a47f03b" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-utils", + "num_cpus", +] + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "syn" +version = "2.0.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "718fa2415bcb8d8bd775917a1bf12a7931b6dfa890753378538118181e0cb398" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "thiserror" +version = "1.0.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d6d7a740b8a666a7e828dd00da9c0dc290dff53154ea77ac109281de90589b7" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "unicode-ident" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..eed44ae --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "five_five_five" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +rayon = "1.7.0" +thiserror = "1.0.48" diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..22088b8 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,599 @@ +use std::{ + fmt, + sync::mpsc, + thread, + time::Duration, +}; + +use rayon::ThreadPool; +use thiserror::Error; + +fn main() +{ + let pool = rayon::ThreadPoolBuilder::new ().build ().unwrap (); + let mut driver = Driver::new (&pool); + driver.main (); +} + +struct Driver <'a> +{ + pool: &'a ThreadPool, + send: mpsc::SyncSender , + recv: mpsc::Receiver , + + ctl: Controller, + + capture: Option , + encoder: EncoderHandle, + transmitter: Option , +} + +impl <'a> Driver <'_> +{ + fn new (pool: &'a ThreadPool) -> Driver <'a> + { + let (send, recv) = mpsc::sync_channel (8); + + Driver + { + pool, + send, + recv, + + ctl: Default::default (), + + capture: Some (Default::default ()), + encoder: EncoderHandle::Stopped (Default::default ()), + transmitter: Some (Default::default ()), + } + } + + fn main (&mut self) + { + for _ in 0..10 + { + let ev = self.ctl.poll (); + + if let Some (event) = ev.tx + { + dbg! (&event); + self.handle_tx_event (event); + + match self.recv.try_recv () + { + Ok (output) => self.handle_task_output (output), + Err (_) => (), + } + } + else + { + match self.recv.recv () + { + Ok (output) => self.handle_task_output (output), + Err (_) => (), + } + } + } + } + + fn handle_task_output (&mut self, output: TaskOutput) + { + dbg! (&output); + match output + { + TaskOutput::TxCapture ((cap, buf_raw)) => + { + let _old = match std::mem::replace (&mut self.capture, Some (cap)) + { + None => (), + _ => panic! ("tried to finish an already finished capture"), + }; + + self.ctl.tx_pipe.handle_capture_frame (buf_raw).unwrap (); + }, + TaskOutput::TxEncode (enc) => + { + let _metadata = match std::mem::replace (&mut self.encoder, EncoderHandle::Stopped (enc)) + { + EncoderHandle::Running (_) => (), + _ => panic! ("tried to finish an already finished encode"), + }; + + self.ctl.tx_pipe.encoder_is_busy = false; + }, + TaskOutput::TxTransmit ((tx, busy)) => + { + let _old = match std::mem::replace (&mut self.transmitter, Some (tx)) + { + None => (), + _ => panic! ("tried to finish an already finished transmit"), + }; + + self.ctl.tx_pipe.handle_network_busy (busy); + } + } + } + + fn handle_tx_event (&mut self, event: TxPipelineEvent) + { + let dur_capture = Duration::from_millis (30); + let dur_encode = Duration::from_millis (20); + let dur_transmit = Duration::from_millis (200); + + let send = self.send.clone (); + + match event + { + TxPipelineEvent::Capture => { + let cap = match std::mem::replace (&mut self.capture, None) + { + Some (x) => x, + _ => { + dbg! ("capture is already running"); + return; + }, + }; + + self.pool.spawn (move || + { + thread::sleep (dur_capture); + let buf_raw = BufRaw + { + buf: vec![], + }; + send.send (TaskOutput::TxCapture ((cap, buf_raw))).unwrap (); + }); + }, + TxPipelineEvent::PollEncoder => + { + let encoder = match &mut self.encoder + { + EncoderHandle::Running (_) => panic! ("tried to poll encoder while it was running"), + EncoderHandle::Stopped (x) => x, + }; + + let mut buf = vec! []; + + let may_have_data = encoder.poll_encoded (&mut buf); + if may_have_data + { + self.ctl.tx_pipe.handle_encoded_packet (BufEncoded {buf}).unwrap (); + } + self.ctl.tx_pipe.encoder_has_data = may_have_data; + } + TxPipelineEvent::Encode (buf_raw) => + { + let metadata = EncoderTaskMetadata {}; + let mut encoder = match std::mem::replace (&mut self.encoder, EncoderHandle::Running (metadata)) + { + EncoderHandle::Stopped (x) => x, + _ => panic! ("tried to run the same encoder twice"), + }; + + self.pool.spawn (move || + { + thread::sleep (dur_encode); + encoder.try_handle_insert_data (&buf_raw.buf).unwrap (); + send.send (TaskOutput::TxEncode (encoder)).unwrap (); + }); + }, + TxPipelineEvent::Transmit (_buf_enc) => { + let tx = match std::mem::replace (&mut self.transmitter, None) + { + Some (x) => x, + _ => panic! ("tried to run the same transmitter twice"), + }; + + self.pool.spawn (move || + { + thread::sleep (dur_transmit); + send.send (TaskOutput::TxTransmit ((tx, false))).unwrap (); + }); + }, + } + } +} + +enum EncoderHandle +{ + Stopped (Encoder), + Running (EncoderTaskMetadata), +} + +struct EncoderTaskMetadata +{ + +} + +enum TaskOutput +{ + TxCapture ((Capture, BufRaw)), + TxEncode (Encoder), + TxTransmit ((Transmitter, bool)), +} + +impl fmt::Debug for TaskOutput { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self + { + Self::TxCapture (_) => f.debug_tuple ("TaskOutput::TxCapture").finish (), + Self::TxEncode (_) => f.debug_tuple ("TaskOutput::TxEncode").finish (), + Self::TxTransmit (_) => f.debug_tuple ("TaskOutput::TxTransmit").finish (), + } + } +} + +#[derive (Default)] +struct Controller +{ + /// Pipeline for pulling data from the network, decoding it, + /// and displaying it. + + rx_pipe: RxPipeline, + + /// Pipeline for pulling data from capture devices, encoding it, + /// and sending it over the network. + + tx_pipe: TxPipeline, +} + +#[derive (Debug)] +struct ControlEvent +{ + rx: Option , + tx: Option , +} + +impl Controller +{ + fn poll (&mut self) -> ControlEvent + { + let rx = self.rx_pipe.poll (); + let tx = self.tx_pipe.poll (); + + ControlEvent { + rx, + tx, + } + } +} + +#[derive (Default)] +struct RxPipeline +{ + buf_enc: Option , + buf_raw: Option , +} + +#[derive (Debug)] +enum RxPipelineEvent +{ + DisplayRaw (BufRaw), + Decode (BufEncoded), + Receive, +} + +#[derive (Debug, Error)] +enum RxPipeError +{ + #[error ("already have encoded data")] + AlreadyHaveEncodedData, + #[error ("already have raw data")] + AlreadyHaveRawData, +} + +impl RxPipeline +{ + fn handle_encoded_packet (&mut self, buf_enc: BufEncoded) -> Result <(), RxPipeError> + { + if self.has_encoded_data () + { + return Err (RxPipeError::AlreadyHaveEncodedData); + } + + self.buf_enc = Some (buf_enc); + + Ok (()) + } + + fn handle_raw_frame (&mut self, buf_raw: BufRaw) -> Result <(), RxPipeError> + { + if self.has_raw_data () + { + return Err (RxPipeError::AlreadyHaveRawData); + } + + self.buf_raw = Some (buf_raw); + + Ok (()) + } + + fn poll (&mut self) -> Option + { + if let Some (buf_raw) = self.buf_raw.take () + { + return Some (RxPipelineEvent::DisplayRaw (buf_raw)); + } + + if ! self.has_raw_data () + { + if let Some (buf_enc) = self.buf_enc.take () + { + return Some (RxPipelineEvent::Decode (buf_enc)); + } + } + + if ! self.has_encoded_data () + { + return Some (RxPipelineEvent::Receive); + } + + None + } + + /// True if we have a buffer of received data, ready + /// to be decoded. + + fn has_encoded_data (&self) -> bool + { + self.buf_enc.is_some () + } + + fn has_raw_data (&self) -> bool + { + self.buf_raw.is_some () + } +} + +#[derive (Default)] +struct TxPipeline +{ + buf_enc: Option , + buf_raw: Option , + capture_is_busy: bool, + encoder_has_data: bool, + encoder_is_busy: bool, + network_busy: bool, +} + +#[derive (Debug)] +enum TxPipelineEvent +{ + /// Capture a raw frame and pass it to handle_capture_frame + Capture, + + /// Poll the encoder. If it returns a packet, pass it to + /// handle_encoded_packet. + PollEncoder, + + /// Encode this raw frame and pass the encoded frame to + /// handle_encoded_packet + Encode (BufRaw), + + /// Transmit this encoded frame to the network + Transmit (BufEncoded), +} + +impl TxPipeline +{ + fn handle_capture_frame (&mut self, buf_raw: BufRaw) + -> Result <(), ControlError> + { + self.capture_is_busy = false; + + if self.has_raw_frame () + { + return Err (ControlError::AlreadyHaveRawData); + } + + self.buf_raw = Some (buf_raw); + + Ok (()) + } + + fn handle_encoded_packet (&mut self, buf_enc: BufEncoded) -> Result <(), ControlError> + { + self.encoder_is_busy = false; + + if self.has_encoded_packet () + { + return Err (ControlError::AlreadyHaveEncodedData); + } + + self.buf_enc = Some (buf_enc); + + Ok (()) + } + + fn handle_network_busy (&mut self, busy: bool) + { + self.network_busy = busy; + } + + fn has_encoded_packet (&self) -> bool + { + self.buf_enc.is_some () + } + + fn has_raw_frame (&self) -> bool + { + self.buf_raw.is_some () + } + + fn poll (&mut self) -> Option + { + if ! self.network_busy + { + if let Some (buf_enc) = self.buf_enc.take () + { + return Some (TxPipelineEvent::Transmit (buf_enc)); + } + } + + if ! self.has_encoded_packet () && ! self.encoder_is_busy + { + if self.encoder_has_data + { + return Some (TxPipelineEvent::PollEncoder); + } + + if let Some (buf_raw) = self.buf_raw.take () + { + self.encoder_has_data = true; + self.encoder_is_busy = true; + return Some (TxPipelineEvent::Encode (buf_raw)); + } + } + + if ! self.has_raw_frame () + { + if ! self.capture_is_busy + { + self.capture_is_busy = true; + return Some (TxPipelineEvent::Capture); + } + } + + None + } +} + +/// Wraps non-thread-safe capture device like a v4l webcam + +#[derive (Default)] +struct Capture +{ + +} + +impl Capture +{ + /// Blocks until the capture device gets us a frame + + fn wait_for_frame (&mut self, _buf: &mut [u8]) -> bool + { + false + } +} + +/// Wraps non-thread-safe encoder state like from ffmpeg + +#[derive (Default)] +struct Encoder +{ + /// AVCodecContext + ctx: (), + + /// Encoded AVPacket + pkt_enc: Option <()>, + + /// Bogus debugging thing + internal_buffer: usize, +} + +impl Encoder +{ + /// If any encoded bytes are ready, and the caller's buffer + /// is big enough, copy encoded bytes into the caller's buffer, + /// consuming them from internal buffers. + /// + /// Won't block + + fn poll_encoded (&mut self, _buf_enc: &mut [u8]) -> bool + { + if let Some (_pkt) = self.pkt_enc.take () + { + // Do copy + return true; + } + + if self.internal_buffer >= 2 + { + self.internal_buffer -= 2; + // Do copy + return true; + } + + false + } + + /// Insert raw data. If the internal buffers are full, the new + /// data is rejected. + /// Call poll_encoded before calling this, to drain internal + /// buffers if needed. + /// + /// Blocks if the internals perform encoding + + fn try_handle_insert_data (&mut self, _buf_raw: &[u8]) -> Result <(), ControlError> + { + if self.pkt_enc.is_some () + { + return Err (ControlError::AlreadyHaveRawData); + } + + if self.internal_buffer >= 3 + { + return Err (ControlError::AlreadyHaveRawData); + } + + self.internal_buffer += 3; + + Ok (()) + } +} + +/// Wraps a network transmitter like a TCP send stream or a QUIC +/// outgoing uni stream + +#[derive (Default)] +struct Transmitter +{ + +} + +impl Transmitter +{ + /// Tries to transmit data over the network. + /// If Transmitter sees congestion, it may reject the transmission + /// until a later time. + + fn try_send (&mut self, _now: std::time::Instant, _buf: &[u8]) -> bool + { + false + } +} + +struct BufEncoded +{ + buf: Vec +} + +impl fmt::Debug for BufEncoded { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f + .debug_struct("BufEncoded") + .finish() + } +} + +struct BufRaw +{ + buf: Vec +} + +impl fmt::Debug for BufRaw { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f + .debug_struct("BufRaw") + .finish() + } +} + +#[derive (Debug, Error)] +enum ControlError +{ + #[error ("already have encoded data")] + AlreadyHaveEncodedData, + #[error ("already have raw data")] + AlreadyHaveRawData, +}