🚧 Split up watchers into client and server

main
_ 2020-10-31 21:53:09 -05:00
parent 15a62a5e2a
commit b673fa94c7
1 changed files with 15 additions and 17 deletions

View File

@ -32,15 +32,12 @@ use crate::{
}; };
use watcher::*; use watcher::*;
enum Message {
HttpRequestResponse (http_serde::WrappedRequest),
HttpResponseResponseStream ((http_serde::ResponseParts, Body)),
}
#[derive (Default)] #[derive (Default)]
struct ServerState { struct ServerState {
handlebars: Arc <Handlebars <'static>>, handlebars: Arc <Handlebars <'static>>,
watchers: Arc <Mutex <Watchers <Message>>>,
client_watchers: Arc <Mutex <Watchers <(http_serde::ResponseParts, Body)>>>,
server_watchers: Arc <Mutex <Watchers <http_serde::WrappedRequest>>>,
} }
fn status_reply <B: Into <Body>> (status: StatusCode, b: B) fn status_reply <B: Into <Body>> (status: StatusCode, b: B)
@ -53,8 +50,8 @@ async fn handle_http_listen (state: Arc <ServerState>, watcher_code: String)
-> Response <Body> -> Response <Body>
{ {
//println! ("Step 1"); //println! ("Step 1");
match Watchers::long_poll (state.watchers.clone (), watcher_code).await { match Watchers::long_poll (state.server_watchers.clone (), watcher_code).await {
Some (Message::HttpRequestResponse (parts)) => { Some (parts) => {
//println! ("Step 3"); //println! ("Step 3");
status_reply (StatusCode::OK, rmp_serde::to_vec (&parts).unwrap ()) status_reply (StatusCode::OK, rmp_serde::to_vec (&parts).unwrap ())
}, },
@ -74,10 +71,10 @@ async fn handle_http_response (
let resp_parts: http_serde::ResponseParts = rmp_serde::from_read_ref (&base64::decode (parts.headers.get (crate::PTTH_MAGIC_HEADER).unwrap ()).unwrap ()).unwrap (); let resp_parts: http_serde::ResponseParts = rmp_serde::from_read_ref (&base64::decode (parts.headers.get (crate::PTTH_MAGIC_HEADER).unwrap ()).unwrap ()).unwrap ();
{ {
let mut watchers = state.watchers.lock ().await; let mut watchers = state.client_watchers.lock ().await;
//println! ("Step 7"); //println! ("Step 7");
if ! watchers.wake_one (Message::HttpResponseResponseStream ((resp_parts, body)), &req_id) if ! watchers.wake_one ((resp_parts, body), &req_id)
{ {
println! ("Step 8 (bad thing)"); println! ("Step 8 (bad thing)");
status_reply (StatusCode::BAD_REQUEST, "A bad thing happened.\n") status_reply (StatusCode::BAD_REQUEST, "A bad thing happened.\n")
@ -117,7 +114,7 @@ async fn handle_http_request (
let id_2 = parts.id.clone (); let id_2 = parts.id.clone ();
{ {
let mut that = state.watchers.lock ().await; let mut that = state.client_watchers.lock ().await;
that.add_watcher_with_id (s, id_2) that.add_watcher_with_id (s, id_2)
} }
@ -125,23 +122,23 @@ async fn handle_http_request (
tokio::spawn (async move { tokio::spawn (async move {
{ {
let mut watchers = state.watchers.lock ().await; let mut watchers = state.server_watchers.lock ().await;
//println! ("Step 3"); //println! ("Step 3");
if ! watchers.wake_one (Message::HttpRequestResponse (parts), &watcher_code) { if ! watchers.wake_one (parts, &watcher_code) {
watchers.remove_watcher (&req_id); watchers.remove_watcher (&req_id);
} }
} }
delay_for (timeout).await; delay_for (timeout).await;
{ {
let mut that = state.watchers.lock ().await; let mut that = state.client_watchers.lock ().await;
that.remove_watcher (&req_id); that.remove_watcher (&req_id);
} }
}); });
match r.await { match r.await {
Ok (Message::HttpResponseResponseStream ((resp_parts, body))) => { Ok ((resp_parts, body)) => {
//println! ("Step 7"); //println! ("Step 7");
let mut resp = Response::builder () let mut resp = Response::builder ()
@ -205,7 +202,7 @@ async fn handle_all (req: Request <Body>, state: Arc <ServerState>)
} }
let names: Vec <_> = { let names: Vec <_> = {
state.watchers.lock ().await.senders.iter () state.server_watchers.lock ().await.senders.iter ()
.map (|(k, _)| (*k).clone ()) .map (|(k, _)| (*k).clone ())
.collect () .collect ()
}; };
@ -263,7 +260,8 @@ pub async fn main () -> Result <(), Box <dyn Error>> {
let state = ServerState { let state = ServerState {
handlebars: Arc::new (load_templates ()?), handlebars: Arc::new (load_templates ()?),
watchers: Default::default (), server_watchers: Default::default (),
client_watchers: Default::default (),
}; };
let state = Arc::new (state); let state = Arc::new (state);