From 722821e2c462df1133f551f44a029adfe454a5bc Mon Sep 17 00:00:00 2001 From: _ <> Date: Fri, 12 Nov 2021 23:51:56 +0000 Subject: [PATCH] :shirt: refactor: extract decoder module --- src/decoder.rs | 161 +++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 76 +++-------------------- 2 files changed, 168 insertions(+), 69 deletions(-) create mode 100644 src/decoder.rs diff --git a/src/decoder.rs b/src/decoder.rs new file mode 100644 index 0000000..8a9d04c --- /dev/null +++ b/src/decoder.rs @@ -0,0 +1,161 @@ +use std::{ + io::Cursor, +}; + +use anyhow::{ + anyhow, + Result, +}; + +use byteorder::{ + LittleEndian, + ReadBytesExt, +}; + +// This crate flitters between being very convenient and being a type labyrinth. + +use ffmpeg_next::{ + decoder::Audio as DecodeContext, + format::context::Input as DemuxContext, + software::resampling::Context as ResamplingContext, +}; + +pub const SAMPLE_RATE: u32 = 48000; + +#[derive (Default)] +pub struct PcmBuffers { + buffers: Vec >, + + // Always points into the first buffer, if any + consumer_cursor: usize, +} + +impl PcmBuffers { + pub fn samples_available (&self) -> usize { + self.buffers.iter ().map (|b| b.len ()).sum:: () - self.consumer_cursor + } + + #[warn(unused_must_use)] + pub fn consume_exact (&mut self, data: &mut [f32]) -> bool { + if data.len () > self.samples_available () { + return false; + } + + for x in data { + if self.consumer_cursor >= self.buffers [0].len () { + self.buffers.remove (0); + self.consumer_cursor = 0; + } + + *x = self.buffers [0][self.consumer_cursor]; + self.consumer_cursor += 1; + } + + true + } + + pub fn produce (&mut self, new_buffer: Vec ) { + self.buffers.push (new_buffer); + } + + pub fn produce_bytes (&mut self, new_buffer: &[u8]) { + let mut b = vec! [0.0f32; new_buffer.len () / 4]; + let mut rdr = Cursor::new (new_buffer); + + rdr.read_f32_into:: (&mut b).unwrap (); + + self.produce (b); + } +} + +#[derive (Default)] +pub struct SharedState { + pub pcm_buffers: PcmBuffers, + pub quit: bool, +} + +pub struct Decoder { + input_ctx: DemuxContext, + best_stream_idx: usize, + decoder: DecodeContext, + resampler: ResamplingContext, +} + +impl Decoder { + pub fn new (filename: &str) -> Result { + let input_ctx = ffmpeg_next::format::input (&filename)?; + let stream = input_ctx + .streams () + .best (ffmpeg_next::media::Type::Audio) + .ok_or_else (|| anyhow! ("can't find good audio stream"))?; + let best_stream_idx = stream.index (); + + let decoder = stream.codec ().decoder ().audio ()?; + let resampler = decoder.resampler ( + ffmpeg_next::util::format::sample::Sample::F32 ( + ffmpeg_next::util::format::sample::Type::Packed, + ), + ffmpeg_next::util::channel_layout::ChannelLayout::STEREO, + 48000, + )?; + + Ok (Self { + input_ctx, + best_stream_idx, + decoder, + resampler, + }) + } + + pub fn fill_buffer (&mut self, pcm_buffers: &mut PcmBuffers) -> Result { + match self.resampler.delay () { + Some (x) if x.milliseconds > 500 => { + // tracing::trace! ("flushing resampler"); + let mut frame_resampled = ffmpeg_next::util::frame::Audio::empty (); + + if self.resampler.flush (&mut frame_resampled).is_ok () { + pcm_buffers.produce_bytes (frame_resampled.data (0)); + return Ok (true); + } + else { + // tracing::warn! ("resampler flushed out a zero-length frame?"); + } + }, + _ => {}, + } + + let mut frame_src = ffmpeg_next::util::frame::Audio::empty (); + if self.decoder.receive_frame (&mut frame_src).is_ok () { + //eprintln! ("decoder.receive_frame"); + let mut frame_resampled = ffmpeg_next::util::frame::Audio::empty (); + self.resampler.run (&frame_src, &mut frame_resampled)?; + pcm_buffers.produce_bytes (frame_resampled.data (0)); + return Ok (true); + } + + //eprintln! ("Decoder is dry, fetching a new packet..."); + + while let Some ((stream, packet)) = self.input_ctx.packets ().next () { + if stream.index () == self.best_stream_idx { + //eprintln! ("decoder.send_packet"); + self.decoder.send_packet (&packet)?; + return Ok (true); + } + } + + //eprintln! ("Decoder ran out of work"); + + if self.resampler.delay ().is_some () { + tracing::trace! ("flushing resampler"); + let mut frame_resampled = ffmpeg_next::util::frame::Audio::empty (); + + if self.resampler.flush (&mut frame_resampled).is_ok () { + //eprintln! ("resampler.flush"); + pcm_buffers.produce_bytes (frame_resampled.data (0)); + return Ok (true); + } + } + + Ok (false) + } +} diff --git a/src/main.rs b/src/main.rs index 15df48a..0262230 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,4 @@ use std::{ - io::Cursor, sync::{ Arc, Condvar, @@ -14,80 +13,21 @@ use std::{ use anyhow::{ anyhow, - //bail, Result, }; -use byteorder::{ - LittleEndian, - ReadBytesExt, -}; - use cpal::traits::{ DeviceTrait, HostTrait, - StreamTrait, }; -const SAMPLE_RATE: u32 = 48000; - -#[derive (Default)] -struct PcmBuffers { - buffers: Vec >, - - // Always points into the first buffer, if any - consumer_cursor: usize, -} - -impl PcmBuffers { - fn samples_available (&self) -> usize { - self.buffers.iter ().map (|b| b.len ()).sum:: () - self.consumer_cursor - } - - #[warn(unused_must_use)] - fn consume_exact (&mut self, data: &mut [f32]) -> bool { - if data.len () > self.samples_available () { - return false; - } - - for x in data { - if self.consumer_cursor >= self.buffers [0].len () { - self.buffers.remove (0); - self.consumer_cursor = 0; - } - - *x = self.buffers [0][self.consumer_cursor]; - self.consumer_cursor += 1; - } - - true - } - - fn produce (&mut self, new_buffer: Vec ) { - self.buffers.push (new_buffer); - } - - fn produce_bytes (&mut self, new_buffer: &[u8]) { - let mut b = vec! [0.0f32; new_buffer.len () / 4]; - let mut rdr = Cursor::new (new_buffer); - - rdr.read_f32_into:: (&mut b).unwrap (); - - self.produce (b); - } -} +mod decoder; fn main () -> Result <()> { - #[derive (Default)] - struct DecoderState { - pcm_buffers: PcmBuffers, - quit: bool, - } - tracing_subscriber::fmt::init (); tracing::error! ("frik"); - let pair = Arc::new ((Mutex::new (DecoderState::default ()), Condvar::new ())); + let pair = Arc::new ((Mutex::new (decoder::SharedState::default ()), Condvar::new ())); let pair2 = Arc::clone (&pair); let pair3 = Arc::clone (&pair); @@ -166,7 +106,7 @@ fn main () -> Result <()> { //eprintln! ("Decoder is dry, fetching a new packet..."); - 'get_packet: while let Some ((stream, packet)) = packets.next () { + while let Some ((stream, packet)) = packets.next () { if stream.index () == best_stream_idx { //eprintln! ("decoder.send_packet"); decoder.send_packet (&packet)?; @@ -218,7 +158,7 @@ fn main () -> Result <()> { let config = supported_configs_range.next () .ok_or_else (|| anyhow! ("can't get stereo f32 audio output"))? - .with_sample_rate (cpal::SampleRate (SAMPLE_RATE)) + .with_sample_rate (cpal::SampleRate (decoder::SAMPLE_RATE)) .config (); dbg! (config.clone ()); @@ -253,13 +193,11 @@ fn main () -> Result <()> { cvar.notify_one (); }, - move |err| { + move |_err| { // react to errors here. }, ); - let mut stream = Some (stream); - // sleep (std::time::Duration::from_secs (3 * 60 + 40)); tracing::debug! ("Joining decoder thread..."); @@ -275,9 +213,9 @@ fn main () -> Result <()> { tracing::debug! ("Joining PCM thread..."); let (lock, cvar) = &*pcm_quit2; - cvar.wait (lock.lock ().unwrap ()); + let _ = cvar.wait (lock.lock ().unwrap ()).unwrap (); - stream = None; + drop (stream); sleep (Duration::from_secs (1));