👕 refactor: extract decoder module

main
_ 2021-11-12 23:51:56 +00:00
parent a98fea30b4
commit 722821e2c4
2 changed files with 168 additions and 69 deletions

161
src/decoder.rs Normal file
View File

@ -0,0 +1,161 @@
use std::{
io::Cursor,
};
use anyhow::{
anyhow,
Result,
};
use byteorder::{
LittleEndian,
ReadBytesExt,
};
// This crate flitters between being very convenient and being a type labyrinth.
use ffmpeg_next::{
decoder::Audio as DecodeContext,
format::context::Input as DemuxContext,
software::resampling::Context as ResamplingContext,
};
pub const SAMPLE_RATE: u32 = 48000;
#[derive (Default)]
pub struct PcmBuffers {
buffers: Vec <Vec <f32>>,
// Always points into the first buffer, if any
consumer_cursor: usize,
}
impl PcmBuffers {
pub fn samples_available (&self) -> usize {
self.buffers.iter ().map (|b| b.len ()).sum::<usize> () - self.consumer_cursor
}
#[warn(unused_must_use)]
pub fn consume_exact (&mut self, data: &mut [f32]) -> bool {
if data.len () > self.samples_available () {
return false;
}
for x in data {
if self.consumer_cursor >= self.buffers [0].len () {
self.buffers.remove (0);
self.consumer_cursor = 0;
}
*x = self.buffers [0][self.consumer_cursor];
self.consumer_cursor += 1;
}
true
}
pub fn produce (&mut self, new_buffer: Vec <f32>) {
self.buffers.push (new_buffer);
}
pub fn produce_bytes (&mut self, new_buffer: &[u8]) {
let mut b = vec! [0.0f32; new_buffer.len () / 4];
let mut rdr = Cursor::new (new_buffer);
rdr.read_f32_into::<LittleEndian> (&mut b).unwrap ();
self.produce (b);
}
}
#[derive (Default)]
pub struct SharedState {
pub pcm_buffers: PcmBuffers,
pub quit: bool,
}
pub struct Decoder {
input_ctx: DemuxContext,
best_stream_idx: usize,
decoder: DecodeContext,
resampler: ResamplingContext,
}
impl Decoder {
pub fn new (filename: &str) -> Result <Self> {
let input_ctx = ffmpeg_next::format::input (&filename)?;
let stream = input_ctx
.streams ()
.best (ffmpeg_next::media::Type::Audio)
.ok_or_else (|| anyhow! ("can't find good audio stream"))?;
let best_stream_idx = stream.index ();
let decoder = stream.codec ().decoder ().audio ()?;
let resampler = decoder.resampler (
ffmpeg_next::util::format::sample::Sample::F32 (
ffmpeg_next::util::format::sample::Type::Packed,
),
ffmpeg_next::util::channel_layout::ChannelLayout::STEREO,
48000,
)?;
Ok (Self {
input_ctx,
best_stream_idx,
decoder,
resampler,
})
}
pub fn fill_buffer (&mut self, pcm_buffers: &mut PcmBuffers) -> Result <bool> {
match self.resampler.delay () {
Some (x) if x.milliseconds > 500 => {
// tracing::trace! ("flushing resampler");
let mut frame_resampled = ffmpeg_next::util::frame::Audio::empty ();
if self.resampler.flush (&mut frame_resampled).is_ok () {
pcm_buffers.produce_bytes (frame_resampled.data (0));
return Ok (true);
}
else {
// tracing::warn! ("resampler flushed out a zero-length frame?");
}
},
_ => {},
}
let mut frame_src = ffmpeg_next::util::frame::Audio::empty ();
if self.decoder.receive_frame (&mut frame_src).is_ok () {
//eprintln! ("decoder.receive_frame");
let mut frame_resampled = ffmpeg_next::util::frame::Audio::empty ();
self.resampler.run (&frame_src, &mut frame_resampled)?;
pcm_buffers.produce_bytes (frame_resampled.data (0));
return Ok (true);
}
//eprintln! ("Decoder is dry, fetching a new packet...");
while let Some ((stream, packet)) = self.input_ctx.packets ().next () {
if stream.index () == self.best_stream_idx {
//eprintln! ("decoder.send_packet");
self.decoder.send_packet (&packet)?;
return Ok (true);
}
}
//eprintln! ("Decoder ran out of work");
if self.resampler.delay ().is_some () {
tracing::trace! ("flushing resampler");
let mut frame_resampled = ffmpeg_next::util::frame::Audio::empty ();
if self.resampler.flush (&mut frame_resampled).is_ok () {
//eprintln! ("resampler.flush");
pcm_buffers.produce_bytes (frame_resampled.data (0));
return Ok (true);
}
}
Ok (false)
}
}

