use std::{
	time::Duration,
};

use chrono::Utc;
use futures::{
	FutureExt,
	stream::StreamExt,
};
use hyper::{
	Body,
	Response,
	Request,
	StatusCode,
};
use tokio::{
	spawn,
	sync::{
		mpsc,
		oneshot,
	},
};
use tokio_stream::wrappers::ReceiverStream;

use ptth_core::{
	http_serde,
	prelude::*,
};

use super::{
	error_reply,
	errors::{
		RequestError,
		ShuttingDownError,
	},
	HandleHttpResponseError,
	ok_reply,
	Relay,
};

/// Servers will come here and either handle queued requests from parked clients,
/// or park themselves until a request comes in.
/// (Step 1 in the docs)

pub async fn handle_listen (
	state: &Relay,
	watcher_code: String,
)
-> Result <Response <Body>, RequestError>
{
	use super::RequestRendezvous::*;
	
	let now = Utc::now ();
	
	{
		// TODO: Move into relay_state.rs
		
		let mut server_status = state.server_status.lock ().await;
		
		let status = server_status.entry (watcher_code.clone ()).or_insert_with (Default::default);
		
		status.last_seen = now;
	}
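
	// This oneshot pair is the server's rendezvous slot: the sender is parked in
	// request_rendezvous below, and the receiver is long-polled in the select!
	// at the end of this function, yielding either a request or a shutdown error.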
	let (tx, rx) = oneshot::channel ();
	
	let listen_id = rusty_ulid::generate_ulid_string ();
	
	{
		let mut request_rendezvous = state.request_rendezvous.lock ().await;
		
		if let Some (ParkedClients (v)) = request_rendezvous.remove (&watcher_code)
		{
			if ! v.is_empty () {
				// One or more clients were parked - make the server
				// handle them immediately
				
				debug! ("Sending {} parked requests to server {}", v.len (), watcher_code);
				return Ok (ok_reply (rmp_serde::to_vec (&v)?)?);
			}
		}
		
		debug! ("Parking server {}, listen id {}", watcher_code, listen_id);
		request_rendezvous.insert (watcher_code.clone (), ParkedServer (tx));
	}
	
	// No clients were parked - make the server long-poll
	
	futures::select! {
		x = rx.fuse () => match x {
			Ok (Ok (one_req)) => {
				trace! ("Unparking server {}", watcher_code);
				Ok (ok_reply (rmp_serde::to_vec (&vec! [one_req])?)?)
			},
			Ok (Err (ShuttingDownError::ShuttingDown)) => Ok (error_reply (StatusCode::SERVICE_UNAVAILABLE, "Server is shutting down, try again soon")?),
			Err (e) => {
				error! ("{} {} {}", watcher_code, listen_id, e);
				Ok (error_reply (StatusCode::INTERNAL_SERVER_ERROR, "Server error")?)
			},
		},
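		// No client appeared within 25 seconds: answer 204 so the server drops
		// this connection and immediately starts another long-poll.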
		_ = tokio::time::sleep (Duration::from_secs (25)).fuse () => {
			debug! ("Timed out http_listen for server {} {}", watcher_code, listen_id);
			return Ok (error_reply (StatusCode::NO_CONTENT, "No requests now, long-poll again")?)
		}
	}
}

/// Servers will come here to stream responses to clients.
/// (Step 5 in the docs)
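/// The response head arrives serialized in a request header; the body is
/// streamed straight through to the waiting client.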

pub async fn handle_response (
	req: Request <Body>,
	state: &Relay,
	req_id: String,
)
-> Result <Response <Body>, HandleHttpResponseError>
{
	#[derive (Debug)]
	enum BodyFinishedReason {
		StreamFinished,
		ClientDisconnected,
	}
	use BodyFinishedReason::*;
	use HandleHttpResponseError::*;
	
	let (parts, mut body) = req.into_parts ();
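
	// The response status and headers ride in a single magic header on this
	// request: base64-encoded MessagePack, decoded into http_serde::ResponseParts.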
	let magic_header = parts.headers.get (ptth_core::PTTH_MAGIC_HEADER).ok_or (MissingPtthMagicHeader)?;
	
	let magic_header = base64::decode (magic_header).map_err (PtthMagicHeaderNotBase64)?;
	
	let resp_parts: http_serde::ResponseParts = rmp_serde::from_read_ref (&magic_header).map_err (PtthMagicHeaderNotMsgPack)?;
	
	// Intercept the body packets here so we can check when the stream
	// ends or errors out
	
	let (body_tx, body_rx) = mpsc::channel (2);
	let (body_finished_tx, body_finished_rx) = oneshot::channel ();
	let mut shutdown_watch_rx = state.shutdown_watch_rx.clone ();
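
	// Pump the server's request body into body_tx chunk by chunk. The bounded
	// channel (capacity 2) applies backpressure if the client reads slowly, and
	// body_finished_tx reports how the stream ended.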
	let relay_task = spawn (async move {
		if *shutdown_watch_rx.borrow () {
			debug! ("Can't relay bytes, relay is shutting down");
		}
		else {
			loop {
				let item = body.next ().await;
				
				if let Some (item) = item {
					if let Ok (bytes) = &item {
						trace! ("Relaying {} bytes", bytes.len ());
					}
					
					futures::select! {
						x = body_tx.send (item).fuse () => if x.is_err () {
							info! ("Body closed while relaying. (Client hung up?)");
							body_finished_tx.send (ClientDisconnected).map_err (|_| LostServer)?;
							break;
						},
						_ = shutdown_watch_rx.changed ().fuse () => {
							debug! ("Closing stream: relay is shutting down");
							break;
						},
					}
				}
				else {
					trace! ("Finished relaying bytes");
					body_finished_tx.send (StreamFinished).map_err (|_| LostServer)?;
					break;
				}
			}
		}
		
		Ok::<(), HandleHttpResponseError> (())
	});
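
	// Re-expose the relayed chunks as a hyper Body that can be handed to the
	// waiting client.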
	let body = Body::wrap_stream (ReceiverStream::new (body_rx));
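
	// Find the send half waiting under this request ID in response_rendezvous;
	// if it's missing, there is no client to stream to.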
	let tx = {
		let response_rendezvous = state.response_rendezvous.read ().await;
		match response_rendezvous.remove (&req_id) {
			None => {
				error! ("Server tried to respond to non-existent request");
				return Ok (error_reply (StatusCode::BAD_REQUEST, "Request ID not found in response_rendezvous")?);
			},
			Some ((_, x)) => x,
		}
	};
	
	// UKAUFFY4 (Send half)
	if tx.send (Ok ((resp_parts, body))).is_err () {
		let msg = "Failed to connect to client";
		error! (msg);
		return Ok (error_reply (StatusCode::BAD_GATEWAY, msg)?);
	}
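
	// Wait for the body pump to finish, then tell the server how the stream
	// ended (finished cleanly, client disconnected, or an error).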
	relay_task.await??;
	
	trace! ("Connected server to client for streaming.");
	match body_finished_rx.await {
		Ok (StreamFinished) => {
			Ok (error_reply (StatusCode::OK, "StreamFinished")?)
		},
		Ok (ClientDisconnected) => {
			Ok (error_reply (StatusCode::OK, "ClientDisconnected")?)
		},
		Err (e) => {
			debug! ("body_finished_rx {}", e);
			Ok (error_reply (StatusCode::OK, "body_finished_rx Err")?)
		},
	}
}