initial commit

main
_ 2023-09-09 03:47:31 -05:00
commit 677a8c7ebd
4 changed files with 796 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/target

186
Cargo.lock generated Normal file
View File

@ -0,0 +1,186 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "autocfg"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "crossbeam-channel"
version = "0.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce6fd6f855243022dcecf8702fef0c297d4338e226845fe067f6341ad9fa0cef"
dependencies = [
"cfg-if",
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae211234986c545741a7dc064309f67ee1e5ad243d0e48335adc0484d960bcc7"
dependencies = [
"autocfg",
"cfg-if",
"crossbeam-utils",
"memoffset",
"scopeguard",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294"
dependencies = [
"cfg-if",
]
[[package]]
name = "either"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07"
[[package]]
name = "five_five_five"
version = "0.1.0"
dependencies = [
"rayon",
"thiserror",
]
[[package]]
name = "hermit-abi"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b"
[[package]]
name = "libc"
version = "0.2.147"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3"
[[package]]
name = "memoffset"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a634b1c61a95585bd15607c6ab0c4e5b226e695ff2800ba0cdccddf208c406c"
dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "proc-macro2"
version = "1.0.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9"
dependencies = [
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae"
dependencies = [
"proc-macro2",
]
[[package]]
name = "rayon"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d2df5196e37bcc87abebc0053e20787d73847bb33134a69841207dd0a47f03b"
dependencies = [
"either",
"rayon-core",
]
[[package]]
name = "rayon-core"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-utils",
"num_cpus",
]
[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "syn"
version = "2.0.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "718fa2415bcb8d8bd775917a1bf12a7931b6dfa890753378538118181e0cb398"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "thiserror"
version = "1.0.48"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d6d7a740b8a666a7e828dd00da9c0dc290dff53154ea77ac109281de90589b7"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.48"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "unicode-ident"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c"

10
Cargo.toml Normal file
View File

@ -0,0 +1,10 @@
[package]
name = "five_five_five"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
rayon = "1.7.0"
thiserror = "1.0.48"

599
src/main.rs Normal file
View File

@ -0,0 +1,599 @@
use std::{
fmt,
sync::mpsc,
thread,
time::Duration,
};
use rayon::ThreadPool;
use thiserror::Error;
fn main()
{
let pool = rayon::ThreadPoolBuilder::new ().build ().unwrap ();
let mut driver = Driver::new (&pool);
driver.main ();
}
struct Driver <'a>
{
pool: &'a ThreadPool,
send: mpsc::SyncSender <TaskOutput>,
recv: mpsc::Receiver <TaskOutput>,
ctl: Controller,
capture: Option <Capture>,
encoder: EncoderHandle,
transmitter: Option <Transmitter>,
}
impl <'a> Driver <'_>
{
fn new (pool: &'a ThreadPool) -> Driver <'a>
{
let (send, recv) = mpsc::sync_channel (8);
Driver
{
pool,
send,
recv,
ctl: Default::default (),
capture: Some (Default::default ()),
encoder: EncoderHandle::Stopped (Default::default ()),
transmitter: Some (Default::default ()),
}
}
fn main (&mut self)
{
for _ in 0..10
{
let ev = self.ctl.poll ();
if let Some (event) = ev.tx
{
dbg! (&event);
self.handle_tx_event (event);
match self.recv.try_recv ()
{
Ok (output) => self.handle_task_output (output),
Err (_) => (),
}
}
else
{
match self.recv.recv ()
{
Ok (output) => self.handle_task_output (output),
Err (_) => (),
}
}
}
}
fn handle_task_output (&mut self, output: TaskOutput)
{
dbg! (&output);
match output
{
TaskOutput::TxCapture ((cap, buf_raw)) =>
{
let _old = match std::mem::replace (&mut self.capture, Some (cap))
{
None => (),
_ => panic! ("tried to finish an already finished capture"),
};
self.ctl.tx_pipe.handle_capture_frame (buf_raw).unwrap ();
},
TaskOutput::TxEncode (enc) =>
{
let _metadata = match std::mem::replace (&mut self.encoder, EncoderHandle::Stopped (enc))
{
EncoderHandle::Running (_) => (),
_ => panic! ("tried to finish an already finished encode"),
};
self.ctl.tx_pipe.encoder_is_busy = false;
},
TaskOutput::TxTransmit ((tx, busy)) =>
{
let _old = match std::mem::replace (&mut self.transmitter, Some (tx))
{
None => (),
_ => panic! ("tried to finish an already finished transmit"),
};
self.ctl.tx_pipe.handle_network_busy (busy);
}
}
}
fn handle_tx_event (&mut self, event: TxPipelineEvent)
{
let dur_capture = Duration::from_millis (30);
let dur_encode = Duration::from_millis (20);
let dur_transmit = Duration::from_millis (200);
let send = self.send.clone ();
match event
{
TxPipelineEvent::Capture => {
let cap = match std::mem::replace (&mut self.capture, None)
{
Some (x) => x,
_ => {
dbg! ("capture is already running");
return;
},
};
self.pool.spawn (move ||
{
thread::sleep (dur_capture);
let buf_raw = BufRaw
{
buf: vec![],
};
send.send (TaskOutput::TxCapture ((cap, buf_raw))).unwrap ();
});
},
TxPipelineEvent::PollEncoder =>
{
let encoder = match &mut self.encoder
{
EncoderHandle::Running (_) => panic! ("tried to poll encoder while it was running"),
EncoderHandle::Stopped (x) => x,
};
let mut buf = vec! [];
let may_have_data = encoder.poll_encoded (&mut buf);
if may_have_data
{
self.ctl.tx_pipe.handle_encoded_packet (BufEncoded {buf}).unwrap ();
}
self.ctl.tx_pipe.encoder_has_data = may_have_data;
}
TxPipelineEvent::Encode (buf_raw) =>
{
let metadata = EncoderTaskMetadata {};
let mut encoder = match std::mem::replace (&mut self.encoder, EncoderHandle::Running (metadata))
{
EncoderHandle::Stopped (x) => x,
_ => panic! ("tried to run the same encoder twice"),
};
self.pool.spawn (move ||
{
thread::sleep (dur_encode);
encoder.try_handle_insert_data (&buf_raw.buf).unwrap ();
send.send (TaskOutput::TxEncode (encoder)).unwrap ();
});
},
TxPipelineEvent::Transmit (_buf_enc) => {
let tx = match std::mem::replace (&mut self.transmitter, None)
{
Some (x) => x,
_ => panic! ("tried to run the same transmitter twice"),
};
self.pool.spawn (move ||
{
thread::sleep (dur_transmit);
send.send (TaskOutput::TxTransmit ((tx, false))).unwrap ();
});
},
}
}
}
enum EncoderHandle
{
Stopped (Encoder),
Running (EncoderTaskMetadata),
}
struct EncoderTaskMetadata
{
}
enum TaskOutput
{
TxCapture ((Capture, BufRaw)),
TxEncode (Encoder),
TxTransmit ((Transmitter, bool)),
}
impl fmt::Debug for TaskOutput {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self
{
Self::TxCapture (_) => f.debug_tuple ("TaskOutput::TxCapture").finish (),
Self::TxEncode (_) => f.debug_tuple ("TaskOutput::TxEncode").finish (),
Self::TxTransmit (_) => f.debug_tuple ("TaskOutput::TxTransmit").finish (),
}
}
}
#[derive (Default)]
struct Controller
{
/// Pipeline for pulling data from the network, decoding it,
/// and displaying it.
rx_pipe: RxPipeline,
/// Pipeline for pulling data from capture devices, encoding it,
/// and sending it over the network.
tx_pipe: TxPipeline,
}
#[derive (Debug)]
struct ControlEvent
{
rx: Option <RxPipelineEvent>,
tx: Option <TxPipelineEvent>,
}
impl Controller
{
fn poll (&mut self) -> ControlEvent
{
let rx = self.rx_pipe.poll ();
let tx = self.tx_pipe.poll ();
ControlEvent {
rx,
tx,
}
}
}
#[derive (Default)]
struct RxPipeline
{
buf_enc: Option <BufEncoded>,
buf_raw: Option <BufRaw>,
}
#[derive (Debug)]
enum RxPipelineEvent
{
DisplayRaw (BufRaw),
Decode (BufEncoded),
Receive,
}
#[derive (Debug, Error)]
enum RxPipeError
{
#[error ("already have encoded data")]
AlreadyHaveEncodedData,
#[error ("already have raw data")]
AlreadyHaveRawData,
}
impl RxPipeline
{
fn handle_encoded_packet (&mut self, buf_enc: BufEncoded) -> Result <(), RxPipeError>
{
if self.has_encoded_data ()
{
return Err (RxPipeError::AlreadyHaveEncodedData);
}
self.buf_enc = Some (buf_enc);
Ok (())
}
fn handle_raw_frame (&mut self, buf_raw: BufRaw) -> Result <(), RxPipeError>
{
if self.has_raw_data ()
{
return Err (RxPipeError::AlreadyHaveRawData);
}
self.buf_raw = Some (buf_raw);
Ok (())
}
fn poll (&mut self) -> Option <RxPipelineEvent>
{
if let Some (buf_raw) = self.buf_raw.take ()
{
return Some (RxPipelineEvent::DisplayRaw (buf_raw));
}
if ! self.has_raw_data ()
{
if let Some (buf_enc) = self.buf_enc.take ()
{
return Some (RxPipelineEvent::Decode (buf_enc));
}
}
if ! self.has_encoded_data ()
{
return Some (RxPipelineEvent::Receive);
}
None
}
/// True if we have a buffer of received data, ready
/// to be decoded.
fn has_encoded_data (&self) -> bool
{
self.buf_enc.is_some ()
}
fn has_raw_data (&self) -> bool
{
self.buf_raw.is_some ()
}
}
#[derive (Default)]
struct TxPipeline
{
buf_enc: Option <BufEncoded>,
buf_raw: Option <BufRaw>,
capture_is_busy: bool,
encoder_has_data: bool,
encoder_is_busy: bool,
network_busy: bool,
}
#[derive (Debug)]
enum TxPipelineEvent
{
/// Capture a raw frame and pass it to handle_capture_frame
Capture,
/// Poll the encoder. If it returns a packet, pass it to
/// handle_encoded_packet.
PollEncoder,
/// Encode this raw frame and pass the encoded frame to
/// handle_encoded_packet
Encode (BufRaw),
/// Transmit this encoded frame to the network
Transmit (BufEncoded),
}
impl TxPipeline
{
fn handle_capture_frame (&mut self, buf_raw: BufRaw)
-> Result <(), ControlError>
{
self.capture_is_busy = false;
if self.has_raw_frame ()
{
return Err (ControlError::AlreadyHaveRawData);
}
self.buf_raw = Some (buf_raw);
Ok (())
}
fn handle_encoded_packet (&mut self, buf_enc: BufEncoded) -> Result <(), ControlError>
{
self.encoder_is_busy = false;
if self.has_encoded_packet ()
{
return Err (ControlError::AlreadyHaveEncodedData);
}
self.buf_enc = Some (buf_enc);
Ok (())
}
fn handle_network_busy (&mut self, busy: bool)
{
self.network_busy = busy;
}
fn has_encoded_packet (&self) -> bool
{
self.buf_enc.is_some ()
}
fn has_raw_frame (&self) -> bool
{
self.buf_raw.is_some ()
}
fn poll (&mut self) -> Option <TxPipelineEvent>
{
if ! self.network_busy
{
if let Some (buf_enc) = self.buf_enc.take ()
{
return Some (TxPipelineEvent::Transmit (buf_enc));
}
}
if ! self.has_encoded_packet () && ! self.encoder_is_busy
{
if self.encoder_has_data
{
return Some (TxPipelineEvent::PollEncoder);
}
if let Some (buf_raw) = self.buf_raw.take ()
{
self.encoder_has_data = true;
self.encoder_is_busy = true;
return Some (TxPipelineEvent::Encode (buf_raw));
}
}
if ! self.has_raw_frame ()
{
if ! self.capture_is_busy
{
self.capture_is_busy = true;
return Some (TxPipelineEvent::Capture);
}
}
None
}
}
/// Wraps non-thread-safe capture device like a v4l webcam
#[derive (Default)]
struct Capture
{
}
impl Capture
{
/// Blocks until the capture device gets us a frame
fn wait_for_frame (&mut self, _buf: &mut [u8]) -> bool
{
false
}
}
/// Wraps non-thread-safe encoder state like from ffmpeg
#[derive (Default)]
struct Encoder
{
/// AVCodecContext
ctx: (),
/// Encoded AVPacket
pkt_enc: Option <()>,
/// Bogus debugging thing
internal_buffer: usize,
}
impl Encoder
{
/// If any encoded bytes are ready, and the caller's buffer
/// is big enough, copy encoded bytes into the caller's buffer,
/// consuming them from internal buffers.
///
/// Won't block
fn poll_encoded (&mut self, _buf_enc: &mut [u8]) -> bool
{
if let Some (_pkt) = self.pkt_enc.take ()
{
// Do copy
return true;
}
if self.internal_buffer >= 2
{
self.internal_buffer -= 2;
// Do copy
return true;
}
false
}
/// Insert raw data. If the internal buffers are full, the new
/// data is rejected.
/// Call poll_encoded before calling this, to drain internal
/// buffers if needed.
///
/// Blocks if the internals perform encoding
fn try_handle_insert_data (&mut self, _buf_raw: &[u8]) -> Result <(), ControlError>
{
if self.pkt_enc.is_some ()
{
return Err (ControlError::AlreadyHaveRawData);
}
if self.internal_buffer >= 3
{
return Err (ControlError::AlreadyHaveRawData);
}
self.internal_buffer += 3;
Ok (())
}
}
/// Wraps a network transmitter like a TCP send stream or a QUIC
/// outgoing uni stream
#[derive (Default)]
struct Transmitter
{
}
impl Transmitter
{
/// Tries to transmit data over the network.
/// If Transmitter sees congestion, it may reject the transmission
/// until a later time.
fn try_send (&mut self, _now: std::time::Instant, _buf: &[u8]) -> bool
{
false
}
}
struct BufEncoded
{
buf: Vec <u8>
}
impl fmt::Debug for BufEncoded {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f
.debug_struct("BufEncoded")
.finish()
}
}
struct BufRaw
{
buf: Vec <u8>
}
impl fmt::Debug for BufRaw {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f
.debug_struct("BufRaw")
.finish()
}
}
#[derive (Debug, Error)]
enum ControlError
{
#[error ("already have encoded data")]
AlreadyHaveEncodedData,
#[error ("already have raw data")]
AlreadyHaveRawData,
}