ptth/crates/ptth_relay/src/server_endpoint.rs

213 lines
5.3 KiB
Rust

use std::{
time::Duration,
};
use chrono::Utc;
use futures::{
FutureExt,
stream::StreamExt,
};
use hyper::{
Body,
Response,
Request,
StatusCode,
};
use tokio::{
spawn,
sync::{
mpsc,
oneshot,
},
};
use tokio_stream::wrappers::ReceiverStream;
use ptth_core::{
http_serde,
prelude::*,
};
use super::{
error_reply,
errors::{
RequestError,
ShuttingDownError,
},
HandleHttpResponseError,
ok_reply,
Relay,
};
// Servers will come here and either handle queued requests from parked clients,
// or park themselves until a request comes in.
// Step 1
pub async fn handle_listen (
state: &Relay,
watcher_code: String,
)
-> Result <Response <Body>, RequestError>
{
use super::RequestRendezvous::*;
let now = Utc::now ();
{
// TODO: Move into relay_state.rs
let mut server_status = state.server_status.lock ().await;
let mut status = server_status.entry (watcher_code.clone ()).or_insert_with (Default::default);
status.last_seen = now;
}
let (tx, rx) = oneshot::channel ();
let listen_id = rusty_ulid::generate_ulid_string ();
{
let mut request_rendezvous = state.request_rendezvous.lock ().await;
if let Some (ParkedClients (v)) = request_rendezvous.remove (&watcher_code)
{
if ! v.is_empty () {
// 1 or more clients were parked - Make the server
// handle them immediately
debug! ("Sending {} parked requests to server {}", v.len (), watcher_code);
return Ok (ok_reply (rmp_serde::to_vec (&v)?)?);
}
}
debug! ("Parking server {}, listen id {}", watcher_code, listen_id);
request_rendezvous.insert (watcher_code.clone (), ParkedServer (tx));
}
// No clients were parked - make the server long-poll
futures::select! {
x = rx.fuse () => match x {
Ok (Ok (one_req)) => {
trace! ("Unparking server {}", watcher_code);
Ok (ok_reply (rmp_serde::to_vec (&vec! [one_req])?)?)
},
Ok (Err (ShuttingDownError::ShuttingDown)) => Ok (error_reply (StatusCode::SERVICE_UNAVAILABLE, "Server is shutting down, try again soon")?),
Err (e) => {
error! ("{} {} {}", watcher_code, listen_id, e);
Ok (error_reply (StatusCode::INTERNAL_SERVER_ERROR, "Server error")?)
},
},
_ = tokio::time::sleep (Duration::from_secs (25)).fuse () => {
debug! ("Timed out http_listen for server {} {}", watcher_code, listen_id);
return Ok (error_reply (StatusCode::NO_CONTENT, "No requests now, long-poll again")?)
}
}
}
/// Servers will come here to stream responses to clients.
/// (Step 5 in the docs)
pub async fn handle_response (
req: Request <Body>,
state: &Relay,
req_id: String,
)
-> Result <Response <Body>, HandleHttpResponseError>
{
#[derive (Debug)]
enum BodyFinishedReason {
StreamFinished,
ClientDisconnected,
}
use BodyFinishedReason::*;
use HandleHttpResponseError::*;
let (parts, mut body) = req.into_parts ();
let magic_header = parts.headers.get (ptth_core::PTTH_MAGIC_HEADER).ok_or (MissingPtthMagicHeader)?;
let magic_header = base64::decode (magic_header).map_err (PtthMagicHeaderNotBase64)?;
let resp_parts: http_serde::ResponseParts = rmp_serde::from_read_ref (&magic_header).map_err (PtthMagicHeaderNotMsgPack)?;
// Intercept the body packets here so we can check when the stream
// ends or errors out
let (body_tx, body_rx) = mpsc::channel (2);
let (body_finished_tx, body_finished_rx) = oneshot::channel ();
let mut shutdown_watch_rx = state.shutdown_watch_rx.clone ();
let relay_task = spawn (async move {
if *shutdown_watch_rx.borrow () {
debug! ("Can't relay bytes, relay is shutting down");
}
else {
loop {
let item = body.next ().await;
if let Some (item) = item {
if let Ok (bytes) = &item {
trace! ("Relaying {} bytes", bytes.len ());
}
futures::select! {
x = body_tx.send (item).fuse () => if x.is_err () {
info! ("Body closed while relaying. (Client hung up?)");
body_finished_tx.send (ClientDisconnected).map_err (|_| LostServer)?;
break;
},
_ = shutdown_watch_rx.changed ().fuse () => {
debug! ("Closing stream: relay is shutting down");
break;
},
}
}
else {
trace! ("Finished relaying bytes");
body_finished_tx.send (StreamFinished).map_err (|_| LostServer)?;
break;
}
}
}
Ok::<(), HandleHttpResponseError> (())
});
let body = Body::wrap_stream (ReceiverStream::new (body_rx));
let tx = {
let response_rendezvous = state.response_rendezvous.read ().await;
match response_rendezvous.remove (&req_id) {
None => {
error! ("Server tried to respond to non-existent request");
return Ok (error_reply (StatusCode::BAD_REQUEST, "Request ID not found in response_rendezvous")?);
},
Some ((_, x)) => x,
}
};
// UKAUFFY4 (Send half)
if tx.tx.send (Ok ((resp_parts, body))).is_err () {
let msg = "Failed to connect to client";
error! (msg);
return Ok (error_reply (StatusCode::BAD_GATEWAY, msg)?);
}
relay_task.await??;
trace! ("Connected server to client for streaming.");
match body_finished_rx.await {
Ok (StreamFinished) => {
Ok (error_reply (StatusCode::OK, "StreamFinished")?)
},
Ok (ClientDisconnected) => {
Ok (error_reply (StatusCode::OK, "ClientDisconnected")?)
},
Err (e) => {
debug! ("body_finished_rx {}", e);
Ok (error_reply (StatusCode::OK, "body_finished_rx Err")?)
},
}
}