use std::{ io::Cursor, sync::{ Arc, Condvar, Mutex, }, thread, }; use anyhow::{ anyhow, //bail, Result, }; use byteorder::{ LittleEndian, ReadBytesExt, }; use cpal::traits::{ DeviceTrait, HostTrait, StreamTrait, }; const SAMPLE_RATE: u32 = 48000; #[derive (Default)] struct PcmBuffers { buffers: Vec >, // Always points into the first buffer, if any consumer_cursor: usize, } impl PcmBuffers { fn samples_available (&self) -> usize { self.buffers.iter ().map (|b| b.len ()).sum:: () - self.consumer_cursor } #[warn(unused_must_use)] fn consume_exact (&mut self, data: &mut [f32]) -> bool { if data.len () > self.samples_available () { return false; } for x in data { if self.consumer_cursor >= self.buffers [0].len () { self.buffers.remove (0); self.consumer_cursor = 0; } *x = self.buffers [0][self.consumer_cursor]; self.consumer_cursor += 1; } true } fn produce (&mut self, new_buffer: Vec ) { self.buffers.push (new_buffer); } } fn main () -> Result <()> { let host = cpal::default_host (); let device = host.default_output_device ().ok_or_else (|| anyhow! ("can't open cpal device"))?; let mut supported_configs_range = device.supported_output_configs ()? .filter (|c| c.channels () == 2 && c.sample_format () == cpal::SampleFormat::F32); let config = supported_configs_range.next () .ok_or_else (|| anyhow! ("can't get stereo f32 audio output"))? .with_sample_rate (cpal::SampleRate (SAMPLE_RATE)) .config (); let pair = Arc::new ((Mutex::new (PcmBuffers::default ()), Condvar::new ())); let pair2 = Arc::clone (&pair); let thread_decoder = thread::spawn (move|| { /* Pipeline: - demuxer - decoder - resampler - cpal */ let (lock, cvar) = &*pair2; let mut input_ctx = ffmpeg_next::format::input (&"test.m4a")?; let stream = input_ctx .streams () .best (ffmpeg_next::media::Type::Audio) .ok_or_else (|| anyhow! ("can't find good audio stream"))?; let best_stream_idx = stream.index (); let mut decoder = stream.codec ().decoder ().audio ()?; let mut resampler = decoder.resampler ( ffmpeg_next::util::format::sample::Sample::F32 ( ffmpeg_next::util::format::sample::Type::Packed, ), ffmpeg_next::util::channel_layout::ChannelLayout::STEREO, 48000, )?; let mut frame = ffmpeg_next::util::frame::Audio::empty (); let mut packets = input_ctx.packets (); loop { // eprintln! ("decode thread parking"); let mut pcm_buffers = cvar.wait_while (lock.lock ().unwrap (), |pcm_buffers| { // eprintln! ("checking cvar..."); pcm_buffers.samples_available () >= 48_000 }).unwrap (); let mut did_anything = false; while decoder.receive_frame (&mut frame).is_ok () { did_anything = true; let mut buffer = vec! [0.0f32; frame.plane:: (0).len () * 2]; let mut rdr_left = frame.plane:: (0).iter (); let mut rdr_right = frame.plane:: (1).iter (); for x in buffer.chunks_mut (2) { x [0] = *rdr_left.next ().unwrap (); x [1] = *rdr_right.next ().unwrap (); } pcm_buffers.produce (buffer); } match packets.next () { None => {}, Some ((stream, packet)) => { did_anything = true; if stream.index () == best_stream_idx { decoder.send_packet (&packet)?; } }, } if ! did_anything { break; } } Ok::<_, anyhow::Error> (()) }); let stream = device.build_output_stream ( &config, move |data: &mut [f32], _: &cpal::OutputCallbackInfo| { let (lock, cvar) = &*pair; let mut pcm_buffers = match lock.lock () { Ok (x) => x, Err (_) => return, }; if ! pcm_buffers.consume_exact (data) { eprintln! ("PCM buffer underflow"); } if pcm_buffers.samples_available () < 24_000 { cvar.notify_one (); } }, move |err| { // react to errors here. }, ); std::thread::sleep (std::time::Duration::from_millis (1_000)); thread_decoder.join ().unwrap ()?; Ok (()) } #[cfg (test)] mod test { use super::*; #[test] fn decode () -> Result <()> { ffmpeg_next::init ()?; let mut input_ctx = ffmpeg_next::format::input (&"test.opus")?; let stream = input_ctx .streams () .best (ffmpeg_next::media::Type::Audio) .ok_or_else (|| anyhow! ("can't find good audio stream"))?; let best_stream_idx = stream.index (); let mut decoder = stream.codec ().decoder ().audio ()?; let mut packet_count = 0; let mut frame_count = 0; let mut frame = ffmpeg_next::util::frame::Audio::empty (); let mut packets = input_ctx.packets (); loop { let mut did_anything = false; while decoder.receive_frame (&mut frame).is_ok () { frame_count += 1; did_anything = true; } match packets.next () { None => {}, Some ((stream, packet)) => { did_anything = true; if stream.index () == best_stream_idx { packet_count += 1; decoder.send_packet (&packet)?; } }, } if ! did_anything { break; } } assert_eq! (packet_count, 5496); assert_eq! (frame_count, 5496); Ok (()) } }