use std::{ fs::File, sync::{ Arc, Condvar, Mutex, mpsc, }, thread::{ self, }, time::{Instant}, }; use anyhow::{ anyhow, bail, Result, }; use cpal::traits::{ DeviceTrait, HostTrait, StreamTrait, }; use fltk::{ app, button::Button, enums::CallbackTrigger, frame::Frame, group::Flex, prelude::*, window::Window, }; mod decoder; mod net_reader; const BUFFER_SIZE: usize = 5_000; fn main () -> Result <()> { let args: Vec <_> = std::env::args ().collect (); match args.get (1).map (|s| &s[..]) { None => bail! ("First argument must be a subcommand like `play`"), Some ("debug-net") => cmd_debug_net (&args [1..]), Some ("gui") => cmd_gui (&args [1..]), Some ("play") => cmd_play (&args [1..]), Some (_) => bail! ("Unrecognized subcommand"), } } fn cmd_debug_net (args: &[String]) -> Result <()> { tracing_subscriber::fmt::init (); let mut urls: Vec <_> = args.iter ().skip (1).map (|s| s.to_string ()).collect (); urls.reverse (); let (fltk_tx, fltk_rx) = app::channel:: (); let app = app::App::default (); let window_title = "Music Player".to_string (); let mut wind = Window::new (100, 100, 300, 300, None) .with_label (&window_title); wind.make_resizable (true); wind.end (); wind.show (); let mut shared_state = SharedState::default (); let silence = vec! [0; 12_000 * 4 * 2]; shared_state.pcm_buffers.produce_bytes (silence); let shared_state = Arc::new ((Mutex::new (shared_state), Condvar::new ())); let thread_decoder = DecoderThread::new ( Arc::clone (&shared_state), Some (fltk_tx.clone ()), ); let mut audio_output = AudioOutput::new ( Arc::clone (&shared_state), thread_decoder.tx.clone (), )?; if let Some (url) = urls.pop () { thread_decoder.tx.send (MessageToDecoder::SetUrl (url))?; } else { bail! ("No URLs to play"); } while app.wait () { tracing::trace! ("Main thread is waiting on messages..."); let msg = match fltk_rx.recv () { None => continue, Some (x) => x, }; match msg { Message::DecoderStatus (x) => { if ! x { tracing::debug! ("decoder ended"); if let Some (url) = urls.pop () { thread_decoder.tx.send (MessageToDecoder::SetUrl (url))?; } else { break; } } }, _ => {}, } } audio_output.pause ()?; thread_decoder.tx.send (MessageToDecoder::Quit)?; thread_decoder.join ()?; Ok (()) } #[derive (Default)] pub struct SharedState { pub pcm_buffers: decoder::PcmBuffers, } struct Gui { intend_to_play: bool, can_decode: bool, audio_is_streaming: bool, trying_to_stream: bool, but_pause: Button, but_play: Button, } fn set_active (w: &mut W, b: bool) { if b { w.activate (); } else { w.deactivate (); } } impl Gui { pub fn sync (&mut self) { set_active ( &mut self.but_pause, self.intend_to_play, ); set_active ( &mut self.but_play, ! self.intend_to_play && self.can_decode ); } } #[derive (Clone, Copy, Debug)] enum Message { DecoderStatus (bool), RawPlay, RawPause, RawSet, RawSkip, } fn cmd_gui (args: &[String]) -> Result <()> { use std::io::BufRead; tracing_subscriber::fmt::init (); let playlist = { let mut files = Vec::with_capacity (2000); let file = File::open ("/home/user/music/playlist.m3u")?; let reader = std::io::BufReader::new (file); for line in reader.lines () { files.push (line?); } files }; let filenames: Vec <_> = args.iter () .skip (1) .map (|s| s.to_string ()) .collect (); let (fltk_tx, fltk_rx) = app::channel:: (); let app = app::App::default (); let window_title = "Music Player".to_string (); let mut wind = Window::new (100, 100, 600, 600, None) .with_label (&window_title); wind.make_resizable (true); let mut col = Flex::default ().column ().size_of_parent (); { let mut row = Flex::default ().row (); let mut label = Frame::default ().with_label ("Audio output"); let mut but_audio_play = Button::default ().with_label ("Play"); let mut but_audio_pause = Button::default ().with_label ("Pause"); row.set_size (&mut label, 100); row.set_size (&mut but_audio_play, 100); row.set_size (&mut but_audio_pause, 100); but_audio_play.set_trigger (CallbackTrigger::Release); but_audio_pause.set_trigger (CallbackTrigger::Release); but_audio_play.emit (fltk_tx, Message::RawPlay); but_audio_pause.emit (fltk_tx, Message::RawPause); row.end (); col.add (&row); col.set_size (&mut row, 30); } { let mut row = Flex::default ().row (); let mut label = Frame::default ().with_label ("Decoder"); let mut but_set = Button::default ().with_label ("Set"); let mut but_skip = Button::default ().with_label ("Skip"); row.set_size (&mut label, 100); row.set_size (&mut but_set, 100); row.set_size (&mut but_skip, 100); but_set.set_trigger (CallbackTrigger::Release); but_skip.set_trigger (CallbackTrigger::Release); but_set.emit (fltk_tx, Message::RawSet); but_skip.emit (fltk_tx, Message::RawSkip); row.end (); col.add (&row); col.set_size (&mut row, 30); } col.end (); wind.end (); wind.show (); let mut shared_state = SharedState::default (); let silence = vec! [0; 12_000 * 4 * 2]; shared_state.pcm_buffers.produce_bytes (silence); let shared_state = Arc::new ((Mutex::new (shared_state), Condvar::new ())); let thread_decoder = DecoderThread::new ( Arc::clone (&shared_state), Some (fltk_tx.clone ()), ); let mut audio_output = AudioOutput::new ( Arc::clone (&shared_state), thread_decoder.tx.clone (), )?; // Doesn't work. // audio_output.pause ()?; /* for filename in filenames { //gui.can_decode = true; thread_decoder.tx.send (MessageToDecoder::QueueFile (filename))?; } */ // let mut play_queue = vec! []; let mut current_file = None; while app.wait () { let msg = match fltk_rx.recv () { None => continue, Some (x) => x, }; match msg { Message::DecoderStatus (x) => { if ! x { tracing::info! ("Decoder ran dry"); current_file = None; } }, Message::RawPlay => { audio_output.play ()?; }, Message::RawPause => { audio_output.pause ()?; }, Message::RawSet => { use rand::Rng; let idx = rand::thread_rng ().gen_range (0..playlist.len ()); let filename = &playlist [idx]; tracing::info! ("Set decoder to {}", filename); thread_decoder.tx.send (MessageToDecoder::SetFile (format! ("/home/user/music/{}", filename)))?; current_file = Some (filename); }, Message::RawSkip => { thread_decoder.tx.send (MessageToDecoder::Clear)?; }, } } tracing::info! ("Received FLTK quit signal"); audio_output.pause ()?; tracing::debug! ("Joining decoder thread..."); thread_decoder.tx.send (MessageToDecoder::Quit)?; thread_decoder.join ()?; tracing::debug! ("Joining PCM thread..."); drop (audio_output); tracing::info! ("Exiting cleanly."); Ok (()) } fn cmd_play (args: &[String]) -> Result <()> { unimplemented! (); /* tracing_subscriber::fmt::init (); let filenames: Vec <_> = args.iter () .skip (1) .map (|s| s.to_string ()) .collect (); // let (decoder_tx, decoder_rx) = mpsc::sync_channel (1); let (main_tx, main_rx) = mpsc::sync_channel (1); let shared_state = Arc::new ((Mutex::new (SharedState::default ()), Condvar::new ())); let pcm_quit = Arc::new ((Mutex::new (false), Condvar::new ())); let thread_decoder = DecoderThread::new ( Arc::clone (&shared_state), main_tx.clone (), ); let audio_output = AudioOutput::new ( Arc::clone (&pcm_quit), Arc::clone (&shared_state), thread_decoder.tx.clone (), )?; for filename in filenames { thread_decoder.tx.send (MessageToDecoder::QueueFile (filename))?; } while let Ok (msg) = main_rx.recv () { match msg { MessageToMain::DecoderEmpty => break, }} tracing::debug! ("Asking decoder to quit..."); thread_decoder.tx.send (MessageToDecoder::Quit)?; tracing::debug! ("Joining decoder thread..."); thread_decoder.join ()?; tracing::debug! ("Joining PCM thread..."); drop (audio_output); tracing::info! ("Exiting cleanly."); Ok (()) */ } struct DecoderThread { join_handle: thread::JoinHandle >, pub tx: mpsc::SyncSender , } enum MessageToDecoder { NeedPcm, SetFile (String), SetUrl (String), Clear, Quit, } enum Decoder { File (decoder::Decoder ), Http (decoder::Decoder ), } impl DecoderThread { pub fn new ( shared_state: Arc <(Mutex , Condvar)>, main_tx: Option >, ) -> Self { let (tx, rx) = mpsc::sync_channel (4); let self_tx = tx.clone (); let join_handle = thread::spawn (move|| { let mut decoder: Option = None; 'thread: while let Ok (msg) = rx.recv () { match msg { MessageToDecoder::Clear => { decoder = None; let new_buffers = decoder::PcmBuffers::default (); self_tx.send (MessageToDecoder::NeedPcm)?; let mut shared_state = shared_state.0.lock ().unwrap (); shared_state.pcm_buffers = new_buffers; }, MessageToDecoder::NeedPcm => { // tracing::trace! ("DecoderThread recv'd NeedPcm"); let mut bytes_produced = 0; let mut status = false; 'pipeline: while bytes_produced < BUFFER_SIZE * 4 * 2 { // tracing::trace! ("Pumping multi-file decode pipeline"); let frame = match &mut decoder { None => None, Some (Decoder::File (x)) => x.next ()?, Some (Decoder::Http (x)) => x.next ()?, }; if let Some (frame) = frame { // tracing::trace! ("Decoded bytes"); status = true; let mut shared_state = shared_state.0.lock ().unwrap (); bytes_produced += frame.data ().len (); shared_state.pcm_buffers.produce_bytes (frame.data ().into ()); continue 'pipeline; } else if decoder.is_some () { decoder = None; tracing::trace! ("Decoder ended, telling main thread"); main_tx.as_ref ().map (|x| x.send (Message::DecoderStatus (false))); } { // tracing::trace! ("Decoder thread is producing silence"); let frame = vec! [0; 2_000 * 4 * 2]; { let mut shared_state = shared_state.0.lock ().unwrap (); bytes_produced += frame.len (); shared_state.pcm_buffers.produce_bytes (frame); // tracing::trace! ("PCM buffer: {}", shared_state.pcm_buffers.samples_available ()); } status = false; continue 'pipeline; } } }, MessageToDecoder::SetFile (x) => { let (demuxer, codec) = decoder::FfmpegDemuxer::new (&x)?; decoder = Some (Decoder::File (decoder::Decoder::new (demuxer, codec)?)); let new_buffers = decoder::PcmBuffers::default (); self_tx.send (MessageToDecoder::NeedPcm)?; let mut shared_state = shared_state.0.lock ().unwrap (); shared_state.pcm_buffers = new_buffers; }, MessageToDecoder::SetUrl (x) => { let (demuxer, codec) = decoder::HttpOggDemuxer::new (x)?; decoder = Some (Decoder::Http (decoder::Decoder::new (demuxer, codec)?)); let new_buffers = decoder::PcmBuffers::default (); self_tx.send (MessageToDecoder::NeedPcm)?; let mut shared_state = shared_state.0.lock ().unwrap (); shared_state.pcm_buffers = new_buffers; }, MessageToDecoder::Quit => { tracing::debug! ("DecoderThread recv'd Quit"); break 'thread; }, }} tracing::info! ("Decoder thread quit gracefully"); Ok::<_, anyhow::Error> (()) }); Self { join_handle, tx, } } pub fn join (self) -> Result <()> { self.join_handle.join ().unwrap () } } struct AudioOutput { _host: cpal::Host, _device: cpal::Device, stream: cpal::Stream, } impl AudioOutput { pub fn new ( shared_state: Arc <(Mutex , Condvar)>, decoder_tx: mpsc::SyncSender , ) -> Result { let host = cpal::default_host (); let device = host.default_output_device ().ok_or_else (|| anyhow! ("can't open cpal device"))?; let mut supported_configs_range = device.supported_output_configs ()? .filter (|c| c.channels () == 2 && c.sample_format () == cpal::SampleFormat::F32); let config = supported_configs_range.next () .ok_or_else (|| anyhow! ("can't get stereo f32 audio output"))? .with_sample_rate (cpal::SampleRate (decoder::SAMPLE_RATE)) .config (); let stream = device.build_output_stream ( &config, move |data: &mut [f32], _: &cpal::OutputCallbackInfo| { let (lock, _) = &*shared_state; let time_start = Instant::now (); let need_pcm; let status; { let mut decoder_state = match lock.lock () { Ok (x) => x, Err (_) => return, }; let time_stop = Instant::now (); let dur = time_stop - time_start; if dur.as_micros () > 100 { tracing::warn! ("PCM thread waited {} us for a lock", dur.as_micros ()); } let pcm_buffers = &mut decoder_state.pcm_buffers; if ! pcm_buffers.consume_exact (data) { tracing::warn! ("PCM buffer underflow"); status = false; for x in data { *x = 0.0; } } else { status = true; } need_pcm = pcm_buffers.samples_available () < BUFFER_SIZE; } if need_pcm { decoder_tx.try_send (MessageToDecoder::NeedPcm).ok (); // tracing::trace! ("AudioOutput sent NeedPcm"); } }, move |err| { tracing::error! ("{:?}", err); }, )?; Ok (Self { _host: host, _device: device, stream, }) } pub fn play (&mut self) -> Result <()> { Ok (self.stream.play ()?) } pub fn pause (&mut self) -> Result <()> { Ok (self.stream.pause ()?) } } #[cfg (test)] mod test { use super::*; #[test] fn decode () -> Result <()> { ffmpeg_next::init ()?; let mut input_ctx = ffmpeg_next::format::input (&"test.opus")?; let stream = input_ctx .streams () .best (ffmpeg_next::media::Type::Audio) .ok_or_else (|| anyhow! ("can't find good audio stream"))?; let best_stream_idx = stream.index (); let mut decoder = stream.codec ().decoder ().audio ()?; let mut packet_count = 0; let mut frame_count = 0; let mut frame = ffmpeg_next::util::frame::Audio::empty (); let mut packets = input_ctx.packets (); loop { let mut did_anything = false; while decoder.receive_frame (&mut frame).is_ok () { frame_count += 1; did_anything = true; } match packets.next () { None => {}, Some ((stream, packet)) => { did_anything = true; if stream.index () == best_stream_idx { packet_count += 1; decoder.send_packet (&packet)?; } }, } if ! did_anything { break; } } Ok (()) } }