diff --git a/src/main.rs b/src/main.rs index 722274f..b6a5d90 100644 --- a/src/main.rs +++ b/src/main.rs @@ -62,6 +62,15 @@ impl PcmBuffers { fn produce (&mut self, new_buffer: Vec ) { self.buffers.push (new_buffer); } + + fn produce_bytes (&mut self, new_buffer: &[u8]) { + let mut b = vec! [0.0f32; new_buffer.len () / 4]; + let mut rdr = Cursor::new (new_buffer); + + rdr.read_f32_into:: (&mut b).unwrap (); + + self.produce (b); + } } fn main () -> Result <()> { @@ -76,18 +85,17 @@ fn main () -> Result <()> { .with_sample_rate (cpal::SampleRate (SAMPLE_RATE)) .config (); - let pair = Arc::new ((Mutex::new (PcmBuffers::default ()), Condvar::new ())); + #[derive (Default)] + struct DecoderState { + pcm_buffers: PcmBuffers, + quit: bool, + } + + let pair = Arc::new ((Mutex::new (DecoderState::default ()), Condvar::new ())); let pair2 = Arc::clone (&pair); + let pair3 = Arc::clone (&pair); let thread_decoder = thread::spawn (move|| { - /* - Pipeline: - - demuxer - - decoder - - resampler - - cpal - */ - let (lock, cvar) = &*pair2; let mut input_ctx = ffmpeg_next::format::input (&"test.m4a")?; @@ -105,81 +113,115 @@ fn main () -> Result <()> { ffmpeg_next::util::channel_layout::ChannelLayout::STEREO, 48000, )?; - let mut frame = ffmpeg_next::util::frame::Audio::empty (); + + let mut frame_src = ffmpeg_next::util::frame::Audio::empty (); + let mut frame_resampled = ffmpeg_next::util::frame::Audio::empty (); let mut packets = input_ctx.packets (); - loop { + 'decoder_thread: loop { // eprintln! ("decode thread parking"); - let mut pcm_buffers = cvar.wait_while (lock.lock ().unwrap (), |pcm_buffers| { - // eprintln! ("checking cvar..."); - pcm_buffers.samples_available () >= 48_000 + let mut decoder_state = cvar.wait_while (lock.lock ().unwrap (), |decoder_state| { + decoder_state.pcm_buffers.samples_available () >= 24_000 && + ! decoder_state.quit }).unwrap (); - let mut did_anything = false; + if decoder_state.quit { + break 'decoder_thread; + } + //dbg! (resampler.delay ()); + let pcm_buffers = &mut decoder_state.pcm_buffers; - while decoder.receive_frame (&mut frame).is_ok () { - did_anything = true; - let mut buffer = vec! [0.0f32; frame.plane:: (0).len () * 2]; + 'fill_buffer: while pcm_buffers.samples_available () < 48_000 { + //eprintln! ("Decoder is trying to work..."); - let mut rdr_left = frame.plane:: (0).iter (); - let mut rdr_right = frame.plane:: (1).iter (); - - for x in buffer.chunks_mut (2) { - x [0] = *rdr_left.next ().unwrap (); - x [1] = *rdr_right.next ().unwrap (); + match resampler.delay () { + Some (x) if x.milliseconds > 500 => { + eprintln! ("flushing resampler"); + if let Some (_) = resampler.flush (&mut frame_resampled)? { + pcm_buffers.produce_bytes (frame_resampled.data (0)); + continue 'fill_buffer; + } + }, + _ => {}, } - pcm_buffers.produce (buffer); - } - - match packets.next () { - None => {}, - Some ((stream, packet)) => { - did_anything = true; + if decoder.receive_frame (&mut frame_src).is_ok () { + //eprintln! ("decoder.receive_frame"); + resampler.run (&frame_src, &mut frame_resampled)?; + pcm_buffers.produce_bytes (frame_resampled.data (0)); + continue 'fill_buffer; + } + + //eprintln! ("Decoder is dry, fetching a new packet..."); + + 'get_packet: while let Some ((stream, packet)) = packets.next () { if stream.index () == best_stream_idx { + //eprintln! ("decoder.send_packet"); decoder.send_packet (&packet)?; + continue 'fill_buffer; } - }, - } - - if ! did_anything { - break; + } + + //eprintln! ("Decoder ran out of work"); + + if resampler.delay ().is_some () { + eprintln! ("flushing resampler"); + if let Some (_) = resampler.flush (&mut frame_resampled)? { + //eprintln! ("resampler.flush"); + pcm_buffers.produce_bytes (frame_resampled.data (0)); + continue 'fill_buffer; + } + } + + break 'fill_buffer; } } Ok::<_, anyhow::Error> (()) }); - let stream = device.build_output_stream ( - &config, - move |data: &mut [f32], _: &cpal::OutputCallbackInfo| { - let (lock, cvar) = &*pair; - - let mut pcm_buffers = match lock.lock () { - Ok (x) => x, - Err (_) => return, - }; - - if ! pcm_buffers.consume_exact (data) { - eprintln! ("PCM buffer underflow"); - } - - if pcm_buffers.samples_available () < 24_000 { + { + let stream = device.build_output_stream ( + &config, + move |data: &mut [f32], _: &cpal::OutputCallbackInfo| { + let (lock, cvar) = &*pair; + + let mut decoder_state = match lock.lock () { + Ok (x) => x, + Err (_) => return, + }; + + let pcm_buffers = &mut decoder_state.pcm_buffers; + + if ! pcm_buffers.consume_exact (data) { + eprintln! ("PCM buffer underflow"); + } + cvar.notify_one (); - } - }, - move |err| { - // react to errors here. - }, - ); + }, + move |err| { + // react to errors here. + }, + ); + + std::thread::sleep (std::time::Duration::from_millis (180_000)); + } - std::thread::sleep (std::time::Duration::from_millis (1_000)); + eprintln! ("Joining decoder thread"); + + { + let mut decoder_state = pair3.0.lock ().unwrap (); + decoder_state.quit = true; + pair3.1.notify_one (); + } thread_decoder.join ().unwrap ()?; + eprintln! ("Joined decoder thread"); + Ok (()) }