diff --git a/src/relay/mod.rs b/src/relay/mod.rs index c8a5da1..ceb563e 100644 --- a/src/relay/mod.rs +++ b/src/relay/mod.rs @@ -32,15 +32,12 @@ use crate::{ }; use watcher::*; -enum Message { - HttpRequestResponse (http_serde::WrappedRequest), - HttpResponseResponseStream ((http_serde::ResponseParts, Body)), -} - #[derive (Default)] struct ServerState { handlebars: Arc >, - watchers: Arc >>, + + client_watchers: Arc >>, + server_watchers: Arc >>, } fn status_reply > (status: StatusCode, b: B) @@ -53,8 +50,8 @@ async fn handle_http_listen (state: Arc , watcher_code: String) -> Response { //println! ("Step 1"); - match Watchers::long_poll (state.watchers.clone (), watcher_code).await { - Some (Message::HttpRequestResponse (parts)) => { + match Watchers::long_poll (state.server_watchers.clone (), watcher_code).await { + Some (parts) => { //println! ("Step 3"); status_reply (StatusCode::OK, rmp_serde::to_vec (&parts).unwrap ()) }, @@ -74,10 +71,10 @@ async fn handle_http_response ( let resp_parts: http_serde::ResponseParts = rmp_serde::from_read_ref (&base64::decode (parts.headers.get (crate::PTTH_MAGIC_HEADER).unwrap ()).unwrap ()).unwrap (); { - let mut watchers = state.watchers.lock ().await; + let mut watchers = state.client_watchers.lock ().await; //println! ("Step 7"); - if ! watchers.wake_one (Message::HttpResponseResponseStream ((resp_parts, body)), &req_id) + if ! watchers.wake_one ((resp_parts, body), &req_id) { println! ("Step 8 (bad thing)"); status_reply (StatusCode::BAD_REQUEST, "A bad thing happened.\n") @@ -117,7 +114,7 @@ async fn handle_http_request ( let id_2 = parts.id.clone (); { - let mut that = state.watchers.lock ().await; + let mut that = state.client_watchers.lock ().await; that.add_watcher_with_id (s, id_2) } @@ -125,23 +122,23 @@ async fn handle_http_request ( tokio::spawn (async move { { - let mut watchers = state.watchers.lock ().await; + let mut watchers = state.server_watchers.lock ().await; //println! ("Step 3"); - if ! watchers.wake_one (Message::HttpRequestResponse (parts), &watcher_code) { + if ! watchers.wake_one (parts, &watcher_code) { watchers.remove_watcher (&req_id); } } delay_for (timeout).await; { - let mut that = state.watchers.lock ().await; + let mut that = state.client_watchers.lock ().await; that.remove_watcher (&req_id); } }); match r.await { - Ok (Message::HttpResponseResponseStream ((resp_parts, body))) => { + Ok ((resp_parts, body)) => { //println! ("Step 7"); let mut resp = Response::builder () @@ -205,7 +202,7 @@ async fn handle_all (req: Request , state: Arc ) } let names: Vec <_> = { - state.watchers.lock ().await.senders.iter () + state.server_watchers.lock ().await.senders.iter () .map (|(k, _)| (*k).clone ()) .collect () }; @@ -263,7 +260,8 @@ pub async fn main () -> Result <(), Box > { let state = ServerState { handlebars: Arc::new (load_templates ()?), - watchers: Default::default (), + server_watchers: Default::default (), + client_watchers: Default::default (), }; let state = Arc::new (state);