Compare commits

...

3 Commits

Author SHA1 Message Date
_ 84dc1a983f HTTP playback works, for Ogg/Vorbis only for now
This is really cool. Since I own the whole I/O stack, I could also make it run
over QUIC or some compressed format or anything I like.

And I can probably sniff the first packet by hand to suport both Vorbis and
Opus. That would cover most of my music library.
2021-11-14 19:31:54 +00:00
_ 99b3bfe231 ♻️ refactor 2021-11-14 18:12:07 +00:00
_ 57c53a31dd passing `cargo check` 2021-11-14 18:03:37 +00:00
2 changed files with 154 additions and 126 deletions

View File

@ -90,25 +90,18 @@ impl PcmBuffers {
} }
} }
pub struct Decoder { pub struct Decoder <T> {
input_ctx: DemuxContext, demuxer: T,
decoder: DecodeContext, decoder: DecodeContext,
resampler: ResamplingContext, resampler: ResamplingContext,
best_stream_idx: usize,
dummy_frame: Option <FfAudioFrame>, dummy_frame: Option <FfAudioFrame>,
} }
impl Decoder { impl <T: Demuxer> Decoder <T> {
pub fn new (filename: &str) -> Result <Self> { pub fn new (demuxer: T, codec: ffmpeg_next::codec::Context) -> Result <Self> {
let input_ctx = ffmpeg_next::format::input (&filename)?; let decoder = codec.decoder ().audio ()?;
let stream = input_ctx
.streams ()
.best (ffmpeg_next::media::Type::Audio)
.ok_or_else (|| anyhow! ("can't find good audio stream"))?;
let best_stream_idx = stream.index ();
let decoder = stream.codec ().decoder ().audio ()?;
let resampler = decoder.resampler ( let resampler = decoder.resampler (
Sample::F32 (sample::Type::Packed), Sample::F32 (sample::Type::Packed),
ChannelLayout::STEREO, ChannelLayout::STEREO,
@ -116,11 +109,10 @@ impl Decoder {
)?; )?;
Ok (Self { Ok (Self {
input_ctx, demuxer,
decoder, decoder,
resampler, resampler,
best_stream_idx,
dummy_frame: None, dummy_frame: None,
}) })
} }
@ -223,30 +215,28 @@ impl Decoder {
} }
pub fn pump_demuxer (&mut self) -> Result <bool> { pub fn pump_demuxer (&mut self) -> Result <bool> {
while let Some ((stream, packet)) = self.input_ctx.packets ().next () { let packet = self.demuxer.pump ()?;
if stream.index () == self.best_stream_idx { let packet = match packet {
// tracing::trace! ("demuxed packet"); Some (x) => x,
self.decoder.send_packet (&packet)?; None => return Ok (false),
return Ok (true); };
}
}
Ok (false) self.decoder.send_packet (&packet)?;
Ok (true)
} }
} }
trait Demuxer { pub trait Demuxer {
fn pump (&mut self) -> Result <Option <ffmpeg_next::Packet>>;
} }
struct FfmpegDemuxer { pub struct FfmpegDemuxer {
input_ctx: DemuxContext, input_ctx: DemuxContext,
best_stream_idx: usize, best_stream_idx: usize,
decoder: DecodeContext,
} }
impl FfmpegDemuxer { impl FfmpegDemuxer {
pub fn new (filename: &str) -> Result <Self> { pub fn new (filename: &str) -> Result <(Self, ffmpeg_next::codec::Context)> {
let input_ctx = ffmpeg_next::format::input (&filename)?; let input_ctx = ffmpeg_next::format::input (&filename)?;
let stream = input_ctx let stream = input_ctx
.streams () .streams ()
@ -254,35 +244,71 @@ impl FfmpegDemuxer {
.ok_or_else (|| anyhow! ("can't find good audio stream"))?; .ok_or_else (|| anyhow! ("can't find good audio stream"))?;
let best_stream_idx = stream.index (); let best_stream_idx = stream.index ();
let decoder = stream.codec ().decoder ().audio ()?; let codec = stream.codec ();
Ok (Self { Ok ((Self {
input_ctx, input_ctx,
best_stream_idx, best_stream_idx,
decoder, }, codec))
})
} }
} }
impl FfmpegDemuxer { impl Demuxer for FfmpegDemuxer {
fn pump (&mut self) -> Result < fn pump (&mut self) -> Result <Option <ffmpeg_next::Packet>> {
while let Some ((stream, packet)) = self.input_ctx.packets ().next () {
if stream.index () == self.best_stream_idx {
return Ok (Some (packet));
}
}
Ok (None)
}
} }
use crate::net_reader::NetReader; use crate::net_reader::NetReader;
struct HttpOggDemuxer { pub struct HttpOggDemuxer {
ogg_rdr: ogg::reading::PacketReader <std::io::BufReader <NetReader>>, ogg_rdr: ogg::reading::PacketReader <std::io::BufReader <NetReader>>,
} }
impl HttpOggDemuxer { impl HttpOggDemuxer {
pub fn new (url: String) -> Result <Self> { pub fn new (url: String) -> Result <(Self, ffmpeg_next::codec::Context)> {
let net_rdr = NetReader::new (url)?; let net_rdr = NetReader::new (url)?;
let buf_rdr = std::io::BufReader::new (net_rdr); let buf_rdr = std::io::BufReader::new (net_rdr);
let mut ogg_rdr = ogg::reading::PacketReader::new (buf_rdr); let mut ogg_rdr = ogg::reading::PacketReader::new (buf_rdr);
Ok (Self { let codec = {
// Open a local file as a placeholder for figuring out the remote file's codec
let input_ctx = ffmpeg_next::format::input (&"test.ogg")?;
let stream = input_ctx
.streams ()
.best (ffmpeg_next::media::Type::Audio)
.ok_or_else (|| anyhow! ("can't find good audio stream"))?;
stream.codec ()
};
Ok ((Self {
ogg_rdr, ogg_rdr,
}) }, codec))
}
}
impl Demuxer for HttpOggDemuxer {
fn pump (&mut self) -> Result <Option <ffmpeg_next::Packet>> {
let packet = match self.ogg_rdr.read_packet () {
Err (e) => {
tracing::error! ("Ogg read error: {:?}", e);
return Ok (None);
},
Ok (x) => x,
};
let packet = match packet {
None => return Ok (None),
Some (x) => x,
};
Ok (Some (ffmpeg_next::Packet::copy (&packet.data)))
} }
} }

View File

@ -1,6 +1,5 @@
use std::{ use std::{
fs::File, fs::File,
io::Write,
sync::{ sync::{
Arc, Arc,
Condvar, Condvar,
@ -45,9 +44,7 @@ fn main () -> Result <()> {
match args.get (1).map (|s| &s[..]) { match args.get (1).map (|s| &s[..]) {
None => bail! ("First argument must be a subcommand like `play`"), None => bail! ("First argument must be a subcommand like `play`"),
Some ("debug-dump") => cmd_debug_dump (&args [1..]),
Some ("debug-net") => cmd_debug_net (&args [1..]), Some ("debug-net") => cmd_debug_net (&args [1..]),
Some ("debug-pipe") => cmd_debug_pipe (&args [1..]),
Some ("gui") => cmd_gui (&args [1..]), Some ("gui") => cmd_gui (&args [1..]),
Some ("play") => cmd_play (&args [1..]), Some ("play") => cmd_play (&args [1..]),
Some (_) => bail! ("Unrecognized subcommand"), Some (_) => bail! ("Unrecognized subcommand"),
@ -55,71 +52,67 @@ fn main () -> Result <()> {
} }
fn cmd_debug_net (args: &[String]) -> Result <()> { fn cmd_debug_net (args: &[String]) -> Result <()> {
use std::{
io::{
self,
Read,
Seek,
SeekFrom,
},
};
tracing_subscriber::fmt::init (); tracing_subscriber::fmt::init ();
let url = args.get (1).ok_or_else (|| anyhow! ("Need a URL"))? let mut urls: Vec <_> = args.iter ().skip (1).map (|s| s.to_string ()).collect ();
.to_string (); urls.reverse ();
let net_rdr = net_reader::NetReader::new (url)?; let (fltk_tx, fltk_rx) = app::channel::<Message> ();
let buf_rdr = std::io::BufReader::new (net_rdr); let app = app::App::default ();
let mut ogg_rdr = ogg::reading::PacketReader::new (buf_rdr); let window_title = "Music Player".to_string ();
let mut wind = Window::new (100, 100, 300, 300, None)
.with_label (&window_title);
wind.make_resizable (true);
wind.end ();
wind.show ();
let mut packet_count = 0; let mut shared_state = SharedState::default ();
let silence = vec! [0; 12_000 * 4 * 2];
shared_state.pcm_buffers.produce_bytes (silence);
let shared_state = Arc::new ((Mutex::new (shared_state), Condvar::new ()));
while let Ok (Some (packet)) = ogg_rdr.read_packet () { let thread_decoder = DecoderThread::new (
packet_count += 1; Arc::clone (&shared_state),
Some (fltk_tx.clone ()),
);
let mut audio_output = AudioOutput::new (
Arc::clone (&shared_state),
thread_decoder.tx.clone (),
)?;
if let Some (url) = urls.pop () {
thread_decoder.tx.send (MessageToDecoder::SetUrl (url))?;
}
else {
bail! ("No URLs to play");
} }
tracing::info! ("Read {} packets", packet_count); while app.wait () {
tracing::trace! ("Main thread is waiting on messages...");
Ok (()) let msg = match fltk_rx.recv () {
} None => continue,
Some (x) => x,
fn cmd_debug_dump (args: &[String]) -> Result <()> { };
tracing_subscriber::fmt::init ();
match msg {
let filename = args.get (1) Message::DecoderStatus (x) => {
.map (|s| s.to_string ()) if ! x {
.unwrap_or_else (|| "test-short.m4a".to_string ()); tracing::debug! ("decoder ended");
if let Some (url) = urls.pop () {
let mut decoder = decoder::Decoder::new (&filename)?; thread_decoder.tx.send (MessageToDecoder::SetUrl (url))?;
}
tracing::debug! ("Decoding..."); else {
break;
let mut f = File::create ("pcm-dump.data")?; }
}
while let Some (frame) = decoder.next ()? { },
f.write_all (frame.data ())?; _ => {},
}
} }
Ok (()) audio_output.pause ()?;
} thread_decoder.tx.send (MessageToDecoder::Quit)?;
thread_decoder.join ()?;
fn cmd_debug_pipe (args: &[String]) -> Result <()> {
tracing_subscriber::fmt::init ();
let filename = args.get (1)
.map (|s| s.to_string ())
.unwrap_or_else (|| "test-short.m4a".to_string ());
let mut decoder = decoder::Decoder::new (&filename)?;
let mut sample_count = 0;
while let Some (frame) = decoder.next ()? {
sample_count += frame.samples ();
// dbg! (frame, sample_count);
}
dbg! (sample_count);
Ok (()) Ok (())
} }
@ -164,7 +157,6 @@ impl Gui {
#[derive (Clone, Copy, Debug)] #[derive (Clone, Copy, Debug)]
enum Message { enum Message {
AudioStreamStatus (bool),
DecoderStatus (bool), DecoderStatus (bool),
RawPlay, RawPlay,
RawPause, RawPause,
@ -258,17 +250,15 @@ fn cmd_gui (args: &[String]) -> Result <()> {
let mut shared_state = SharedState::default (); let mut shared_state = SharedState::default ();
let silence = vec! [0; 12_000 * 4 * 2]; let silence = vec! [0; 12_000 * 4 * 2];
shared_state.pcm_buffers.produce_bytes (silence); shared_state.pcm_buffers.produce_bytes (silence);
let shared_state = Arc::new ((Mutex::new (shared_state), Condvar::new ())); let shared_state = Arc::new ((Mutex::new (shared_state), Condvar::new ()));
let thread_decoder = DecoderThread::new ( let thread_decoder = DecoderThread::new (
Arc::clone (&shared_state), Arc::clone (&shared_state),
fltk_tx.clone (), Some (fltk_tx.clone ()),
); );
let mut audio_output = AudioOutput::new ( let mut audio_output = AudioOutput::new (
Arc::clone (&shared_state), Arc::clone (&shared_state),
thread_decoder.tx.clone (), thread_decoder.tx.clone (),
fltk_tx.clone (),
)?; )?;
// Doesn't work. // Doesn't work.
@ -292,9 +282,6 @@ fn cmd_gui (args: &[String]) -> Result <()> {
}; };
match msg { match msg {
Message::AudioStreamStatus (x) => {
},
Message::DecoderStatus (x) => { Message::DecoderStatus (x) => {
if ! x { if ! x {
tracing::info! ("Decoder ran dry"); tracing::info! ("Decoder ran dry");
@ -312,11 +299,11 @@ fn cmd_gui (args: &[String]) -> Result <()> {
let idx = rand::thread_rng ().gen_range (0..playlist.len ()); let idx = rand::thread_rng ().gen_range (0..playlist.len ());
let filename = &playlist [idx]; let filename = &playlist [idx];
tracing::info! ("Set decoder to {}", filename); tracing::info! ("Set decoder to {}", filename);
thread_decoder.tx.send (MessageToDecoder::SetFile (Some (format! ("/home/user/music/{}", filename))))?; thread_decoder.tx.send (MessageToDecoder::SetFile (format! ("/home/user/music/{}", filename)))?;
current_file = Some (filename); current_file = Some (filename);
}, },
Message::RawSkip => { Message::RawSkip => {
thread_decoder.tx.send (MessageToDecoder::SetFile (None))?; thread_decoder.tx.send (MessageToDecoder::Clear)?;
}, },
} }
} }
@ -391,14 +378,21 @@ struct DecoderThread {
enum MessageToDecoder { enum MessageToDecoder {
NeedPcm, NeedPcm,
SetFile (Option <String>), SetFile (String),
SetUrl (String),
Clear,
Quit, Quit,
} }
enum Decoder {
File (decoder::Decoder <decoder::FfmpegDemuxer>),
Http (decoder::Decoder <decoder::HttpOggDemuxer>),
}
impl DecoderThread { impl DecoderThread {
pub fn new ( pub fn new (
shared_state: Arc <(Mutex <SharedState>, Condvar)>, shared_state: Arc <(Mutex <SharedState>, Condvar)>,
main_tx: fltk::app::Sender <Message>, main_tx: Option <fltk::app::Sender <Message>>,
) -> Self ) -> Self
{ {
let (tx, rx) = mpsc::sync_channel (4); let (tx, rx) = mpsc::sync_channel (4);
@ -406,9 +400,17 @@ impl DecoderThread {
let self_tx = tx.clone (); let self_tx = tx.clone ();
let join_handle = thread::spawn (move|| { let join_handle = thread::spawn (move|| {
let mut decoder: Option <decoder::Decoder> = None; let mut decoder: Option <Decoder> = None;
'thread: while let Ok (msg) = rx.recv () { match msg { 'thread: while let Ok (msg) = rx.recv () { match msg {
MessageToDecoder::Clear => {
decoder = None;
let new_buffers = decoder::PcmBuffers::default ();
self_tx.send (MessageToDecoder::NeedPcm)?;
let mut shared_state = shared_state.0.lock ().unwrap ();
shared_state.pcm_buffers = new_buffers;
},
MessageToDecoder::NeedPcm => { MessageToDecoder::NeedPcm => {
// tracing::trace! ("DecoderThread recv'd NeedPcm"); // tracing::trace! ("DecoderThread recv'd NeedPcm");
let mut bytes_produced = 0; let mut bytes_produced = 0;
@ -416,11 +418,10 @@ impl DecoderThread {
'pipeline: while bytes_produced < BUFFER_SIZE * 4 * 2 { 'pipeline: while bytes_produced < BUFFER_SIZE * 4 * 2 {
// tracing::trace! ("Pumping multi-file decode pipeline"); // tracing::trace! ("Pumping multi-file decode pipeline");
let frame = if let Some (decoder) = decoder.as_mut () { let frame = match &mut decoder {
decoder.next ()? None => None,
} Some (Decoder::File (x)) => x.next ()?,
else { Some (Decoder::Http (x)) => x.next ()?,
None
}; };
if let Some (frame) = frame { if let Some (frame) = frame {
@ -433,7 +434,8 @@ impl DecoderThread {
} }
else if decoder.is_some () { else if decoder.is_some () {
decoder = None; decoder = None;
main_tx.send (Message::DecoderStatus (false)); tracing::trace! ("Decoder ended, telling main thread");
main_tx.as_ref ().map (|x| x.send (Message::DecoderStatus (false)));
} }
{ {
@ -450,20 +452,22 @@ impl DecoderThread {
continue 'pipeline; continue 'pipeline;
} }
} }
// tracing::trace! ("Left NeedPcm handler");
// main_tx.send (Message::DecoderStatus (status));
}, },
MessageToDecoder::SetFile (x) => { MessageToDecoder::SetFile (x) => {
if let Some (x) = x.as_ref () { let (demuxer, codec) = decoder::FfmpegDemuxer::new (&x)?;
decoder = Some (decoder::Decoder::new (x)?); decoder = Some (Decoder::File (decoder::Decoder::new (demuxer, codec)?));
}
else {
decoder = None;
}
// main_tx.send (Message::DecoderStatus (x.is_some ()));
let new_buffers = decoder::PcmBuffers::default (); let new_buffers = decoder::PcmBuffers::default ();
self_tx.send (MessageToDecoder::NeedPcm)?; self_tx.send (MessageToDecoder::NeedPcm)?;
let mut shared_state = shared_state.0.lock ().unwrap ();
shared_state.pcm_buffers = new_buffers;
},
MessageToDecoder::SetUrl (x) => {
let (demuxer, codec) = decoder::HttpOggDemuxer::new (x)?;
decoder = Some (Decoder::Http (decoder::Decoder::new (demuxer, codec)?));
let new_buffers = decoder::PcmBuffers::default ();
self_tx.send (MessageToDecoder::NeedPcm)?;
let mut shared_state = shared_state.0.lock ().unwrap (); let mut shared_state = shared_state.0.lock ().unwrap ();
shared_state.pcm_buffers = new_buffers; shared_state.pcm_buffers = new_buffers;
}, },
@ -499,7 +503,6 @@ impl AudioOutput {
pub fn new ( pub fn new (
shared_state: Arc <(Mutex <SharedState>, Condvar)>, shared_state: Arc <(Mutex <SharedState>, Condvar)>,
decoder_tx: mpsc::SyncSender <MessageToDecoder>, decoder_tx: mpsc::SyncSender <MessageToDecoder>,
main_tx: fltk::app::Sender <Message>,
) -> Result <Self> ) -> Result <Self>
{ {
let host = cpal::default_host (); let host = cpal::default_host ();
@ -553,7 +556,6 @@ impl AudioOutput {
decoder_tx.try_send (MessageToDecoder::NeedPcm).ok (); decoder_tx.try_send (MessageToDecoder::NeedPcm).ok ();
// tracing::trace! ("AudioOutput sent NeedPcm"); // tracing::trace! ("AudioOutput sent NeedPcm");
} }
// main_tx.send (Message::AudioStreamStatus (status));
}, },
move |err| { move |err| {
tracing::error! ("{:?}", err); tracing::error! ("{:?}", err);