diff --git a/Cargo.lock b/Cargo.lock index 2602400..8c88db7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -322,6 +322,17 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "getrandom" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753" +dependencies = [ + "cfg-if 1.0.0", + "libc", + "wasi", +] + [[package]] name = "glob" version = "0.3.0" @@ -469,6 +480,7 @@ dependencies = [ "cpal", "ffmpeg-next", "fltk", + "rand", "tracing", "tracing-subscriber", ] @@ -697,6 +709,12 @@ version = "0.3.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "12295df4f294471248581bc09bef3c38a5e46f1e36d6a37353621a0c6c357e1f" +[[package]] +name = "ppv-lite86" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed0cfbc8191465bed66e1718596ee0b0b35d5ee1f41c5df2189d0fe8bde535ba" + [[package]] name = "proc-macro-crate" version = "0.1.5" @@ -734,6 +752,46 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", + "rand_hc", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" +dependencies = [ + "getrandom", +] + +[[package]] +name = "rand_hc" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7" +dependencies = [ + "rand_core", +] + [[package]] name = "redox_syscall" version = "0.2.10" @@ -966,6 +1024,12 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "wasi" +version = "0.10.2+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" + [[package]] name = "wasm-bindgen" version = "0.2.78" diff --git a/Cargo.toml b/Cargo.toml index d6a6fcb..7e8b866 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,5 +11,6 @@ byteorder = "1.4.3" cpal = "0.13.4" ffmpeg-next = "4.4.0" fltk = "1.2.15" +rand = "0.8.4" tracing = "0.1.29" tracing-subscriber = { version = "0.3.1", features = ["env-filter"] } diff --git a/src/decoder.rs b/src/decoder.rs index 2c633c7..713ae07 100644 --- a/src/decoder.rs +++ b/src/decoder.rs @@ -91,9 +91,9 @@ impl PcmBuffers { } pub struct Decoder { - pub input_ctx: DemuxContext, - pub decoder: DecodeContext, - pub resampler: ResamplingContext, + input_ctx: DemuxContext, + decoder: DecodeContext, + resampler: ResamplingContext, best_stream_idx: usize, dummy_frame: Option , diff --git a/src/main.rs b/src/main.rs index 191f130..5ed6731 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,6 +5,7 @@ use std::{ Arc, Condvar, Mutex, + mpsc, }, thread::{ self, @@ -36,11 +37,7 @@ use fltk::{ mod decoder; -#[derive (Clone, Copy)] -enum Message { - Play, - Pause, -} +const BUFFER_SIZE: usize = 5_000; fn main () -> Result <()> { let args: Vec <_> = std::env::args ().collect (); @@ -98,12 +95,68 @@ fn cmd_debug_pipe (args: &[String]) -> Result <()> { #[derive (Default)] pub struct SharedState { pub pcm_buffers: decoder::PcmBuffers, - pub quit: bool, +} + +struct Gui { + intend_to_play: bool, + can_decode: bool, + audio_is_streaming: bool, + trying_to_stream: bool, + + but_pause: Button, + but_play: Button, +} + +fn set_active (w: &mut W, b: bool) { + if b { + w.activate (); + } + else { + w.deactivate (); + } +} + +impl Gui { + pub fn sync (&mut self) { + set_active ( + &mut self.but_pause, + self.intend_to_play, + ); + set_active ( + &mut self.but_play, + ! self.intend_to_play && + self.can_decode + ); + } +} + +#[derive (Clone, Copy, Debug)] +enum Message { + AudioStreamStatus (bool), + DecoderStatus (bool), + RawPlay, + RawPause, + RawSet, + RawSkip, } fn cmd_gui (args: &[String]) -> Result <()> { + use std::io::BufRead; + tracing_subscriber::fmt::init (); + let playlist = { + let mut files = Vec::with_capacity (2000); + let file = File::open ("/home/user/music/playlist.m3u")?; + let reader = std::io::BufReader::new (file); + + for line in reader.lines () { + files.push (line?); + } + + files + }; + let filenames: Vec <_> = args.iter () .skip (1) .map (|s| s.to_string ()) @@ -117,49 +170,146 @@ fn cmd_gui (args: &[String]) -> Result <()> { .with_label (&window_title); wind.make_resizable (true); - let mut row = Flex::default ().row ().size_of_parent (); + let mut col = Flex::default ().column ().size_of_parent (); - let mut but_play = Button::default ().with_label ("▶️"); - but_play.set_trigger (CallbackTrigger::Release); - but_play.emit (fltk_tx, Message::Play); - row.set_size (&mut but_play, 30); + { + let mut row = Flex::default ().row (); + + let mut label = Frame::default ().with_label ("Audio output"); + let mut but_audio_play = Button::default ().with_label ("Play"); + let mut but_audio_pause = Button::default ().with_label ("Pause"); + + row.set_size (&mut label, 100); + row.set_size (&mut but_audio_play, 100); + row.set_size (&mut but_audio_pause, 100); + + but_audio_play.set_trigger (CallbackTrigger::Release); + but_audio_pause.set_trigger (CallbackTrigger::Release); + + but_audio_play.emit (fltk_tx, Message::RawPlay); + but_audio_pause.emit (fltk_tx, Message::RawPause); + + row.end (); + + col.add (&row); + col.set_size (&mut row, 30); + } - let mut but_pause = Button::default ().with_label ("■"); - but_pause.set_trigger (CallbackTrigger::Release); - but_pause.emit (fltk_tx, Message::Pause); - row.set_size (&mut but_pause, 30); + { + let mut row = Flex::default ().row (); + + let mut label = Frame::default ().with_label ("Decoder"); + let mut but_set = Button::default ().with_label ("Set"); + let mut but_skip = Button::default ().with_label ("Skip"); + + row.set_size (&mut label, 100); + row.set_size (&mut but_set, 100); + row.set_size (&mut but_skip, 100); + + but_set.set_trigger (CallbackTrigger::Release); + but_skip.set_trigger (CallbackTrigger::Release); + + but_set.emit (fltk_tx, Message::RawSet); + but_skip.emit (fltk_tx, Message::RawSkip); + + row.end (); + + col.add (&row); + col.set_size (&mut row, 30); + } - row.end (); + col.end (); wind.end (); wind.show (); + /* + let mut gui = Gui { + can_decode: false, + audio_is_streaming: false, + intend_to_play: false, + trying_to_stream: false, + but_pause, + but_play, + }; - let shared_state = Arc::new ((Mutex::new (SharedState::default ()), Condvar::new ())); - let pcm_quit = Arc::new ((Mutex::new (false), Condvar::new ())); + gui.sync (); + */ + let mut shared_state = SharedState::default (); + let silence = vec! [0; 12_000 * 4 * 2]; + shared_state.pcm_buffers.produce_bytes (silence); - let thread_decoder = DecoderThread::new (Arc::clone (&shared_state), filenames); - let mut audio_output = AudioOutput::new (Arc::clone (&pcm_quit), Arc::clone (&shared_state))?; + let shared_state = Arc::new ((Mutex::new (shared_state), Condvar::new ())); + + let thread_decoder = DecoderThread::new ( + Arc::clone (&shared_state), + fltk_tx.clone (), + ); + let mut audio_output = AudioOutput::new ( + Arc::clone (&shared_state), + thread_decoder.tx.clone (), + fltk_tx.clone (), + )?; + + // Doesn't work. + // audio_output.pause ()?; + + /* + for filename in filenames { + //gui.can_decode = true; + thread_decoder.tx.send (MessageToDecoder::QueueFile (filename))?; + } + */ + + // let mut play_queue = vec! []; + + let mut current_file = None; while app.wait () { - match fltk_rx.recv () { - Some (Message::Play) => { - tracing::info! ("play"); + let msg = match fltk_rx.recv () { + None => continue, + Some (x) => x, + }; + + match msg { + Message::AudioStreamStatus (x) => { + + }, + Message::DecoderStatus (x) => { + if ! x { + tracing::info! ("Decoder ran dry"); + current_file = None; + } + }, + Message::RawPlay => { audio_output.play ()?; }, - Some (Message::Pause) => { - tracing::info! ("pause"); + Message::RawPause => { audio_output.pause ()?; }, - None => (), + Message::RawSet => { + use rand::Rng; + let idx = rand::thread_rng ().gen_range (0..playlist.len ()); + let filename = &playlist [idx]; + tracing::info! ("Set decoder to {}", filename); + thread_decoder.tx.send (MessageToDecoder::SetFile (Some (format! ("/home/user/music/{}", filename))))?; + current_file = Some (filename); + }, + Message::RawSkip => { + thread_decoder.tx.send (MessageToDecoder::SetFile (None))?; + }, } } + tracing::info! ("Received FLTK quit signal"); + + audio_output.pause ()?; + tracing::debug! ("Joining decoder thread..."); + thread_decoder.tx.send (MessageToDecoder::Quit)?; thread_decoder.join ()?; tracing::debug! ("Joining PCM thread..."); - let (lock, cvar) = &*pcm_quit; - let _ = cvar.wait (lock.lock ().unwrap ()).unwrap (); + drop (audio_output); tracing::info! ("Exiting cleanly."); @@ -167,6 +317,8 @@ fn cmd_gui (args: &[String]) -> Result <()> { } fn cmd_play (args: &[String]) -> Result <()> { + unimplemented! (); + /* tracing_subscriber::fmt::init (); let filenames: Vec <_> = args.iter () @@ -174,68 +326,140 @@ fn cmd_play (args: &[String]) -> Result <()> { .map (|s| s.to_string ()) .collect (); + // let (decoder_tx, decoder_rx) = mpsc::sync_channel (1); + let (main_tx, main_rx) = mpsc::sync_channel (1); + let shared_state = Arc::new ((Mutex::new (SharedState::default ()), Condvar::new ())); let pcm_quit = Arc::new ((Mutex::new (false), Condvar::new ())); - let thread_decoder = DecoderThread::new (Arc::clone (&shared_state), filenames); - let audio_output = AudioOutput::new (Arc::clone (&pcm_quit), Arc::clone (&shared_state))?; + let thread_decoder = DecoderThread::new ( + Arc::clone (&shared_state), + main_tx.clone (), + ); + let audio_output = AudioOutput::new ( + Arc::clone (&pcm_quit), + Arc::clone (&shared_state), + thread_decoder.tx.clone (), + )?; + for filename in filenames { + thread_decoder.tx.send (MessageToDecoder::QueueFile (filename))?; + } + + while let Ok (msg) = main_rx.recv () { match msg { + MessageToMain::DecoderEmpty => break, + }} + + tracing::debug! ("Asking decoder to quit..."); + thread_decoder.tx.send (MessageToDecoder::Quit)?; tracing::debug! ("Joining decoder thread..."); thread_decoder.join ()?; tracing::debug! ("Joining PCM thread..."); - let (lock, cvar) = &*pcm_quit; - let _ = cvar.wait (lock.lock ().unwrap ()).unwrap (); - drop (audio_output); tracing::info! ("Exiting cleanly."); Ok (()) + */ } struct DecoderThread { join_handle: thread::JoinHandle >, + pub tx: mpsc::SyncSender , +} + +enum MessageToDecoder { + NeedPcm, + SetFile (Option ), + Quit, } impl DecoderThread { pub fn new ( shared_state: Arc <(Mutex , Condvar)>, - filenames: Vec , + main_tx: fltk::app::Sender , ) -> Self { + let (tx, rx) = mpsc::sync_channel (4); + + let self_tx = tx.clone (); + let join_handle = thread::spawn (move|| { - let (lock, cvar) = &*shared_state; + let mut decoder: Option = None; - 'many_files: for filename in &filenames { - let mut decoder = decoder::Decoder::new (&filename)?; - - 'one_file: loop { - let frame = match decoder.next ()? { - Some (x) => x, - None => { - tracing::debug! ("Decoder thread finished a file"); - break 'one_file; - }, - }; + 'thread: while let Ok (msg) = rx.recv () { match msg { + MessageToDecoder::NeedPcm => { + // tracing::trace! ("DecoderThread recv'd NeedPcm"); + let mut bytes_produced = 0; + let mut status = false; - let mut decoder_state = cvar.wait_while (lock.lock ().unwrap (), |decoder_state| { - decoder_state.pcm_buffers.samples_available () >= 12_000 && - ! decoder_state.quit - }).unwrap (); - - if decoder_state.quit { - break 'many_files; + 'pipeline: while bytes_produced < BUFFER_SIZE * 4 * 2 { + // tracing::trace! ("Pumping multi-file decode pipeline"); + let frame = if let Some (decoder) = decoder.as_mut () { + decoder.next ()? + } + else { + None + }; + + if let Some (frame) = frame { + // tracing::trace! ("Decoded bytes"); + status = true; + let mut shared_state = shared_state.0.lock ().unwrap (); + bytes_produced += frame.data ().len (); + shared_state.pcm_buffers.produce_bytes (frame.data ().into ()); + continue 'pipeline; + } + else if decoder.is_some () { + decoder = None; + main_tx.send (Message::DecoderStatus (false)); + } + + { + // tracing::trace! ("Decoder thread is producing silence"); + let frame = vec! [0; 2_000 * 4 * 2]; + { + let mut shared_state = shared_state.0.lock ().unwrap (); + + bytes_produced += frame.len (); + shared_state.pcm_buffers.produce_bytes (frame); + // tracing::trace! ("PCM buffer: {}", shared_state.pcm_buffers.samples_available ()); + } + status = false; + continue 'pipeline; + } } + // tracing::trace! ("Left NeedPcm handler"); + // main_tx.send (Message::DecoderStatus (status)); + }, + MessageToDecoder::SetFile (x) => { + if let Some (x) = x.as_ref () { + decoder = Some (decoder::Decoder::new (x)?); + } + else { + decoder = None; + } + // main_tx.send (Message::DecoderStatus (x.is_some ())); + let new_buffers = decoder::PcmBuffers::default (); + self_tx.send (MessageToDecoder::NeedPcm)?; - decoder_state.pcm_buffers.produce_bytes (frame.data ().into ()); - } - } + let mut shared_state = shared_state.0.lock ().unwrap (); + shared_state.pcm_buffers = new_buffers; + }, + MessageToDecoder::Quit => { + tracing::debug! ("DecoderThread recv'd Quit"); + break 'thread; + }, + }} + + tracing::info! ("Decoder thread quit gracefully"); Ok::<_, anyhow::Error> (()) }); Self { join_handle, + tx, } } @@ -252,8 +476,9 @@ struct AudioOutput { impl AudioOutput { pub fn new ( - pcm_quit: Arc <(Mutex , Condvar)>, shared_state: Arc <(Mutex , Condvar)>, + decoder_tx: mpsc::SyncSender , + main_tx: fltk::app::Sender , ) -> Result { let host = cpal::default_host (); @@ -270,39 +495,47 @@ impl AudioOutput { let stream = device.build_output_stream ( &config, move |data: &mut [f32], _: &cpal::OutputCallbackInfo| { - let (lock, cvar) = &*shared_state; + let (lock, _) = &*shared_state; let time_start = Instant::now (); - let mut decoder_state = match lock.lock () { - Ok (x) => x, - Err (_) => return, - }; - let time_stop = Instant::now (); - let dur = time_stop - time_start; - if dur.as_micros () > 100 { - tracing::warn! ("PCM thread waited {} us for a lock", dur.as_micros ()); - } - - let pcm_buffers = &mut decoder_state.pcm_buffers; - - if ! pcm_buffers.consume_exact (data) { - tracing::warn! ("PCM buffer underflow"); - - for x in data { - *x = 0.0; + let need_pcm; + let status; + { + let mut decoder_state = match lock.lock () { + Ok (x) => x, + Err (_) => return, + }; + let time_stop = Instant::now (); + let dur = time_stop - time_start; + if dur.as_micros () > 100 { + tracing::warn! ("PCM thread waited {} us for a lock", dur.as_micros ()); } - let (lock, cvar) = &*pcm_quit; + let pcm_buffers = &mut decoder_state.pcm_buffers; - let mut pcm_quit = lock.lock ().unwrap (); - *pcm_quit = true; - cvar.notify_one (); + if ! pcm_buffers.consume_exact (data) { + tracing::warn! ("PCM buffer underflow"); + status = false; + + for x in data { + *x = 0.0; + } + } + else { + status = true; + } + + need_pcm = pcm_buffers.samples_available () < BUFFER_SIZE; } - cvar.notify_one (); + if need_pcm { + decoder_tx.try_send (MessageToDecoder::NeedPcm).ok (); + // tracing::trace! ("AudioOutput sent NeedPcm"); + } + // main_tx.send (Message::AudioStreamStatus (status)); }, - move |_err| { - // react to errors here. + move |err| { + tracing::error! ("{:?}", err); }, )?;