111 lines
2.3 KiB
Rust
111 lines
2.3 KiB
Rust
use std::{
|
|
collections::HashMap,
|
|
convert::TryFrom,
|
|
};
|
|
|
|
use chrono::{DateTime, Utc};
|
|
use dashmap::DashMap;
|
|
use tokio::sync::{
|
|
Mutex,
|
|
RwLock,
|
|
oneshot,
|
|
watch,
|
|
};
|
|
|
|
use crate::{
|
|
Body,
|
|
Config,
|
|
RelayError,
|
|
ShuttingDownError,
|
|
};
|
|
|
|
use ptth_core::http_serde;
|
|
|
|
/*

Here's what we need to handle:

When a request comes in:

- Park the client in response_rendezvous
- Look up the server ID in request_rendezvous
- If a server is parked, unpark it and send the request
- Otherwise, queue the request

When a server comes to listen:

- Look up the server ID in request_rendezvous
- Either return all pending requests, or park the server

When a server comes to respond:

- Look up the parked client in response_rendezvous
- Unpark the client and begin streaming

So we need these lookups to be fast:

- Server IDs, where either 1 server or 0-or-more clients can be parked
- Request IDs, where exactly 1 client is parked

*/
|
|
|
|
pub enum RequestRendezvous {
|
|
ParkedClients (Vec <http_serde::WrappedRequest>),
|
|
ParkedServer (oneshot::Sender <Result <http_serde::WrappedRequest, ShuttingDownError>>),
|
|
}
|
|
|
|
/// Sending half of the oneshot channel on which a parked client waits.
///
/// The channel carries either the response parts together with the body,
/// or a `ShuttingDownError`.
type ResponseRendezvous = oneshot::Sender <
    Result <(http_serde::ResponseParts, Body), ShuttingDownError>,
>;
|
|
|
|
#[derive (Clone)]
|
|
pub struct ServerStatus {
|
|
pub last_seen: DateTime <Utc>,
|
|
}
|
|
|
|
impl Default for ServerStatus {
|
|
fn default () -> Self {
|
|
Self {
|
|
last_seen: Utc::now (),
|
|
}
|
|
}
|
|
}
|
|
|
|
pub struct RelayState {
|
|
pub config: RwLock <Config>,
|
|
|
|
// Key: Server ID
|
|
pub request_rendezvous: Mutex <HashMap <String, RequestRendezvous>>,
|
|
pub server_status: Mutex <HashMap <String, ServerStatus>>,
|
|
|
|
// Key: Request ID
|
|
pub response_rendezvous: RwLock <DashMap <String, ResponseRendezvous>>,
|
|
|
|
pub shutdown_watch_tx: watch::Sender <bool>,
|
|
pub shutdown_watch_rx: watch::Receiver <bool>,
|
|
}
|
|
|
|
impl TryFrom <Config> for RelayState {
|
|
type Error = RelayError;
|
|
|
|
fn try_from (config: Config) -> Result <Self, Self::Error> {
|
|
let (shutdown_watch_tx, shutdown_watch_rx) = watch::channel (false);
|
|
|
|
Ok (Self {
|
|
config: config.into (),
|
|
request_rendezvous: Default::default (),
|
|
server_status: Default::default (),
|
|
response_rendezvous: Default::default (),
|
|
shutdown_watch_tx,
|
|
shutdown_watch_rx,
|
|
})
|
|
}
|
|
}
|
|
|
|
impl RelayState {
|
|
pub async fn list_servers (&self) -> Vec <String> {
|
|
self.request_rendezvous.lock ().await.iter ()
|
|
.map (|(k, _)| (*k).clone ())
|
|
.collect ()
|
|
}
|
|
}
|