fix some issues

main
_ 2021-11-12 15:54:24 -06:00
parent b11c4ac0b8
commit ceb06dad16
1 changed files with 100 additions and 58 deletions

View File

@ -62,6 +62,15 @@ impl PcmBuffers {
fn produce (&mut self, new_buffer: Vec <f32>) { fn produce (&mut self, new_buffer: Vec <f32>) {
self.buffers.push (new_buffer); self.buffers.push (new_buffer);
} }
fn produce_bytes (&mut self, new_buffer: &[u8]) {
let mut b = vec! [0.0f32; new_buffer.len () / 4];
let mut rdr = Cursor::new (new_buffer);
rdr.read_f32_into::<LittleEndian> (&mut b).unwrap ();
self.produce (b);
}
} }
fn main () -> Result <()> { fn main () -> Result <()> {
@ -76,18 +85,17 @@ fn main () -> Result <()> {
.with_sample_rate (cpal::SampleRate (SAMPLE_RATE)) .with_sample_rate (cpal::SampleRate (SAMPLE_RATE))
.config (); .config ();
let pair = Arc::new ((Mutex::new (PcmBuffers::default ()), Condvar::new ())); #[derive (Default)]
struct DecoderState {
pcm_buffers: PcmBuffers,
quit: bool,
}
let pair = Arc::new ((Mutex::new (DecoderState::default ()), Condvar::new ()));
let pair2 = Arc::clone (&pair); let pair2 = Arc::clone (&pair);
let pair3 = Arc::clone (&pair);
let thread_decoder = thread::spawn (move|| { let thread_decoder = thread::spawn (move|| {
/*
Pipeline:
- demuxer
- decoder
- resampler
- cpal
*/
let (lock, cvar) = &*pair2; let (lock, cvar) = &*pair2;
let mut input_ctx = ffmpeg_next::format::input (&"test.m4a")?; let mut input_ctx = ffmpeg_next::format::input (&"test.m4a")?;
@ -105,81 +113,115 @@ fn main () -> Result <()> {
ffmpeg_next::util::channel_layout::ChannelLayout::STEREO, ffmpeg_next::util::channel_layout::ChannelLayout::STEREO,
48000, 48000,
)?; )?;
let mut frame = ffmpeg_next::util::frame::Audio::empty ();
let mut frame_src = ffmpeg_next::util::frame::Audio::empty ();
let mut frame_resampled = ffmpeg_next::util::frame::Audio::empty ();
let mut packets = input_ctx.packets (); let mut packets = input_ctx.packets ();
loop { 'decoder_thread: loop {
// eprintln! ("decode thread parking"); // eprintln! ("decode thread parking");
let mut pcm_buffers = cvar.wait_while (lock.lock ().unwrap (), |pcm_buffers| { let mut decoder_state = cvar.wait_while (lock.lock ().unwrap (), |decoder_state| {
// eprintln! ("checking cvar..."); decoder_state.pcm_buffers.samples_available () >= 24_000 &&
pcm_buffers.samples_available () >= 48_000 ! decoder_state.quit
}).unwrap (); }).unwrap ();
let mut did_anything = false; if decoder_state.quit {
break 'decoder_thread;
}
//dbg! (resampler.delay ());
let pcm_buffers = &mut decoder_state.pcm_buffers;
while decoder.receive_frame (&mut frame).is_ok () { 'fill_buffer: while pcm_buffers.samples_available () < 48_000 {
did_anything = true; //eprintln! ("Decoder is trying to work...");
let mut buffer = vec! [0.0f32; frame.plane::<f32> (0).len () * 2];
let mut rdr_left = frame.plane::<f32> (0).iter (); match resampler.delay () {
let mut rdr_right = frame.plane::<f32> (1).iter (); Some (x) if x.milliseconds > 500 => {
eprintln! ("flushing resampler");
for x in buffer.chunks_mut (2) { if let Some (_) = resampler.flush (&mut frame_resampled)? {
x [0] = *rdr_left.next ().unwrap (); pcm_buffers.produce_bytes (frame_resampled.data (0));
x [1] = *rdr_right.next ().unwrap (); continue 'fill_buffer;
}
},
_ => {},
} }
pcm_buffers.produce (buffer); if decoder.receive_frame (&mut frame_src).is_ok () {
} //eprintln! ("decoder.receive_frame");
resampler.run (&frame_src, &mut frame_resampled)?;
pcm_buffers.produce_bytes (frame_resampled.data (0));
continue 'fill_buffer;
}
match packets.next () { //eprintln! ("Decoder is dry, fetching a new packet...");
None => {},
Some ((stream, packet)) => { 'get_packet: while let Some ((stream, packet)) = packets.next () {
did_anything = true;
if stream.index () == best_stream_idx { if stream.index () == best_stream_idx {
//eprintln! ("decoder.send_packet");
decoder.send_packet (&packet)?; decoder.send_packet (&packet)?;
continue 'fill_buffer;
} }
}, }
}
if ! did_anything { //eprintln! ("Decoder ran out of work");
break;
if resampler.delay ().is_some () {
eprintln! ("flushing resampler");
if let Some (_) = resampler.flush (&mut frame_resampled)? {
//eprintln! ("resampler.flush");
pcm_buffers.produce_bytes (frame_resampled.data (0));
continue 'fill_buffer;
}
}
break 'fill_buffer;
} }
} }
Ok::<_, anyhow::Error> (()) Ok::<_, anyhow::Error> (())
}); });
let stream = device.build_output_stream ( {
&config, let stream = device.build_output_stream (
move |data: &mut [f32], _: &cpal::OutputCallbackInfo| { &config,
let (lock, cvar) = &*pair; move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
let (lock, cvar) = &*pair;
let mut pcm_buffers = match lock.lock () { let mut decoder_state = match lock.lock () {
Ok (x) => x, Ok (x) => x,
Err (_) => return, Err (_) => return,
}; };
if ! pcm_buffers.consume_exact (data) { let pcm_buffers = &mut decoder_state.pcm_buffers;
eprintln! ("PCM buffer underflow");
} if ! pcm_buffers.consume_exact (data) {
eprintln! ("PCM buffer underflow");
}
if pcm_buffers.samples_available () < 24_000 {
cvar.notify_one (); cvar.notify_one ();
} },
}, move |err| {
move |err| { // react to errors here.
// react to errors here. },
}, );
);
std::thread::sleep (std::time::Duration::from_millis (1_000)); std::thread::sleep (std::time::Duration::from_millis (180_000));
}
eprintln! ("Joining decoder thread");
{
let mut decoder_state = pair3.0.lock ().unwrap ();
decoder_state.quit = true;
pair3.1.notify_one ();
}
thread_decoder.join ().unwrap ()?; thread_decoder.join ().unwrap ()?;
eprintln! ("Joined decoder thread");
Ok (()) Ok (())
} }