music_player/src/main.rs

631 lines
14 KiB
Rust

use std::{
fs::File,
sync::{
Arc,
Condvar,
Mutex,
mpsc,
},
thread::{
self,
},
time::{Instant},
};
use anyhow::{
anyhow,
bail,
Result,
};
use cpal::traits::{
DeviceTrait,
HostTrait,
StreamTrait,
};
use fltk::{
app,
button::Button,
enums::CallbackTrigger,
frame::Frame,
group::Flex,
prelude::*,
window::Window,
};
mod decoder;
mod net_reader;
const BUFFER_SIZE: usize = 5_000;
fn main () -> Result <()> {
let args: Vec <_> = std::env::args ().collect ();
match args.get (1).map (|s| &s[..]) {
None => bail! ("First argument must be a subcommand like `play`"),
Some ("debug-net") => cmd_debug_net (&args [1..]),
Some ("gui") => cmd_gui (&args [1..]),
Some ("play") => cmd_play (&args [1..]),
Some (_) => bail! ("Unrecognized subcommand"),
}
}
fn cmd_debug_net (args: &[String]) -> Result <()> {
tracing_subscriber::fmt::init ();
let mut urls: Vec <_> = args.iter ().skip (1).map (|s| s.to_string ()).collect ();
urls.reverse ();
let (fltk_tx, fltk_rx) = app::channel::<Message> ();
let app = app::App::default ();
let window_title = "Music Player".to_string ();
let mut wind = Window::new (100, 100, 300, 300, None)
.with_label (&window_title);
wind.make_resizable (true);
wind.end ();
wind.show ();
let mut shared_state = SharedState::default ();
let silence = vec! [0; 12_000 * 4 * 2];
shared_state.pcm_buffers.produce_bytes (silence);
let shared_state = Arc::new ((Mutex::new (shared_state), Condvar::new ()));
let thread_decoder = DecoderThread::new (
Arc::clone (&shared_state),
Some (fltk_tx.clone ()),
);
let mut audio_output = AudioOutput::new (
Arc::clone (&shared_state),
thread_decoder.tx.clone (),
)?;
if let Some (url) = urls.pop () {
thread_decoder.tx.send (MessageToDecoder::SetUrl (url))?;
}
else {
bail! ("No URLs to play");
}
while app.wait () {
tracing::trace! ("Main thread is waiting on messages...");
let msg = match fltk_rx.recv () {
None => continue,
Some (x) => x,
};
match msg {
Message::DecoderStatus (x) => {
if ! x {
tracing::debug! ("decoder ended");
if let Some (url) = urls.pop () {
thread_decoder.tx.send (MessageToDecoder::SetUrl (url))?;
}
else {
break;
}
}
},
_ => {},
}
}
audio_output.pause ()?;
thread_decoder.tx.send (MessageToDecoder::Quit)?;
thread_decoder.join ()?;
Ok (())
}
#[derive (Default)]
pub struct SharedState {
pub pcm_buffers: decoder::PcmBuffers,
}
struct Gui {
intend_to_play: bool,
can_decode: bool,
audio_is_streaming: bool,
trying_to_stream: bool,
but_pause: Button,
but_play: Button,
}
fn set_active <W: WidgetExt> (w: &mut W, b: bool) {
if b {
w.activate ();
}
else {
w.deactivate ();
}
}
impl Gui {
pub fn sync (&mut self) {
set_active (
&mut self.but_pause,
self.intend_to_play,
);
set_active (
&mut self.but_play,
! self.intend_to_play &&
self.can_decode
);
}
}
#[derive (Clone, Copy, Debug)]
enum Message {
DecoderStatus (bool),
RawPlay,
RawPause,
RawSet,
RawSkip,
}
fn cmd_gui (args: &[String]) -> Result <()> {
use std::io::BufRead;
tracing_subscriber::fmt::init ();
let playlist = {
let mut files = Vec::with_capacity (2000);
let file = File::open ("/home/user/music/playlist.m3u")?;
let reader = std::io::BufReader::new (file);
for line in reader.lines () {
files.push (line?);
}
files
};
let filenames: Vec <_> = args.iter ()
.skip (1)
.map (|s| s.to_string ())
.collect ();
let (fltk_tx, fltk_rx) = app::channel::<Message> ();
let app = app::App::default ();
let window_title = "Music Player".to_string ();
let mut wind = Window::new (100, 100, 600, 600, None)
.with_label (&window_title);
wind.make_resizable (true);
let mut col = Flex::default ().column ().size_of_parent ();
{
let mut row = Flex::default ().row ();
let mut label = Frame::default ().with_label ("Audio output");
let mut but_audio_play = Button::default ().with_label ("Play");
let mut but_audio_pause = Button::default ().with_label ("Pause");
row.set_size (&mut label, 100);
row.set_size (&mut but_audio_play, 100);
row.set_size (&mut but_audio_pause, 100);
but_audio_play.set_trigger (CallbackTrigger::Release);
but_audio_pause.set_trigger (CallbackTrigger::Release);
but_audio_play.emit (fltk_tx, Message::RawPlay);
but_audio_pause.emit (fltk_tx, Message::RawPause);
row.end ();
col.add (&row);
col.set_size (&mut row, 30);
}
{
let mut row = Flex::default ().row ();
let mut label = Frame::default ().with_label ("Decoder");
let mut but_set = Button::default ().with_label ("Set");
let mut but_skip = Button::default ().with_label ("Skip");
row.set_size (&mut label, 100);
row.set_size (&mut but_set, 100);
row.set_size (&mut but_skip, 100);
but_set.set_trigger (CallbackTrigger::Release);
but_skip.set_trigger (CallbackTrigger::Release);
but_set.emit (fltk_tx, Message::RawSet);
but_skip.emit (fltk_tx, Message::RawSkip);
row.end ();
col.add (&row);
col.set_size (&mut row, 30);
}
col.end ();
wind.end ();
wind.show ();
let mut shared_state = SharedState::default ();
let silence = vec! [0; 12_000 * 4 * 2];
shared_state.pcm_buffers.produce_bytes (silence);
let shared_state = Arc::new ((Mutex::new (shared_state), Condvar::new ()));
let thread_decoder = DecoderThread::new (
Arc::clone (&shared_state),
Some (fltk_tx.clone ()),
);
let mut audio_output = AudioOutput::new (
Arc::clone (&shared_state),
thread_decoder.tx.clone (),
)?;
// Doesn't work.
// audio_output.pause ()?;
/*
for filename in filenames {
//gui.can_decode = true;
thread_decoder.tx.send (MessageToDecoder::QueueFile (filename))?;
}
*/
// let mut play_queue = vec! [];
let mut current_file = None;
while app.wait () {
let msg = match fltk_rx.recv () {
None => continue,
Some (x) => x,
};
match msg {
Message::DecoderStatus (x) => {
if ! x {
tracing::info! ("Decoder ran dry");
current_file = None;
}
},
Message::RawPlay => {
audio_output.play ()?;
},
Message::RawPause => {
audio_output.pause ()?;
},
Message::RawSet => {
use rand::Rng;
let idx = rand::thread_rng ().gen_range (0..playlist.len ());
let filename = &playlist [idx];
tracing::info! ("Set decoder to {}", filename);
thread_decoder.tx.send (MessageToDecoder::SetFile (format! ("/home/user/music/{}", filename)))?;
current_file = Some (filename);
},
Message::RawSkip => {
thread_decoder.tx.send (MessageToDecoder::Clear)?;
},
}
}
tracing::info! ("Received FLTK quit signal");
audio_output.pause ()?;
tracing::debug! ("Joining decoder thread...");
thread_decoder.tx.send (MessageToDecoder::Quit)?;
thread_decoder.join ()?;
tracing::debug! ("Joining PCM thread...");
drop (audio_output);
tracing::info! ("Exiting cleanly.");
Ok (())
}
fn cmd_play (args: &[String]) -> Result <()> {
unimplemented! ();
/*
tracing_subscriber::fmt::init ();
let filenames: Vec <_> = args.iter ()
.skip (1)
.map (|s| s.to_string ())
.collect ();
// let (decoder_tx, decoder_rx) = mpsc::sync_channel (1);
let (main_tx, main_rx) = mpsc::sync_channel (1);
let shared_state = Arc::new ((Mutex::new (SharedState::default ()), Condvar::new ()));
let pcm_quit = Arc::new ((Mutex::new (false), Condvar::new ()));
let thread_decoder = DecoderThread::new (
Arc::clone (&shared_state),
main_tx.clone (),
);
let audio_output = AudioOutput::new (
Arc::clone (&pcm_quit),
Arc::clone (&shared_state),
thread_decoder.tx.clone (),
)?;
for filename in filenames {
thread_decoder.tx.send (MessageToDecoder::QueueFile (filename))?;
}
while let Ok (msg) = main_rx.recv () { match msg {
MessageToMain::DecoderEmpty => break,
}}
tracing::debug! ("Asking decoder to quit...");
thread_decoder.tx.send (MessageToDecoder::Quit)?;
tracing::debug! ("Joining decoder thread...");
thread_decoder.join ()?;
tracing::debug! ("Joining PCM thread...");
drop (audio_output);
tracing::info! ("Exiting cleanly.");
Ok (())
*/
}
struct DecoderThread {
join_handle: thread::JoinHandle <Result <()>>,
pub tx: mpsc::SyncSender <MessageToDecoder>,
}
enum MessageToDecoder {
NeedPcm,
SetFile (String),
SetUrl (String),
Clear,
Quit,
}
enum Decoder {
File (decoder::Decoder <decoder::FfmpegDemuxer>),
Http (decoder::Decoder <decoder::HttpOggDemuxer>),
}
impl DecoderThread {
pub fn new (
shared_state: Arc <(Mutex <SharedState>, Condvar)>,
main_tx: Option <fltk::app::Sender <Message>>,
) -> Self
{
let (tx, rx) = mpsc::sync_channel (4);
let self_tx = tx.clone ();
let join_handle = thread::spawn (move|| {
let mut decoder: Option <Decoder> = None;
'thread: while let Ok (msg) = rx.recv () { match msg {
MessageToDecoder::Clear => {
decoder = None;
let new_buffers = decoder::PcmBuffers::default ();
self_tx.send (MessageToDecoder::NeedPcm)?;
let mut shared_state = shared_state.0.lock ().unwrap ();
shared_state.pcm_buffers = new_buffers;
},
MessageToDecoder::NeedPcm => {
// tracing::trace! ("DecoderThread recv'd NeedPcm");
let mut bytes_produced = 0;
let mut status = false;
'pipeline: while bytes_produced < BUFFER_SIZE * 4 * 2 {
// tracing::trace! ("Pumping multi-file decode pipeline");
let frame = match &mut decoder {
None => None,
Some (Decoder::File (x)) => x.next ()?,
Some (Decoder::Http (x)) => x.next ()?,
};
if let Some (frame) = frame {
// tracing::trace! ("Decoded bytes");
status = true;
let mut shared_state = shared_state.0.lock ().unwrap ();
bytes_produced += frame.data ().len ();
shared_state.pcm_buffers.produce_bytes (frame.data ().into ());
continue 'pipeline;
}
else if decoder.is_some () {
decoder = None;
tracing::trace! ("Decoder ended, telling main thread");
main_tx.as_ref ().map (|x| x.send (Message::DecoderStatus (false)));
}
{
// tracing::trace! ("Decoder thread is producing silence");
let frame = vec! [0; 2_000 * 4 * 2];
{
let mut shared_state = shared_state.0.lock ().unwrap ();
bytes_produced += frame.len ();
shared_state.pcm_buffers.produce_bytes (frame);
// tracing::trace! ("PCM buffer: {}", shared_state.pcm_buffers.samples_available ());
}
status = false;
continue 'pipeline;
}
}
},
MessageToDecoder::SetFile (x) => {
let (demuxer, codec) = decoder::FfmpegDemuxer::new (&x)?;
decoder = Some (Decoder::File (decoder::Decoder::new (demuxer, codec)?));
let new_buffers = decoder::PcmBuffers::default ();
self_tx.send (MessageToDecoder::NeedPcm)?;
let mut shared_state = shared_state.0.lock ().unwrap ();
shared_state.pcm_buffers = new_buffers;
},
MessageToDecoder::SetUrl (x) => {
let (demuxer, codec) = decoder::HttpOggDemuxer::new (x)?;
decoder = Some (Decoder::Http (decoder::Decoder::new (demuxer, codec)?));
let new_buffers = decoder::PcmBuffers::default ();
self_tx.send (MessageToDecoder::NeedPcm)?;
let mut shared_state = shared_state.0.lock ().unwrap ();
shared_state.pcm_buffers = new_buffers;
},
MessageToDecoder::Quit => {
tracing::debug! ("DecoderThread recv'd Quit");
break 'thread;
},
}}
tracing::info! ("Decoder thread quit gracefully");
Ok::<_, anyhow::Error> (())
});
Self {
join_handle,
tx,
}
}
pub fn join (self) -> Result <()> {
self.join_handle.join ().unwrap ()
}
}
struct AudioOutput {
_host: cpal::Host,
_device: cpal::Device,
stream: cpal::Stream,
}
impl AudioOutput {
pub fn new (
shared_state: Arc <(Mutex <SharedState>, Condvar)>,
decoder_tx: mpsc::SyncSender <MessageToDecoder>,
) -> Result <Self>
{
let host = cpal::default_host ();
let device = host.default_output_device ().ok_or_else (|| anyhow! ("can't open cpal device"))?;
let mut supported_configs_range = device.supported_output_configs ()?
.filter (|c| c.channels () == 2 && c.sample_format () == cpal::SampleFormat::F32);
let config = supported_configs_range.next ()
.ok_or_else (|| anyhow! ("can't get stereo f32 audio output"))?
.with_sample_rate (cpal::SampleRate (decoder::SAMPLE_RATE))
.config ();
let stream = device.build_output_stream (
&config,
move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
let (lock, _) = &*shared_state;
let time_start = Instant::now ();
let need_pcm;
let status;
{
let mut decoder_state = match lock.lock () {
Ok (x) => x,
Err (_) => return,
};
let time_stop = Instant::now ();
let dur = time_stop - time_start;
if dur.as_micros () > 100 {
tracing::warn! ("PCM thread waited {} us for a lock", dur.as_micros ());
}
let pcm_buffers = &mut decoder_state.pcm_buffers;
if ! pcm_buffers.consume_exact (data) {
tracing::warn! ("PCM buffer underflow");
status = false;
for x in data {
*x = 0.0;
}
}
else {
status = true;
}
need_pcm = pcm_buffers.samples_available () < BUFFER_SIZE;
}
if need_pcm {
decoder_tx.try_send (MessageToDecoder::NeedPcm).ok ();
// tracing::trace! ("AudioOutput sent NeedPcm");
}
},
move |err| {
tracing::error! ("{:?}", err);
},
)?;
Ok (Self {
_host: host,
_device: device,
stream,
})
}
pub fn play (&mut self) -> Result <()> {
Ok (self.stream.play ()?)
}
pub fn pause (&mut self) -> Result <()> {
Ok (self.stream.pause ()?)
}
}
#[cfg (test)]
mod test {
use super::*;
#[test]
fn decode () -> Result <()> {
ffmpeg_next::init ()?;
let mut input_ctx = ffmpeg_next::format::input (&"test.opus")?;
let stream = input_ctx
.streams ()
.best (ffmpeg_next::media::Type::Audio)
.ok_or_else (|| anyhow! ("can't find good audio stream"))?;
let best_stream_idx = stream.index ();
let mut decoder = stream.codec ().decoder ().audio ()?;
let mut packet_count = 0;
let mut frame_count = 0;
let mut frame = ffmpeg_next::util::frame::Audio::empty ();
let mut packets = input_ctx.packets ();
loop {
let mut did_anything = false;
while decoder.receive_frame (&mut frame).is_ok () {
frame_count += 1;
did_anything = true;
}
match packets.next () {
None => {},
Some ((stream, packet)) => {
did_anything = true;
if stream.index () == best_stream_idx {
packet_count += 1;
decoder.send_packet (&packet)?;
}
},
}
if ! did_anything {
break;
}
}
Ok (())
}
}