use std::{ sync::{ Arc, Condvar, Mutex, }, thread::{ self, sleep, }, time::Duration, }; use anyhow::{ anyhow, bail, Result, }; use cpal::traits::{ DeviceTrait, HostTrait, }; mod decoder; fn main () -> Result <()> { let args: Vec <_> = std::env::args ().collect (); match args.get (1).map (|s| &s[..]) { None => bail! ("First argument must be a subcommand like `play`"), Some ("debug") => cmd_debug (&args [1..]), Some ("play") => cmd_play (&args [1..]), Some (_) => bail! ("Unrecognized subcommand"), } } fn cmd_debug (args: &[String]) -> Result <()> { tracing_subscriber::fmt::init (); let filename = args.get (1) .map (|s| s.to_string ()) .unwrap_or_else (|| "test-short.m4a".to_string ()); let mut decoder = decoder::Decoder::new (&filename)?; let mut pcm_buffers = decoder::PcmBuffers::default (); tracing::debug! ("Constructed decoder"); sleep (Duration::from_secs (3)); tracing::debug! ("Decoding..."); 'decoding: loop { while pcm_buffers.samples_available () < 48_000 { if ! decoder.fill_buffer (&mut pcm_buffers)? { tracing::info! ("Decoder finished"); break 'decoding; } } while pcm_buffers.samples_available () > 0 { let mut buf = vec! [0.0f32; pcm_buffers.samples_available ()]; pcm_buffers.consume_exact (&mut buf); } } sleep (Duration::from_secs (3)); tracing::debug! ("Dropping resampler..."); dbg! (decoder.resampler.delay ()); drop (decoder.resampler); sleep (Duration::from_secs (3)); tracing::debug! ("Dropping decoder..."); drop (decoder.decoder); sleep (Duration::from_secs (3)); tracing::debug! ("Dropping input_ctx..."); drop (decoder.input_ctx); sleep (Duration::from_secs (3)); Ok (()) } fn cmd_play (args: &[String]) -> Result <()> { tracing_subscriber::fmt::init (); let filename = args.get (1) .map (|s| s.to_string ()) .unwrap_or_else (|| "test-short.m4a".to_string ()); let pair = Arc::new ((Mutex::new (decoder::SharedState::default ()), Condvar::new ())); let pair2 = Arc::clone (&pair); let pair3 = Arc::clone (&pair); let thread_decoder = thread::spawn (move|| { let (lock, cvar) = &*pair2; let mut decoder = decoder::Decoder::new (&filename)?; 'decoder_thread: loop { // tracing::trace! ("decode thread parking"); let mut decoder_state = cvar.wait_while (lock.lock ().unwrap (), |decoder_state| { decoder_state.pcm_buffers.samples_available () >= 12_000 && ! decoder_state.quit }).unwrap (); if decoder_state.quit { break 'decoder_thread; } //dbg! (resampler.delay ()); let pcm_buffers = &mut decoder_state.pcm_buffers; while pcm_buffers.samples_available () < 24_000 { // tracing::trace! ("Decoder is trying to work..."); if ! decoder.fill_buffer (pcm_buffers)? { tracing::info! ("Decoder thread is out of work, quitting"); break 'decoder_thread; } } } Ok::<_, anyhow::Error> (()) }); let host = cpal::default_host (); let device = host.default_output_device ().ok_or_else (|| anyhow! ("can't open cpal device"))?; let mut supported_configs_range = device.supported_output_configs ()? .filter (|c| c.channels () == 2 && c.sample_format () == cpal::SampleFormat::F32); let config = supported_configs_range.next () .ok_or_else (|| anyhow! ("can't get stereo f32 audio output"))? .with_sample_rate (cpal::SampleRate (decoder::SAMPLE_RATE)) .config (); let pcm_quit = Arc::new ((Mutex::new (false), Condvar::new ())); let pcm_quit2 = Arc::clone (&pcm_quit); let stream = device.build_output_stream ( &config, move |data: &mut [f32], _: &cpal::OutputCallbackInfo| { let (lock, cvar) = &*pair; let mut decoder_state = match lock.lock () { Ok (x) => x, Err (_) => return, }; let pcm_buffers = &mut decoder_state.pcm_buffers; if ! pcm_buffers.consume_exact (data) { // tracing::warn! ("PCM buffer underflow"); for x in data { *x = 0.0; } let (lock, cvar) = &*pcm_quit; let mut pcm_quit = lock.lock ().unwrap (); *pcm_quit = true; cvar.notify_one (); } cvar.notify_one (); }, move |_err| { // react to errors here. }, ); // sleep (std::time::Duration::from_secs (3 * 60 + 40)); tracing::debug! ("Joining decoder thread..."); if false { let mut decoder_state = pair3.0.lock ().unwrap (); decoder_state.quit = true; pair3.1.notify_one (); } thread_decoder.join ().unwrap ()?; tracing::debug! ("Joining PCM thread..."); let (lock, cvar) = &*pcm_quit2; let _ = cvar.wait (lock.lock ().unwrap ()).unwrap (); drop (stream); sleep (Duration::from_secs (1)); tracing::info! ("Exiting cleanly."); Ok (()) } #[cfg (test)] mod test { use super::*; #[test] fn decode () -> Result <()> { ffmpeg_next::init ()?; let mut input_ctx = ffmpeg_next::format::input (&"test.opus")?; let stream = input_ctx .streams () .best (ffmpeg_next::media::Type::Audio) .ok_or_else (|| anyhow! ("can't find good audio stream"))?; let best_stream_idx = stream.index (); let mut decoder = stream.codec ().decoder ().audio ()?; let mut packet_count = 0; let mut frame_count = 0; let mut frame = ffmpeg_next::util::frame::Audio::empty (); let mut packets = input_ctx.packets (); loop { let mut did_anything = false; while decoder.receive_frame (&mut frame).is_ok () { frame_count += 1; did_anything = true; } match packets.next () { None => {}, Some ((stream, packet)) => { did_anything = true; if stream.index () == best_stream_idx { packet_count += 1; decoder.send_packet (&packet)?; } }, } if ! did_anything { break; } } assert_eq! (packet_count, 5496); assert_eq! (frame_count, 5496); Ok (()) } }