🚧 Set up streaming on the relay

however it doesn't pass a smoke test for video streaming yet
main
_ 2020-10-27 13:04:28 +00:00
parent 587849bcfa
commit b9db10b76a
3 changed files with 12 additions and 18 deletions

View File

@ -24,19 +24,23 @@ use hyper::{
use hyper::service::{make_service_fn, service_fn}; use hyper::service::{make_service_fn, service_fn};
use tokio::{ use tokio::{
sync::mpsc::{
channel,
Receiver,
},
sync::Mutex, sync::Mutex,
time::delay_for, time::delay_for,
}; };
use ptth::watcher::Watchers; use ptth::watcher::Watchers;
#[derive (Clone)]
enum Message { enum Message {
Meow, Meow,
//HttpRequestRequest (String), //HttpRequestRequest (String),
HttpRequestResponse (String), HttpRequestResponse (String),
// HttpResponseRequest (String), // HttpResponseRequest (String),
HttpResponseResponse (Vec <u8>), HttpResponseResponse (Vec <u8>),
HttpResponseResponseStream (Body),
} }
#[derive (Default)] #[derive (Default)]
@ -93,13 +97,13 @@ async fn handle_http_response (
-> Response <Body> -> Response <Body>
{ {
println! ("Step 6"); println! ("Step 6");
let payload = hyper::body::to_bytes (req.into_body ()).await.unwrap ().to_vec (); let body = req.into_body ();
{ {
let mut watchers = state.watchers.lock ().await; let mut watchers = state.watchers.lock ().await;
println! ("Step 7"); println! ("Step 7");
if ! watchers.wake_one (Message::HttpResponseResponse (payload), &req_id) if ! watchers.wake_one (Message::HttpResponseResponseStream (body), &req_id)
{ {
println! ("Step 8"); println! ("Step 8");
return status_reply (StatusCode::BAD_REQUEST, "A bad thing happened.\n"); return status_reply (StatusCode::BAD_REQUEST, "A bad thing happened.\n");
@ -149,9 +153,9 @@ async fn handle_http_request (
}); });
match r.await { match r.await {
Ok (Message::HttpResponseResponse (s)) => { Ok (Message::HttpResponseResponseStream (body)) => {
println! ("Step 7"); println! ("Step 7");
status_reply (StatusCode::OK, s) status_reply (StatusCode::OK, body)
}, },
_ => status_reply (StatusCode::GATEWAY_TIMEOUT, "server didn't reply in time or somethin'"), _ => status_reply (StatusCode::GATEWAY_TIMEOUT, "server didn't reply in time or somethin'"),
} }

View File

@ -82,7 +82,7 @@ async fn main () -> Result <(), Box <dyn Error>> {
//let rx: Receiver <Vec <u8>> = rx; //let rx: Receiver <Vec <u8>> = rx;
tokio::spawn (async move { tokio::spawn (async move {
let path = "/home/user/pictures/bzqcChY.jpg"; let path = "/home/user/videos/Hidden_-_A_Gender_-_by_Kate_Bornstein-fFD8NpA3hec.mp4";
let mut f = File::open (path).await.unwrap (); let mut f = File::open (path).await.unwrap ();
let mut tx = tx; let mut tx = tx;

View File

@ -24,7 +24,7 @@ impl <T> Default for Watchers <T> {
} }
} }
impl <T: 'static + Clone + Send + Sync> Watchers <T> { impl <T: 'static + Send + Sync> Watchers <T> {
pub fn add_watcher_with_id (&mut self, s: oneshot::Sender <T>, id: String) { pub fn add_watcher_with_id (&mut self, s: oneshot::Sender <T>, id: String) {
self.senders.insert (id, s); self.senders.insert (id, s);
} }
@ -32,17 +32,7 @@ impl <T: 'static + Clone + Send + Sync> Watchers <T> {
pub fn remove_watcher (&mut self, id: &str) { pub fn remove_watcher (&mut self, id: &str) {
self.senders.remove (id); self.senders.remove (id);
} }
/*
fn wake_all (&mut self, msg: T) {
let waiters = {
std::mem::take (&mut self.senders)
};
for (_id, waiter) in waiters.into_iter () {
waiter.send (msg.clone ()).ok ();
}
}
*/
pub fn wake_one (&mut self, msg: T, id: &str) -> bool { pub fn wake_one (&mut self, msg: T, id: &str) -> bool {
println! ("wake_one {}", id); println! ("wake_one {}", id);