// ptth/crates/ptth_relay/src/relay_state.rs

use std::{
collections::HashMap,
convert::TryFrom,
};
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use tokio::sync::{
Mutex,
RwLock,
oneshot,
watch,
};
use crate::{
Body,
Config,
RelayError,
ShuttingDownError,
};
use ptth_core::http_serde;
/*
Here's what we need to handle:
When a request comes in:
- Park the client in response_rendezvous
- Look up the server ID in request_rendezvous
- If a server is parked, unpark it and send the request
- Otherwise, queue the request
When a server comes to listen:
- Look up the server ID in request_rendezvous
- Either return all pending requests, or park the server
When a server comes to respond:
- Look up the parked client in response_rendezvous
- Unpark the client and begin streaming
So we need these lookups to be fast:
- Server IDs, where (1 server) or (0 or many clients)
can be parked
- Request IDs, where 1 client is parked
*/
/// What is currently waiting on a given server ID.
///
/// Per the design notes in this file, a server ID has either one parked
/// server or zero-or-more parked client requests, never both at once.
pub enum RequestRendezvous {
/// Client requests queued for this server ID, to be handed over when the
/// server comes to listen.
ParkedClients (Vec <http_serde::WrappedRequest>),
/// A server waiting for work. The sender carries either the next request
/// or a `ShuttingDownError`.
ParkedServer (oneshot::Sender <Result <http_serde::WrappedRequest, ShuttingDownError>>),
}
/// Sending half of the one-shot channel on which a parked client waits.
///
/// Carries either the response parts together with the body, or a
/// `ShuttingDownError`.
type ResponseRendezvous = oneshot::Sender <Result <(http_serde::ResponseParts, Body), ShuttingDownError>>;
/// Status record kept per server ID.
#[derive (Clone)]
pub struct ServerStatus {
/// UTC timestamp for this server.
/// NOTE(review): presumably the last time the server contacted the relay;
/// confirm against the code that updates it.
pub last_seen: DateTime <Utc>,
}
impl Default for ServerStatus {
    /// Builds a status whose `last_seen` is the current UTC time.
    fn default () -> Self {
        let last_seen = Utc::now ();
        Self { last_seen }
    }
}
/// Shared state of the relay.
///
/// Every collection sits behind its own async lock (`tokio::sync::Mutex` or
/// `tokio::sync::RwLock`), so the fields can be locked independently of one
/// another.
pub struct RelayState {
/// Relay configuration, behind a read/write lock.
pub config: RwLock <Config>,
// Key: Server ID
/// For each server ID: either the queued client requests or the one parked
/// server (see `RequestRendezvous`).
pub request_rendezvous: Mutex <HashMap <String, RequestRendezvous>>,
/// Status record for each server ID.
pub server_status: Mutex <HashMap <String, ServerStatus>>,
// Key: Request ID
/// For each request ID: the channel on which the parked client receives
/// its response.
/// NOTE(review): `DashMap` is itself a concurrent map; confirm whether the
/// outer `RwLock` is still needed.
pub response_rendezvous: RwLock <DashMap <String, ResponseRendezvous>>,
/// Sending half of the shutdown watch channel; the channel starts at `false`.
/// NOTE(review): presumably set to `true` to signal shutdown; confirm
/// against the sender's call sites.
pub shutdown_watch_tx: watch::Sender <bool>,
/// Receiving half of the shutdown watch channel.
pub shutdown_watch_rx: watch::Receiver <bool>,
// List of recently rejected server names (Used to approve servers)
pub unregistered_servers: BoundedVec <RejectedServer>,
}
2021-04-03 03:14:23 +00:00
pub struct BoundedVec <T: Clone> {
bound: usize,
v: RwLock <Vec <T>>,
}
impl <T: Clone> BoundedVec <T> {
pub fn new (bound: usize) -> Self {
Self {
bound,
v: Default::default (),
}
}
2021-04-03 03:14:23 +00:00
pub async fn to_vec (&self) -> Vec <T> {
let guard = self.v.read ().await;
(*guard).clone ()
}
// Not mut because we have a RwLock
// One of the only problems with Rust is that
// 'mut' doesn't _really_ -mean 'mutable' once you're
// multi-threading or using async
pub async fn push (&self, x: T) {
2021-04-03 03:14:23 +00:00
let mut guard = self.v.write ().await;
guard.push (x);
while guard.len () > self.bound {
guard.remove (0);
}
}
}
/// One entry in the list of recently rejected servers
/// (`RelayState::unregistered_servers`).
#[derive (Clone)]
pub struct RejectedServer {
/// Name of the rejected server.
pub name: String,
/// BLAKE3 hash associated with the server.
/// NOTE(review): what input is hashed is not visible here; confirm at the
/// place where entries are pushed.
pub tripcode: blake3::Hash,
/// UTC timestamp recorded for this entry.
/// NOTE(review): presumably when the rejected server was seen; confirm.
pub seen: DateTime <Utc>,
}
impl TryFrom <Config> for RelayState {
type Error = RelayError;
fn try_from (config: Config) -> Result <Self, Self::Error> {
let (shutdown_watch_tx, shutdown_watch_rx) = watch::channel (false);
Ok (Self {
config: config.into (),
request_rendezvous: Default::default (),
server_status: Default::default (),
response_rendezvous: Default::default (),
shutdown_watch_tx,
shutdown_watch_rx,
unregistered_servers: BoundedVec::new (20),
})
}
}
impl RelayState {
    /// Returns the IDs of every server that currently has an entry in
    /// `request_rendezvous`.
    ///
    /// The IDs are cloned while the mutex is held. They come out in the
    /// map's iteration order, which is unspecified for a `HashMap`.
    pub async fn list_servers (&self) -> Vec <String> {
        self.request_rendezvous.lock ().await
            .keys ()
            .cloned ()
            .collect ()
    }
}