View File

@ -1,5 +1,4 @@
use std::{ use std::{
io::Cursor,
sync::{ sync::{
Arc, Arc,
Condvar, Condvar,
@ -14,80 +13,21 @@ use std::{
use anyhow::{ use anyhow::{
anyhow, anyhow,
//bail,
Result, Result,
}; };
use byteorder::{
LittleEndian,
ReadBytesExt,
};
use cpal::traits::{ use cpal::traits::{
DeviceTrait, DeviceTrait,
HostTrait, HostTrait,
StreamTrait,
}; };
const SAMPLE_RATE: u32 = 48000; mod decoder;
#[derive (Default)]
struct PcmBuffers {
buffers: Vec <Vec <f32>>,
// Always points into the first buffer, if any
consumer_cursor: usize,
}
impl PcmBuffers {
fn samples_available (&self) -> usize {
self.buffers.iter ().map (|b| b.len ()).sum::<usize> () - self.consumer_cursor
}
#[warn(unused_must_use)]
fn consume_exact (&mut self, data: &mut [f32]) -> bool {
if data.len () > self.samples_available () {
return false;
}
for x in data {
if self.consumer_cursor >= self.buffers [0].len () {
self.buffers.remove (0);
self.consumer_cursor = 0;
}
*x = self.buffers [0][self.consumer_cursor];
self.consumer_cursor += 1;
}
true
}
fn produce (&mut self, new_buffer: Vec <f32>) {
self.buffers.push (new_buffer);
}
fn produce_bytes (&mut self, new_buffer: &[u8]) {
let mut b = vec! [0.0f32; new_buffer.len () / 4];
let mut rdr = Cursor::new (new_buffer);
rdr.read_f32_into::<LittleEndian> (&mut b).unwrap ();
self.produce (b);
}
}
fn main () -> Result <()> { fn main () -> Result <()> {
#[derive (Default)]
struct DecoderState {
pcm_buffers: PcmBuffers,
quit: bool,
}
tracing_subscriber::fmt::init (); tracing_subscriber::fmt::init ();
tracing::error! ("frik"); tracing::error! ("frik");
let pair = Arc::new ((Mutex::new (DecoderState::default ()), Condvar::new ())); let pair = Arc::new ((Mutex::new (decoder::SharedState::default ()), Condvar::new ()));
let pair2 = Arc::clone (&pair); let pair2 = Arc::clone (&pair);
let pair3 = Arc::clone (&pair); let pair3 = Arc::clone (&pair);
@ -166,7 +106,7 @@ fn main () -> Result <()> {
//eprintln! ("Decoder is dry, fetching a new packet..."); //eprintln! ("Decoder is dry, fetching a new packet...");
'get_packet: while let Some ((stream, packet)) = packets.next () { while let Some ((stream, packet)) = packets.next () {
if stream.index () == best_stream_idx { if stream.index () == best_stream_idx {
//eprintln! ("decoder.send_packet"); //eprintln! ("decoder.send_packet");
decoder.send_packet (&packet)?; decoder.send_packet (&packet)?;
@ -218,7 +158,7 @@ fn main () -> Result <()> {
let config = supported_configs_range.next () let config = supported_configs_range.next ()
.ok_or_else (|| anyhow! ("can't get stereo f32 audio output"))? .ok_or_else (|| anyhow! ("can't get stereo f32 audio output"))?
.with_sample_rate (cpal::SampleRate (SAMPLE_RATE)) .with_sample_rate (cpal::SampleRate (decoder::SAMPLE_RATE))
.config (); .config ();
dbg! (config.clone ()); dbg! (config.clone ());
@ -253,13 +193,11 @@ fn main () -> Result <()> {
cvar.notify_one (); cvar.notify_one ();
}, },
move |err| { move |_err| {
// react to errors here. // react to errors here.
}, },
); );
let mut stream = Some (stream);
// sleep (std::time::Duration::from_secs (3 * 60 + 40)); // sleep (std::time::Duration::from_secs (3 * 60 + 40));
tracing::debug! ("Joining decoder thread..."); tracing::debug! ("Joining decoder thread...");
@ -275,9 +213,9 @@ fn main () -> Result <()> {
tracing::debug! ("Joining PCM thread..."); tracing::debug! ("Joining PCM thread...");
let (lock, cvar) = &*pcm_quit2; let (lock, cvar) = &*pcm_quit2;
cvar.wait (lock.lock ().unwrap ()); let _ = cvar.wait (lock.lock ().unwrap ()).unwrap ();
stream = None; drop (stream);
sleep (Duration::from_secs (1)); sleep (Duration::from_secs (1));