👕 refactor: Extract relay_state module

main
_ 2020-12-13 03:29:54 +00:00
parent 532f99f770
commit 5f947ed73c
5 changed files with 132 additions and 94 deletions

View File

@ -17,7 +17,6 @@
use std::{ use std::{
borrow::Cow, borrow::Cow,
collections::HashMap, collections::HashMap,
convert::TryFrom,
iter::FromIterator, iter::FromIterator,
net::SocketAddr, net::SocketAddr,
path::{Path, PathBuf}, path::{Path, PathBuf},
@ -46,10 +45,7 @@ use serde::{
}; };
use tokio::{ use tokio::{
sync::{ sync::{
Mutex,
oneshot, oneshot,
RwLock,
watch,
}, },
}; };
@ -63,101 +59,15 @@ pub mod config;
pub mod errors; pub mod errors;
pub mod git_version; pub mod git_version;
pub mod key_validity; pub mod key_validity;
mod relay_state;
mod server_endpoint; mod server_endpoint;
pub use config::Config; pub use config::Config;
pub use errors::*; pub use errors::*;
pub use relay_state::RelayState;
/* use relay_state::*;
Here's what we need to handle:
When a request comes in:
- Park the client in response_rendezvous
- Look up the server ID in request_rendezvous
- If a server is parked, unpark it and send the request
- Otherwise, queue the request
When a server comes to listen:
- Look up the server ID in request_rendezvous
- Either return all pending requests, or park the server
When a server comes to respond:
- Look up the parked client in response_rendezvous
- Unpark the client and begin streaming
So we need these lookups to be fast:
- Server IDs, where (1 server) or (0 or many clients)
can be parked
- Request IDs, where 1 client is parked
*/
enum RequestRendezvous {
ParkedClients (Vec <http_serde::WrappedRequest>),
ParkedServer (oneshot::Sender <Result <http_serde::WrappedRequest, ShuttingDownError>>),
}
type ResponseRendezvous = oneshot::Sender <Result <(http_serde::ResponseParts, Body), ShuttingDownError>>;
#[derive (Clone)]
pub struct ServerStatus {
last_seen: DateTime <Utc>,
}
impl Default for ServerStatus {
fn default () -> Self {
Self {
last_seen: Utc::now (),
}
}
}
pub struct RelayState {
config: RwLock <Config>,
handlebars: Arc <Handlebars <'static>>,
// Key: Server ID
request_rendezvous: Mutex <HashMap <String, RequestRendezvous>>,
server_status: Mutex <HashMap <String, ServerStatus>>,
// Key: Request ID
response_rendezvous: RwLock <DashMap <String, ResponseRendezvous>>,
shutdown_watch_tx: watch::Sender <bool>,
shutdown_watch_rx: watch::Receiver <bool>,
}
impl TryFrom <Config> for RelayState {
type Error = RelayError;
fn try_from (config: Config) -> Result <Self, Self::Error> {
let (shutdown_watch_tx, shutdown_watch_rx) = watch::channel (false);
Ok (Self {
config: config.into (),
handlebars: Arc::new (load_templates (&PathBuf::new ())?),
request_rendezvous: Default::default (),
server_status: Default::default (),
response_rendezvous: Default::default (),
shutdown_watch_tx,
shutdown_watch_rx,
})
}
}
impl RelayState {
pub async fn list_servers (&self) -> Vec <String> {
self.request_rendezvous.lock ().await.iter ()
.map (|(k, _)| (*k).clone ())
.collect ()
}
}
fn ok_reply <B: Into <Body>> (b: B) fn ok_reply <B: Into <Body>> (b: B)
-> Result <Response <Body>, http::Error> -> Result <Response <Body>, http::Error>

View File

@ -0,0 +1,116 @@
use std::{
collections::HashMap,
convert::TryFrom,
path::{PathBuf},
sync::Arc,
};
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use handlebars::Handlebars;
use tokio::sync::{
Mutex,
RwLock,
oneshot,
watch,
};
use crate::{
Body,
Config,
RelayError,
ShuttingDownError,
load_templates,
};
use ptth_core::http_serde;
/*
Here's what we need to handle:
When a request comes in:
- Park the client in response_rendezvous
- Look up the server ID in request_rendezvous
- If a server is parked, unpark it and send the request
- Otherwise, queue the request
When a server comes to listen:
- Look up the server ID in request_rendezvous
- Either return all pending requests, or park the server
When a server comes to respond:
- Look up the parked client in response_rendezvous
- Unpark the client and begin streaming
So we need these lookups to be fast:
- Server IDs, where (1 server) or (0 or many clients)
can be parked
- Request IDs, where 1 client is parked
*/
pub enum RequestRendezvous {
ParkedClients (Vec <http_serde::WrappedRequest>),
ParkedServer (oneshot::Sender <Result <http_serde::WrappedRequest, ShuttingDownError>>),
}
type ResponseRendezvous = oneshot::Sender <Result <(http_serde::ResponseParts, Body), ShuttingDownError>>;
#[derive (Clone)]
pub struct ServerStatus {
pub last_seen: DateTime <Utc>,
}
impl Default for ServerStatus {
fn default () -> Self {
Self {
last_seen: Utc::now (),
}
}
}
pub struct RelayState {
pub config: RwLock <Config>,
pub handlebars: Arc <Handlebars <'static>>,
// Key: Server ID
pub request_rendezvous: Mutex <HashMap <String, RequestRendezvous>>,
pub server_status: Mutex <HashMap <String, ServerStatus>>,
// Key: Request ID
pub response_rendezvous: RwLock <DashMap <String, ResponseRendezvous>>,
pub shutdown_watch_tx: watch::Sender <bool>,
pub shutdown_watch_rx: watch::Receiver <bool>,
}
impl TryFrom <Config> for RelayState {
type Error = RelayError;
fn try_from (config: Config) -> Result <Self, Self::Error> {
let (shutdown_watch_tx, shutdown_watch_rx) = watch::channel (false);
Ok (Self {
config: config.into (),
handlebars: Arc::new (load_templates (&PathBuf::new ())?),
request_rendezvous: Default::default (),
server_status: Default::default (),
response_rendezvous: Default::default (),
shutdown_watch_tx,
shutdown_watch_rx,
})
}
}
impl RelayState {
pub async fn list_servers (&self) -> Vec <String> {
self.request_rendezvous.lock ().await.iter ()
.map (|(k, _)| (*k).clone ())
.collect ()
}
}

View File

@ -75,6 +75,8 @@ pub async fn handle_listen (
// End of early returns // End of early returns
{ {
// TODO: Move into relay_state.rs
let mut server_status = state.server_status.lock ().await; let mut server_status = state.server_status.lock ().await;
let mut status = server_status.entry (watcher_code.clone ()).or_insert_with (Default::default); let mut status = server_status.entry (watcher_code.clone ()).or_insert_with (Default::default);

View File

@ -1,3 +1,5 @@
use tokio::runtime::Runtime;
use super::*; use super::*;
#[test] #[test]
@ -23,3 +25,12 @@ fn test_pretty_print_last_seen () {
assert_eq! (actual, expected); assert_eq! (actual, expected);
} }
} }
#[test]
fn scraper_endpoints () {
let mut rt = Runtime::new ().expect ("Can't create runtime for testing");
rt.block_on (async {
});
}

View File

@ -1,7 +1,6 @@
Interesting issues will get a unique ID with Interesting issues will get a unique ID with
`dd if=/dev/urandom bs=5 count=1 | base32` `dd if=/dev/urandom bs=5 count=1 | base32`
- Report server version in HTML
- [YNQAQKJS](issues/2020-12Dec/auth-route-YNQAQKJS.md) Open new auth route for spiders / scrapers - [YNQAQKJS](issues/2020-12Dec/auth-route-YNQAQKJS.md) Open new auth route for spiders / scrapers
- Track / Estimate bandwidth per server? - Track / Estimate bandwidth per server?
- EOTPXGR3 Remote `tail -f` (_Complicated_) (Maybe use chunked encoding or something?) - EOTPXGR3 Remote `tail -f` (_Complicated_) (Maybe use chunked encoding or something?)