Compare commits
No commits in common. "843fddb568b071be67e0d10c6a82cf69fc0e586b" and "dfc6885b8c4d097b6fa4306441dbda6e6d204f70" have entirely different histories.
843fddb568
...
dfc6885b8c
|
@ -281,7 +281,7 @@ version = "3.2.4"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1631ca6e3c59112501a9d87fd86f21591ff77acd31331e8a73f8d80a65bbdd71"
|
||||
dependencies = [
|
||||
"nix",
|
||||
"nix 0.26.1",
|
||||
"windows-sys 0.42.0",
|
||||
]
|
||||
|
||||
|
@ -774,6 +774,19 @@ dependencies = [
|
|||
"hashbrown",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "insecure_chat"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"hyper",
|
||||
"mac_address",
|
||||
"nix 0.25.1",
|
||||
"ptth_diceware",
|
||||
"rand",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "instant"
|
||||
version = "0.1.12"
|
||||
|
@ -853,6 +866,16 @@ dependencies = [
|
|||
"cfg-if",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mac_address"
|
||||
version = "1.1.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b238e3235c8382b7653c6408ed1b08dd379bdb9fdf990fb0bbae3db2cc0ae963"
|
||||
dependencies = [
|
||||
"nix 0.23.2",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "maplit"
|
||||
version = "1.0.2"
|
||||
|
@ -874,6 +897,15 @@ version = "2.5.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
|
||||
|
||||
[[package]]
|
||||
name = "memoffset"
|
||||
version = "0.6.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "metrics_test"
|
||||
version = "0.1.0"
|
||||
|
@ -918,6 +950,33 @@ dependencies = [
|
|||
"tempfile",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nix"
|
||||
version = "0.23.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8f3790c00a0150112de0f4cd161e3d7fc4b2d8a5542ffc35f099a2562aecb35c"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"cc",
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"memoffset",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nix"
|
||||
version = "0.25.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f346ff70e7dbfd675fe90590b92d59ef2de15a8779ae305ebcbfd3f0caf59be4"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"bitflags",
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"memoffset",
|
||||
"pin-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nix"
|
||||
version = "0.26.1"
|
||||
|
@ -1258,6 +1317,17 @@ dependencies = [
|
|||
"uom",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ptth_kv"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"base64 0.13.1",
|
||||
"hyper",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ptth_multi_call_server"
|
||||
version = "1.1.1"
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
[package]
|
||||
name = "insecure_chat"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
hyper = { version = "0.14.20", features = ["http1", "server", "tcp"] }
|
||||
mac_address = "1.1.4"
|
||||
nix = "0.25.0"
|
||||
ptth_diceware = { path = "../ptth_diceware" }
|
||||
rand = "0.8.5"
|
||||
thiserror = "1.0.37"
|
||||
tokio = { version = "1.21.2", features = ["net", "rt-multi-thread", "fs"] }
|
|
@ -0,0 +1,125 @@
|
|||
// IP address module
|
||||
// Copied from the `lookaround` project
|
||||
|
||||
use std::{
|
||||
net::Ipv4Addr,
|
||||
process::Command,
|
||||
str::FromStr,
|
||||
};
|
||||
|
||||
#[derive (Debug, thiserror::Error)]
|
||||
pub enum Error {
|
||||
#[error (transparent)]
|
||||
Io (#[from] std::io::Error),
|
||||
#[error (transparent)]
|
||||
FromUtf8 (#[from] std::string::FromUtf8Error),
|
||||
#[error ("Self-IP detection is not implemented on Mac OS")]
|
||||
NotImplementedOnMac,
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
pub fn get_ips () -> Result <Vec <Ipv4Addr>, Error> {
|
||||
let output = linux::get_ip_addr_output ()?;
|
||||
|
||||
Ok (linux::parse_ip_addr_output (&output))
|
||||
}
|
||||
|
||||
#[cfg(target_os = "macos")]
|
||||
pub fn get_ips () -> Result <Vec <Ipv4Addr>, Error> {
|
||||
Err (Error::NotImplementedOnMac)
|
||||
}
|
||||
|
||||
#[cfg(target_os = "windows")]
|
||||
pub fn get_ips () -> Result <Vec <Ipv4Addr>, Error> {
|
||||
let output = windows::get_ip_config_output ()?;
|
||||
|
||||
Ok (windows::parse_ip_config_output (&output))
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
pub mod linux {
|
||||
use super::*;
|
||||
|
||||
pub fn get_ip_addr_output () -> Result <String, Error> {
|
||||
let output = Command::new ("ip")
|
||||
.arg ("addr")
|
||||
.output ()?;
|
||||
let output = output.stdout.as_slice ();
|
||||
let output = String::from_utf8 (output.to_vec ())?;
|
||||
Ok (output)
|
||||
}
|
||||
|
||||
pub fn parse_ip_addr_output (output: &str) -> Vec <Ipv4Addr> {
|
||||
// I wrote this in FP style because I was bored.
|
||||
|
||||
output.lines ()
|
||||
.map (|l| l.trim_start ())
|
||||
.filter_map (|l| l.strip_prefix ("inet "))
|
||||
.filter_map (|l| l.find ('/').map (|x| &l [0..x]))
|
||||
.filter_map (|l| Ipv4Addr::from_str (l).ok ())
|
||||
.filter (|a| ! a.is_loopback ())
|
||||
.collect ()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(target_os = "windows")]
|
||||
pub mod windows {
|
||||
use super::*;
|
||||
|
||||
pub fn get_ip_config_output () -> Result <String, Error> {
|
||||
let output = Command::new ("ipconfig")
|
||||
.output ()?;
|
||||
let output = output.stdout.as_slice ();
|
||||
let output = String::from_utf8 (output.to_vec ())?;
|
||||
Ok (output)
|
||||
}
|
||||
|
||||
pub fn parse_ip_config_output (output: &str) -> Vec <Ipv4Addr> {
|
||||
let mut addrs = vec! [];
|
||||
|
||||
for line in output.lines () {
|
||||
let line = line.trim_start ();
|
||||
|
||||
// Maybe only works on English locales?
|
||||
if ! line.starts_with ("IPv4 Address") {
|
||||
continue;
|
||||
}
|
||||
let colon_pos = match line.find (':') {
|
||||
None => continue,
|
||||
Some (x) => x,
|
||||
};
|
||||
let line = &line [colon_pos + 2..];
|
||||
|
||||
let addr = match Ipv4Addr::from_str (line) {
|
||||
Err (_) => continue,
|
||||
Ok (x) => x,
|
||||
};
|
||||
|
||||
addrs.push (addr);
|
||||
}
|
||||
|
||||
addrs
|
||||
}
|
||||
|
||||
#[cfg (test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test () {
|
||||
for (input, expected) in [
|
||||
(
|
||||
r"
|
||||
IPv4 Address . . .. . . . : 192.168.1.1
|
||||
",
|
||||
vec! [
|
||||
Ipv4Addr::new (192, 168, 1, 1),
|
||||
]
|
||||
),
|
||||
] {
|
||||
let actual = parse_ip_config_output (input);
|
||||
assert_eq! (actual, expected);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,375 @@
|
|||
use std::{
|
||||
collections::*,
|
||||
net::{
|
||||
Ipv4Addr,
|
||||
SocketAddrV4,
|
||||
},
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use hyper::{
|
||||
Body,
|
||||
Method,
|
||||
Request,
|
||||
Response,
|
||||
Server,
|
||||
StatusCode,
|
||||
service::{
|
||||
make_service_fn,
|
||||
service_fn,
|
||||
},
|
||||
};
|
||||
|
||||
use tokio::{
|
||||
net::UdpSocket,
|
||||
sync::RwLock,
|
||||
};
|
||||
|
||||
mod ip;
|
||||
mod tlv;
|
||||
|
||||
fn main () -> Result <(), Error>
|
||||
{
|
||||
let mut args = std::env::args ();
|
||||
let mut bail_unknown = true;
|
||||
let mut last_unknown = None;
|
||||
let mut name = ptth_diceware::passphrase ("_", 3);
|
||||
let mut subcommand_count = 0;
|
||||
let mut subcommand = None;
|
||||
|
||||
args.next ();
|
||||
while let Some (arg) = args.next () {
|
||||
if arg == "--ignore-unknown" {
|
||||
bail_unknown = false;
|
||||
}
|
||||
if arg == "--name" {
|
||||
name = args.next ().unwrap ().to_string ();
|
||||
}
|
||||
else if arg == "peer" {
|
||||
subcommand = Some (Subcommand::Peer);
|
||||
subcommand_count += 1;
|
||||
}
|
||||
else if arg == "receiver" {
|
||||
subcommand = Some (Subcommand::Receiver);
|
||||
subcommand_count += 1;
|
||||
}
|
||||
else if arg == "sender" {
|
||||
subcommand = Some (Subcommand::Sender);
|
||||
subcommand_count += 1;
|
||||
}
|
||||
else if arg == "spy" {
|
||||
subcommand = Some (Subcommand::Spy);
|
||||
subcommand_count += 1;
|
||||
}
|
||||
else {
|
||||
last_unknown = Some (arg);
|
||||
}
|
||||
}
|
||||
|
||||
if bail_unknown {
|
||||
if let Some (last_unknown) = last_unknown {
|
||||
eprintln! ("Unknown argument `{}`", last_unknown);
|
||||
return Err (Error::Args);
|
||||
}
|
||||
}
|
||||
|
||||
if subcommand_count >= 2 {
|
||||
eprintln! ("Detected {} subcommands in arguments", subcommand_count);
|
||||
return Err (Error::Args)
|
||||
}
|
||||
|
||||
let rt = tokio::runtime::Runtime::new ()?;
|
||||
|
||||
let params = Params::default ();
|
||||
|
||||
rt.block_on (async {
|
||||
if let Some (cmd) = subcommand {
|
||||
return match cmd {
|
||||
Subcommand::Peer => peer (params).await,
|
||||
Subcommand::Receiver => receiver (params).await,
|
||||
Subcommand::Sender => sender (params).await,
|
||||
Subcommand::Spy => spy (params),
|
||||
};
|
||||
}
|
||||
|
||||
println! ("Name is `{}`", name);
|
||||
|
||||
Ok::<_, Error> (())
|
||||
})?;
|
||||
|
||||
Ok (())
|
||||
}
|
||||
|
||||
enum Subcommand {
|
||||
Peer,
|
||||
Receiver,
|
||||
Sender,
|
||||
Spy,
|
||||
}
|
||||
|
||||
struct Params {
|
||||
multicast_group: (Ipv4Addr, u16),
|
||||
}
|
||||
|
||||
impl Default for Params {
|
||||
fn default () -> Self {
|
||||
let multicast_group = (Ipv4Addr::new (225, 100, 99, 98), 9041);
|
||||
|
||||
Self {
|
||||
multicast_group,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn peer (params: Params) -> Result <(), Error>
|
||||
{
|
||||
use rand::Rng;
|
||||
|
||||
let mut id = [0];
|
||||
rand::thread_rng ().try_fill (&mut id).or (Err (Error::Rand))?;
|
||||
let (multicast_addr, multicast_port) = params.multicast_group;
|
||||
let socket = tokio::net::UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, multicast_port)).await?;
|
||||
|
||||
socket.join_multicast_v4 (multicast_addr, Ipv4Addr::UNSPECIFIED)?;
|
||||
eprintln! ("Multicast group is {:?}", params.multicast_group);
|
||||
eprintln! ("Local addr is {}", socket.local_addr ()?);
|
||||
|
||||
let peer = Peer {
|
||||
id: id [0],
|
||||
outbox: Outbox {
|
||||
index: 1000,
|
||||
messages: Default::default (),
|
||||
}.into (),
|
||||
params,
|
||||
socket,
|
||||
};
|
||||
|
||||
eprintln! ("Random peer ID is {}", peer.id);
|
||||
|
||||
let state = Arc::new (peer);
|
||||
|
||||
{
|
||||
let state = Arc::clone (&state);
|
||||
tokio::spawn (async move {
|
||||
let mut interval = tokio::time::interval (Duration::from_secs (25));
|
||||
interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip);
|
||||
|
||||
loop {
|
||||
interval.tick ().await;
|
||||
|
||||
state.send_multicast (&tlv::Message::IAmOnline { peer_id: state.id }).await.ok ();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
{
|
||||
let state = Arc::clone (&state);
|
||||
tokio::spawn (async move {
|
||||
loop {
|
||||
let mut buf = vec! [0u8; 2048];
|
||||
let (bytes_recved, addr) = match state.socket.recv_from (&mut buf).await
|
||||
{
|
||||
Err (_) => {
|
||||
tokio::time::sleep (Duration::from_secs (10)).await;
|
||||
continue;
|
||||
},
|
||||
Ok (x) => x,
|
||||
};
|
||||
let buf = &buf [0..bytes_recved];
|
||||
|
||||
let msg = match tlv::decode (buf) {
|
||||
Err (_) => {
|
||||
eprintln! ("ZAT4ERXR Couldn't decode message");
|
||||
continue;
|
||||
},
|
||||
Ok (x) => x,
|
||||
};
|
||||
|
||||
println! ("Received {:?}", msg);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let make_svc = make_service_fn (|_conn| {
|
||||
let state = Arc::clone (&state);
|
||||
|
||||
async {
|
||||
Ok::<_, String> (service_fn (move |req| {
|
||||
let state = state.clone ();
|
||||
peer_handle_all (req, state)
|
||||
}))
|
||||
}
|
||||
});
|
||||
|
||||
let addr = std::net::SocketAddr::from (([127, 0, 0, 1], multicast_port));
|
||||
let server = Server::bind (&addr)
|
||||
.serve (make_svc);
|
||||
|
||||
eprintln! ("Local UI on {}", addr);
|
||||
|
||||
server.await?;
|
||||
|
||||
Ok (())
|
||||
}
|
||||
|
||||
struct Peer {
|
||||
id: u32,
|
||||
outbox: RwLock <Outbox>,
|
||||
params: Params,
|
||||
socket: UdpSocket,
|
||||
}
|
||||
|
||||
impl Peer {
|
||||
async fn send_multicast (&self, msg: &tlv::Message) -> Result <(), Error>
|
||||
{
|
||||
let msg = tlv::encode (&msg)?;
|
||||
self.socket.send_to (&msg, self.params.multicast_group).await?;
|
||||
Ok (())
|
||||
}
|
||||
}
|
||||
|
||||
struct Outbox {
|
||||
index: u32,
|
||||
messages: VecDeque <SentMessage>,
|
||||
}
|
||||
|
||||
struct SentMessage {
|
||||
index: u32,
|
||||
body: Vec <u8>,
|
||||
}
|
||||
|
||||
async fn peer_handle_all (req: Request <Body>, state: Arc <Peer>)
|
||||
-> Result <Response <Body>, Error>
|
||||
{
|
||||
if req.method () == Method::POST {
|
||||
if req.uri () == "/paste" {
|
||||
let body = hyper::body::to_bytes (req.into_body ()).await?;
|
||||
if body.len () > 1024 {
|
||||
let resp = Response::builder ()
|
||||
.status (StatusCode::BAD_REQUEST)
|
||||
.body (Body::from ("Message body must be <= 1024 bytes"))?;
|
||||
return Ok (resp);
|
||||
}
|
||||
let body = body.to_vec ();
|
||||
|
||||
let msg_index;
|
||||
{
|
||||
let mut outbox = state.outbox.write ().await;
|
||||
let msg = SentMessage {
|
||||
index: outbox.index,
|
||||
body: body.clone (),
|
||||
};
|
||||
msg_index = msg.index;
|
||||
outbox.messages.push_back (msg);
|
||||
if outbox.messages.len () > 10 {
|
||||
outbox.messages.pop_front ();
|
||||
}
|
||||
outbox.index += 1;
|
||||
}
|
||||
|
||||
match state.send_multicast (&tlv::Message::IHaveMessage {
|
||||
peer_id: state.id,
|
||||
msg_index,
|
||||
body,
|
||||
}).await {
|
||||
Ok (_) => (),
|
||||
Err (_) => return Ok (
|
||||
Response::builder ()
|
||||
.status (StatusCode::BAD_REQUEST)
|
||||
.body (Body::from ("Can't encode message"))?
|
||||
),
|
||||
}
|
||||
|
||||
return Ok (Response::new (format! ("Pasted message {}\n", msg_index).into ()));
|
||||
}
|
||||
}
|
||||
|
||||
Ok (Response::new (":V\n".into ()))
|
||||
}
|
||||
|
||||
async fn receiver (params: Params) -> Result <(), Error>
|
||||
{
|
||||
let (multicast_addr, multicast_port) = params.multicast_group;
|
||||
let socket = tokio::net::UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, multicast_port)).await?;
|
||||
|
||||
socket.join_multicast_v4 (multicast_addr, Ipv4Addr::UNSPECIFIED)?;
|
||||
eprintln! ("Multicast group is {:?}", params.multicast_group);
|
||||
eprintln! ("Local addr is {}", socket.local_addr ()?);
|
||||
|
||||
loop {
|
||||
let mut buf = vec! [0u8; 2048];
|
||||
|
||||
let (bytes_recved, remote_addr) = socket.recv_from (&mut buf).await?;
|
||||
buf.truncate (bytes_recved);
|
||||
|
||||
println! ("Received {} bytes from {}", bytes_recved, remote_addr);
|
||||
}
|
||||
}
|
||||
|
||||
async fn sender (params: Params) -> Result <(), Error>
|
||||
{
|
||||
let (multicast_addr, multicast_port) = params.multicast_group;
|
||||
let socket = tokio::net::UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, 0)).await?;
|
||||
|
||||
socket.join_multicast_v4 (multicast_addr, Ipv4Addr::UNSPECIFIED)?;
|
||||
eprintln! ("Multicast group is {:?}", params.multicast_group);
|
||||
eprintln! ("Local addr is {}", socket.local_addr ()?);
|
||||
|
||||
socket.send_to (&[], params.multicast_group).await?;
|
||||
|
||||
Ok (())
|
||||
}
|
||||
|
||||
fn spy (params: Params) -> Result <(), Error>
|
||||
{
|
||||
let (multicast_addr, multicast_port) = params.multicast_group;
|
||||
|
||||
let socket = match std::net::UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, multicast_port)) {
|
||||
Ok (x) => x,
|
||||
Err (e) => if e.kind () == std::io::ErrorKind::AddrInUse {
|
||||
eprintln! ("Address in use. You can only run 1 instance of Insecure Chat at a time, even in spy mode.");
|
||||
return Err (Error::AddrInUse);
|
||||
}
|
||||
else {
|
||||
return Err (e.into ());
|
||||
}
|
||||
};
|
||||
|
||||
for bind_addr in ip::get_ips ()? {
|
||||
socket.join_multicast_v4 (&multicast_addr, &bind_addr)?;
|
||||
// eprintln! ("Joined multicast with {}", bind_addr);
|
||||
}
|
||||
eprintln! ("Multicast addr is {}", multicast_addr);
|
||||
eprintln! ("Local addr is {}", socket.local_addr ()?);
|
||||
|
||||
loop {
|
||||
let mut buf = vec! [0u8; 2048];
|
||||
|
||||
eprintln! ("Listening for UDP packets...");
|
||||
let (bytes_recved, remote_addr) = socket.recv_from (&mut buf)?;
|
||||
buf.truncate (bytes_recved);
|
||||
|
||||
println! ("Received {} bytes from {}", bytes_recved, remote_addr);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive (Debug, thiserror::Error)]
|
||||
enum Error {
|
||||
#[error ("Address in use")]
|
||||
AddrInUse,
|
||||
#[error ("CLI args")]
|
||||
Args,
|
||||
#[error (transparent)]
|
||||
Hyper (#[from] hyper::Error),
|
||||
#[error (transparent)]
|
||||
HyperHttp (#[from] hyper::http::Error),
|
||||
#[error (transparent)]
|
||||
Io (#[from] std::io::Error),
|
||||
#[error (transparent)]
|
||||
Ip (#[from] ip::Error),
|
||||
#[error ("Randomness")]
|
||||
Rand,
|
||||
#[error (transparent)]
|
||||
Tlv (#[from] tlv::Error),
|
||||
}
|
|
@ -0,0 +1,122 @@
|
|||
// A crummy ad-hoc TLV cause I'm lazy and these are fun
|
||||
|
||||
#[derive (Debug, PartialEq)]
|
||||
pub enum Message {
|
||||
IAmOnline {
|
||||
peer_id: u32,
|
||||
},
|
||||
IHaveMessage {
|
||||
peer_id: u32,
|
||||
msg_index: u32,
|
||||
body: Vec <u8>,
|
||||
},
|
||||
IAcknowledgeYourMessage {
|
||||
msg_index: u32,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive (Debug, thiserror::Error)]
|
||||
pub enum Error {
|
||||
#[error ("Bad magic number")]
|
||||
BadMagic,
|
||||
#[error ("Data is too big to encode / decode")]
|
||||
DataTooBig,
|
||||
#[error ("Required field missing while decoding")]
|
||||
MissingField,
|
||||
#[error ("No type in TLV")]
|
||||
NoType,
|
||||
#[error ("Unrecognized TLV type")]
|
||||
UnrecognizedType,
|
||||
}
|
||||
|
||||
const MAGIC: [u8; 4] = [55, 11, 101, 38];
|
||||
|
||||
pub fn decode (src: &[u8]) -> Result <Message, Error>
|
||||
{
|
||||
let (magic, src) = take_bytes (src, 4).ok_or (Error::BadMagic)?;
|
||||
if magic != MAGIC {
|
||||
return Err (Error::BadMagic);
|
||||
}
|
||||
|
||||
let (msg_type, src) = take_bytes(src, 1).ok_or(Error::NoType)?;
|
||||
|
||||
Ok (match msg_type [0] {
|
||||
1 => {
|
||||
let (peer_id, src) = take_u32 (src).ok_or (Error::MissingField)?;
|
||||
Message::IAmOnline { peer_id, }
|
||||
},
|
||||
2 => {
|
||||
let (peer_id, src) = take_u32 (src).ok_or (Error::MissingField)?;
|
||||
let (msg_index, src) = take_u32 (src).ok_or (Error::MissingField)?;
|
||||
let (body_len, src) = take_u32 (src).ok_or (Error::MissingField)?;
|
||||
let (body_bytes, src) = take_bytes(src, body_len as usize).ok_or(Error::MissingField)?;
|
||||
let body = body_bytes.into ();
|
||||
Message::IHaveMessage { peer_id, msg_index, body, }
|
||||
},
|
||||
3 => {
|
||||
let (msg_index, src) = take_u32 (src).ok_or (Error::MissingField)?;
|
||||
Message::IAcknowledgeYourMessage { msg_index, }
|
||||
},
|
||||
_ => return Err (Error::UnrecognizedType),
|
||||
})
|
||||
}
|
||||
|
||||
fn take_u32 (src: &[u8]) -> Option <(u32, &[u8])>
|
||||
{
|
||||
let (a, b) = take_bytes (src, 4)?;
|
||||
Some ((u32::from_le_bytes([a [0], a [1], a [2], a [3]]), b))
|
||||
}
|
||||
|
||||
fn take_bytes (src: &[u8], n: usize) -> Option <(&[u8], &[u8])>
|
||||
{
|
||||
if src.len () < n {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some (src.split_at (n))
|
||||
}
|
||||
|
||||
pub fn encode (msg: &Message) -> Result <Vec <u8>, Error>
|
||||
{
|
||||
let mut buf = Vec::from (MAGIC);
|
||||
|
||||
match msg {
|
||||
Message::IAmOnline { peer_id } => {
|
||||
buf.push (1);
|
||||
buf.extend_from_slice (&peer_id.to_le_bytes());
|
||||
},
|
||||
Message::IHaveMessage { peer_id, msg_index, body } => {
|
||||
buf.push (2);
|
||||
buf.extend_from_slice (&peer_id.to_le_bytes());
|
||||
buf.extend_from_slice (&msg_index.to_le_bytes());
|
||||
|
||||
let body_len = u32::try_from (body.len ()).or (Err (Error::DataTooBig))?;
|
||||
buf.extend_from_slice (&body_len.to_le_bytes());
|
||||
buf.extend_from_slice(body.as_slice());
|
||||
},
|
||||
Message::IAcknowledgeYourMessage { msg_index } => {
|
||||
buf.push (3);
|
||||
buf.extend_from_slice (&msg_index.to_le_bytes());
|
||||
},
|
||||
}
|
||||
|
||||
Ok (buf)
|
||||
}
|
||||
|
||||
#[cfg (test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn roundtrip () {
|
||||
for msg in [
|
||||
Message::IAmOnline { peer_id: 93 },
|
||||
Message::IHaveMessage { peer_id: 93, msg_index: 1000, body: Vec::from (b":V".as_slice()) },
|
||||
Message::IAcknowledgeYourMessage { msg_index: 1000 },
|
||||
] {
|
||||
let encoded = encode (&msg).unwrap ();
|
||||
let roundtripped = decode (&encoded).unwrap ();
|
||||
assert_eq!(roundtripped, msg);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -31,6 +31,7 @@ use ptth_core::{
|
|||
use ptth_server::{
|
||||
file_server::{
|
||||
self,
|
||||
metrics,
|
||||
FileServer,
|
||||
},
|
||||
load_toml,
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
[package]
|
||||
name = "ptth_kv"
|
||||
version = "0.1.0"
|
||||
authors = ["Trish"]
|
||||
edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.38"
|
||||
base64 = "0.13.0"
|
||||
hyper = { version = "0.14.4", features = ["full"] }
|
||||
thiserror = "1.0.22"
|
||||
tokio = { version = "1.8.1", features = ["full"] }
|
|
@ -0,0 +1,499 @@
|
|||
use std::{
|
||||
collections::{
|
||||
HashMap,
|
||||
},
|
||||
iter::FromIterator,
|
||||
sync::{
|
||||
Arc,
|
||||
},
|
||||
};
|
||||
|
||||
use hyper::{
|
||||
Body,
|
||||
Request,
|
||||
Response,
|
||||
StatusCode,
|
||||
};
|
||||
use tokio::{
|
||||
sync::Mutex,
|
||||
};
|
||||
|
||||
pub struct HttpService {
|
||||
store: Arc <Store>,
|
||||
}
|
||||
|
||||
pub struct Store {
|
||||
status_dirs: HashMap <Vec <u8>, StatusKeyDirectory>,
|
||||
}
|
||||
|
||||
#[derive (thiserror::Error, Debug, PartialEq)]
|
||||
pub enum Error {
|
||||
#[error ("key too long")]
|
||||
KeyTooLong,
|
||||
#[error ("no such key dir")]
|
||||
NoSuchKeyDir,
|
||||
#[error ("value too long")]
|
||||
ValueTooLong,
|
||||
}
|
||||
|
||||
pub struct StatusQuotas {
|
||||
pub max_keys: usize,
|
||||
pub max_key_bytes: usize,
|
||||
pub max_value_bytes: usize,
|
||||
pub max_payload_bytes: usize,
|
||||
}
|
||||
|
||||
pub struct GetAfter {
|
||||
pub tuples: Vec <(Vec <u8>, Vec <u8>)>,
|
||||
pub sequence: u64,
|
||||
}
|
||||
|
||||
impl HttpService {
|
||||
pub fn new (s: Store) -> Self {
|
||||
Self {
|
||||
store: Arc::new (s),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn inner (&self) -> &Store {
|
||||
&*self.store
|
||||
}
|
||||
|
||||
pub async fn serve (&self, port: u16) -> Result <(), hyper::Error> {
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use hyper::{
|
||||
Server,
|
||||
service::{
|
||||
make_service_fn,
|
||||
service_fn,
|
||||
},
|
||||
};
|
||||
|
||||
let make_svc = make_service_fn (|_conn| {
|
||||
let state = self.store.clone ();
|
||||
|
||||
async {
|
||||
Ok::<_, String> (service_fn (move |req| {
|
||||
let state = state.clone ();
|
||||
|
||||
Self::handle_all (req, state)
|
||||
}))
|
||||
}
|
||||
});
|
||||
|
||||
let addr = SocketAddr::from(([127, 0, 0, 1], port));
|
||||
|
||||
let server = Server::bind (&addr)
|
||||
.serve (make_svc)
|
||||
;
|
||||
|
||||
server.await
|
||||
}
|
||||
}
|
||||
|
||||
impl Store {
|
||||
pub fn new <I> (status_dirs: I)
|
||||
-> Self
|
||||
where I: Iterator <Item = (Vec <u8>, StatusQuotas)>
|
||||
{
|
||||
let status_dirs = status_dirs
|
||||
.map (|(name, quotas)| (name, StatusKeyDirectory::new (quotas)))
|
||||
.collect ();
|
||||
|
||||
Self {
|
||||
status_dirs,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn list_key_dirs (&self) -> Vec <Vec <u8>> {
|
||||
self.status_dirs.iter ()
|
||||
.map (|(k, _)| k.clone ())
|
||||
.collect ()
|
||||
}
|
||||
|
||||
pub async fn set (&self, name: &[u8], key: &[u8], value: Vec <u8>)
|
||||
-> Result <(), Error>
|
||||
{
|
||||
let dir = self.status_dirs.get (name)
|
||||
.ok_or (Error::NoSuchKeyDir)?;
|
||||
|
||||
dir.set (key, value).await
|
||||
}
|
||||
|
||||
async fn set_multi (&self, name: &[u8], tuples: Vec <(&[u8], Vec <u8>)>)
|
||||
-> Result <(), Error>
|
||||
{
|
||||
let dir = self.status_dirs.get (name)
|
||||
.ok_or (Error::NoSuchKeyDir)?;
|
||||
|
||||
dir.set_multi (tuples).await
|
||||
}
|
||||
|
||||
pub async fn get_after (&self, name: &[u8], thresh: Option <u64>)
|
||||
-> Result <GetAfter, Error>
|
||||
{
|
||||
let dir = self.status_dirs.get (name)
|
||||
.ok_or (Error::NoSuchKeyDir)?;
|
||||
|
||||
dir.get_after (thresh).await
|
||||
}
|
||||
}
|
||||
|
||||
// End of public interface
|
||||
|
||||
const SET_BATCH_SIZE: usize = 32;
|
||||
|
||||
enum StoreCommand {
|
||||
SetStatus (SetStatusCommand),
|
||||
Multi (Vec <StoreCommand>),
|
||||
}
|
||||
|
||||
struct StatusKeyDirectory {
|
||||
quotas: StatusQuotas,
|
||||
|
||||
// TODO: Make this tokio::sync::Mutex.
|
||||
table: Mutex <StatusTable>,
|
||||
}
|
||||
|
||||
#[derive (Default)]
|
||||
struct StatusTable {
|
||||
map: HashMap <Vec <u8>, StatusValue>,
|
||||
sequence: u64,
|
||||
}
|
||||
|
||||
struct StatusValue {
|
||||
value: Vec <u8>,
|
||||
sequence: u64,
|
||||
}
|
||||
|
||||
struct SetStatusCommand {
|
||||
key_dir: Vec <u8>,
|
||||
key: Vec <u8>,
|
||||
value: Vec <u8>,
|
||||
}
|
||||
|
||||
impl HttpService {
|
||||
async fn handle_all (req: Request <Body>, store: Arc <Store>)
|
||||
-> Result <Response <Body>, anyhow::Error>
|
||||
{
|
||||
Ok (Response::builder ()
|
||||
.body (Body::from ("hello\n"))?)
|
||||
}
|
||||
}
|
||||
|
||||
impl StatusKeyDirectory {
|
||||
fn new (quotas: StatusQuotas) -> Self {
|
||||
Self {
|
||||
quotas,
|
||||
table: Mutex::new (Default::default ()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn set (&self, key: &[u8], value: Vec <u8>) -> Result <(), Error>
|
||||
{
|
||||
if key.len () > self.quotas.max_key_bytes {
|
||||
return Err (Error::KeyTooLong);
|
||||
}
|
||||
|
||||
if value.len () > self.quotas.max_value_bytes {
|
||||
return Err (Error::ValueTooLong);
|
||||
}
|
||||
|
||||
{
|
||||
let mut guard = self.table.lock ().await;
|
||||
guard.set (&self.quotas, key, value);
|
||||
}
|
||||
Ok (())
|
||||
}
|
||||
|
||||
async fn set_multi (&self, tuples: Vec <(&[u8], Vec <u8>)>) -> Result <(), Error>
|
||||
{
|
||||
{
|
||||
let mut guard = self.table.lock ().await;
|
||||
for (key, value) in tuples {
|
||||
guard.set (&self.quotas, key, value);
|
||||
}
|
||||
}
|
||||
Ok (())
|
||||
}
|
||||
|
||||
async fn get_after (&self, thresh: Option <u64>) -> Result <GetAfter, Error> {
|
||||
let guard = self.table.lock ().await;
|
||||
Ok (guard.get_after (thresh))
|
||||
}
|
||||
}
|
||||
|
||||
impl StatusTable {
|
||||
fn payload_bytes (&self) -> usize {
|
||||
self.map.iter ()
|
||||
.map (|(k, v)| k.len () + v.len ())
|
||||
.sum ()
|
||||
}
|
||||
|
||||
fn set (&mut self, quotas: &StatusQuotas, key: &[u8], value: Vec <u8>) {
|
||||
self.sequence += 1;
|
||||
|
||||
if self.map.len () > quotas.max_keys {
|
||||
self.map.clear ();
|
||||
}
|
||||
|
||||
let new_bytes = key.len () + value.len ();
|
||||
|
||||
if self.payload_bytes () + new_bytes > quotas.max_payload_bytes {
|
||||
self.map.clear ();
|
||||
}
|
||||
|
||||
let value = StatusValue {
|
||||
value,
|
||||
sequence: self.sequence,
|
||||
};
|
||||
|
||||
// self.map.insert (key, value);
|
||||
match self.map.get_mut (key) {
|
||||
None => {
|
||||
self.map.insert (key.to_vec (), value);
|
||||
},
|
||||
Some (v) => *v = value,
|
||||
}
|
||||
}
|
||||
|
||||
fn get_after (&self, thresh: Option <u64>) -> GetAfter {
|
||||
let thresh = thresh.unwrap_or (0);
|
||||
|
||||
let tuples = self.map.iter ()
|
||||
.filter_map (|(key, value)| {
|
||||
if value.sequence <= thresh {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some ((key.clone (), value.value.clone ()))
|
||||
})
|
||||
.collect ();
|
||||
|
||||
GetAfter {
|
||||
tuples,
|
||||
sequence: self.sequence,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl StatusValue {
|
||||
fn len (&self) -> usize {
|
||||
self.value.len ()
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main () -> Result <(), hyper::Error> {
|
||||
use std::time::Duration;
|
||||
|
||||
use tokio::{
|
||||
spawn,
|
||||
time::interval,
|
||||
};
|
||||
|
||||
let service = HttpService::new (Store::new (vec! [
|
||||
(b"key_dir".to_vec (), StatusQuotas {
|
||||
max_keys: 4,
|
||||
max_key_bytes: 16,
|
||||
max_value_bytes: 16,
|
||||
max_payload_bytes: 128,
|
||||
}),
|
||||
].into_iter ()));
|
||||
|
||||
service.serve (4003).await
|
||||
}
|
||||
|
||||
#[cfg (test)]
|
||||
mod tests {
|
||||
use tokio::runtime::Runtime;
|
||||
|
||||
use super::*;
|
||||
|
||||
fn get_after_eq (a: &GetAfter, b: &GetAfter) {
|
||||
assert_eq! (a.sequence, b.sequence);
|
||||
|
||||
let a = a.tuples.clone ();
|
||||
let b = b.tuples.clone ();
|
||||
|
||||
let a = HashMap::<Vec <u8>, Vec <u8>>::from_iter (a.into_iter ());
|
||||
let b = HashMap::from_iter (b.into_iter ());
|
||||
|
||||
assert_eq! (a, b);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn store () {
|
||||
let rt = Runtime::new ().unwrap ();
|
||||
rt.block_on (async {
|
||||
let s = Store::new (vec! [
|
||||
(b"key_dir".to_vec (), StatusQuotas {
|
||||
max_keys: 4,
|
||||
max_key_bytes: 16,
|
||||
max_value_bytes: 16,
|
||||
max_payload_bytes: 128,
|
||||
}),
|
||||
].into_iter ());
|
||||
|
||||
let mut expected_sequence = 0;
|
||||
|
||||
assert_eq! (s.list_key_dirs (), vec! [
|
||||
b"key_dir".to_vec (),
|
||||
]);
|
||||
|
||||
assert_eq! (
|
||||
s.set (b"key_dir", b"this key is too long and will cause an error", b"bar".to_vec ()).await,
|
||||
Err (Error::KeyTooLong)
|
||||
);
|
||||
assert_eq! (
|
||||
s.set (b"key_dir", b"foo", b"this value is too long and will cause an error".to_vec ()).await,
|
||||
Err (Error::ValueTooLong)
|
||||
);
|
||||
assert_eq! (
|
||||
s.set (b"invalid_key_dir", b"foo", b"bar".to_vec ()).await,
|
||||
Err (Error::NoSuchKeyDir)
|
||||
);
|
||||
|
||||
let ga = s.get_after (b"key_dir", None).await.unwrap ();
|
||||
assert_eq! (ga.sequence, expected_sequence);
|
||||
assert_eq! (ga.tuples, vec! []);
|
||||
|
||||
s.set (b"key_dir", b"foo_1", b"bar_1".to_vec ()).await.unwrap ();
|
||||
expected_sequence += 1;
|
||||
let ga = s.get_after (b"key_dir", None).await.unwrap ();
|
||||
|
||||
assert_eq! (ga.sequence, expected_sequence);
|
||||
assert_eq! (ga.tuples, vec! [
|
||||
(b"foo_1".to_vec (), b"bar_1".to_vec ()),
|
||||
]);
|
||||
|
||||
get_after_eq (&ga, &GetAfter {
|
||||
sequence: expected_sequence,
|
||||
tuples: vec! [
|
||||
(b"foo_1".to_vec (), b"bar_1".to_vec ()),
|
||||
]
|
||||
});
|
||||
|
||||
s.set (b"key_dir", b"foo_2", b"bar_2".to_vec ()).await.unwrap ();
|
||||
expected_sequence += 1;
|
||||
let ga = s.get_after (b"key_dir", None).await.unwrap ();
|
||||
|
||||
get_after_eq (&ga, &GetAfter {
|
||||
sequence: expected_sequence,
|
||||
tuples: vec! [
|
||||
(b"foo_1".to_vec (), b"bar_1".to_vec ()),
|
||||
(b"foo_2".to_vec (), b"bar_2".to_vec ()),
|
||||
]
|
||||
});
|
||||
|
||||
s.set (b"key_dir", b"foo_1", b"bar_3".to_vec ()).await.unwrap ();
|
||||
expected_sequence += 1;
|
||||
let ga = s.get_after (b"key_dir", None).await.unwrap ();
|
||||
|
||||
get_after_eq (&ga, &GetAfter {
|
||||
sequence: expected_sequence,
|
||||
tuples: vec! [
|
||||
(b"foo_1".to_vec (), b"bar_3".to_vec ()),
|
||||
(b"foo_2".to_vec (), b"bar_2".to_vec ()),
|
||||
]
|
||||
});
|
||||
|
||||
let ga = s.get_after (b"key_dir", Some (2)).await.unwrap ();
|
||||
get_after_eq (&ga, &GetAfter {
|
||||
sequence: expected_sequence,
|
||||
tuples: vec! [
|
||||
(b"foo_1".to_vec (), b"bar_3".to_vec ()),
|
||||
]
|
||||
});
|
||||
|
||||
let ga = s.get_after (b"key_dir", Some (3)).await.unwrap ();
|
||||
get_after_eq (&ga, &GetAfter {
|
||||
sequence: expected_sequence,
|
||||
tuples: vec! []
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg (not (debug_assertions))]
|
||||
fn perf () {
|
||||
use std::time::Instant;
|
||||
|
||||
let rt = Runtime::new ().unwrap ();
|
||||
rt.block_on (async {
|
||||
let s = Store::new (vec! [
|
||||
(b"key_dir".to_vec (), StatusQuotas {
|
||||
max_keys: 4,
|
||||
max_key_bytes: 16,
|
||||
max_value_bytes: 16,
|
||||
max_payload_bytes: 128,
|
||||
}),
|
||||
].into_iter ());
|
||||
|
||||
let num_iters = 1_000_000;
|
||||
|
||||
let key = b"foo";
|
||||
|
||||
let start_time = Instant::now ();
|
||||
|
||||
for i in 0..num_iters {
|
||||
let value = format! ("{}", i);
|
||||
|
||||
s.set (b"key_dir", key, value.into ()).await.unwrap ();
|
||||
}
|
||||
|
||||
let end_time = Instant::now ();
|
||||
let total_dur = end_time - start_time;
|
||||
|
||||
let avg_nanos = total_dur.as_nanos () / num_iters;
|
||||
|
||||
assert! (avg_nanos < 250, dbg! (avg_nanos));
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg (not (debug_assertions))]
|
||||
fn perf_multi () {
|
||||
use std::time::Instant;
|
||||
|
||||
let rt = Runtime::new ().unwrap ();
|
||||
rt.block_on (async {
|
||||
let s = Store::new (vec! [
|
||||
(b"key_dir".to_vec (), StatusQuotas {
|
||||
max_keys: 8,
|
||||
max_key_bytes: 16,
|
||||
max_value_bytes: 16,
|
||||
max_payload_bytes: 128,
|
||||
}),
|
||||
].into_iter ());
|
||||
|
||||
let num_iters = 1_000_000;
|
||||
|
||||
let start_time = Instant::now ();
|
||||
|
||||
for i in 0..num_iters {
|
||||
let value = Vec::<u8>::from (format! ("{}", i));
|
||||
let tuples = vec! [
|
||||
(&b"foo_0"[..], value.clone ()),
|
||||
(b"foo_1", value.clone ()),
|
||||
(b"foo_2", value.clone ()),
|
||||
(b"foo_3", value.clone ()),
|
||||
(b"foo_4", value.clone ()),
|
||||
(b"foo_5", value.clone ()),
|
||||
(b"foo_6", value.clone ()),
|
||||
(b"foo_7", value.clone ()),
|
||||
];
|
||||
|
||||
s.set_multi (b"key_dir", tuples).await.unwrap ();
|
||||
}
|
||||
|
||||
let end_time = Instant::now ();
|
||||
let total_dur = end_time - start_time;
|
||||
|
||||
let avg_nanos = total_dur.as_nanos () / (num_iters * 8);
|
||||
|
||||
assert! (avg_nanos < 150, dbg! (avg_nanos));
|
||||
});
|
||||
}
|
||||
}
|
|
@ -1,5 +1,6 @@
|
|||
use structopt::StructOpt;
|
||||
use tokio::{
|
||||
net::UdpSocket,
|
||||
sync::watch,
|
||||
};
|
||||
|
||||
|
@ -95,7 +96,34 @@ impl P2Client {
|
|||
})
|
||||
};
|
||||
|
||||
if false {
|
||||
let task_direc_connect = {
|
||||
let connection = connection.clone ();
|
||||
|
||||
tokio::spawn (async move {
|
||||
let cookie = protocol::p2_direc_to_p4 (
|
||||
&connection,
|
||||
"bogus_server",
|
||||
).await?;
|
||||
|
||||
let sock = UdpSocket::bind ("0.0.0.0:0").await?;
|
||||
|
||||
let mut interval = tokio::time::interval (Duration::from_millis (1000));
|
||||
interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Delay);
|
||||
|
||||
loop {
|
||||
interval.tick ().await;
|
||||
sock.send_to(&cookie [..], "127.0.0.1:30379").await?;
|
||||
debug! ("P2 sent cookie to P3 over plain UDP");
|
||||
}
|
||||
|
||||
Ok::<_, anyhow::Error> (())
|
||||
})
|
||||
};
|
||||
}
|
||||
|
||||
task_tcp_server.await??;
|
||||
//task_direc_connect.await??;
|
||||
|
||||
Ok (())
|
||||
}
|
||||
|
|
|
@ -26,10 +26,10 @@ async fn main () -> anyhow::Result <()> {
|
|||
tokio::fs::write ("ptth_quic_output/quic_server.crt", app.server_cert ()).await?;
|
||||
|
||||
tokio::select! {
|
||||
_val = app.run () => {
|
||||
val = app.run () => {
|
||||
|
||||
},
|
||||
_val = running_rx.changed () => {
|
||||
val = running_rx.changed () => {
|
||||
|
||||
},
|
||||
}
|
||||
|
|
|
@ -94,6 +94,7 @@ impl Opt {
|
|||
pub struct P4EndServer {
|
||||
conf: Config,
|
||||
conn: quinn::Connection,
|
||||
endpoint: quinn::Endpoint,
|
||||
shutdown_rx: watch::Receiver <bool>,
|
||||
}
|
||||
|
||||
|
@ -141,6 +142,7 @@ impl P4EndServer {
|
|||
Ok ((P4EndServer {
|
||||
conf,
|
||||
conn,
|
||||
endpoint,
|
||||
shutdown_rx,
|
||||
}, shutdown_tx))
|
||||
}
|
||||
|
|
|
@ -10,6 +10,9 @@ use hyper::{
|
|||
StatusCode,
|
||||
};
|
||||
use structopt::StructOpt;
|
||||
use tokio::{
|
||||
net::UdpSocket,
|
||||
};
|
||||
|
||||
use crate::prelude::*;
|
||||
use protocol::PeerId;
|
||||
|
@ -84,9 +87,9 @@ impl App {
|
|||
pub async fn run (self) -> anyhow::Result <()> {
|
||||
let Self {
|
||||
endpoint,
|
||||
listen_addr: _,
|
||||
listen_addr,
|
||||
metrics,
|
||||
server_cert: _,
|
||||
server_cert,
|
||||
tcp_listener,
|
||||
} = self;
|
||||
|
||||
|
@ -155,6 +158,35 @@ impl App {
|
|||
})
|
||||
};
|
||||
|
||||
let task_direc_server = {
|
||||
let relay_state = Arc::clone (&relay_state);
|
||||
|
||||
tokio::spawn (async move {
|
||||
let sock = UdpSocket::bind("0.0.0.0:30379").await?;
|
||||
let mut buf = [0; 2048];
|
||||
loop {
|
||||
let (len, addr) = sock.recv_from (&mut buf).await?;
|
||||
debug! ("{:?} bytes received from {:?}", len, addr);
|
||||
|
||||
let packet = Vec::from_iter ((&buf [0..len]).into_iter ().map (|x| *x));
|
||||
|
||||
{
|
||||
let mut direc_cookies = relay_state.direc_cookies.lock ().await;
|
||||
|
||||
if let Some (direc_state) = direc_cookies.remove (&packet) {
|
||||
debug! ("Got PTTH_DIREC cookie for {}", direc_state.p2_id);
|
||||
direc_state.p2_addr.send (addr).ok ();
|
||||
}
|
||||
else {
|
||||
debug! ("UDP packet didn't match any PTTH_DIREC cookie");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok::<_, anyhow::Error> (())
|
||||
})
|
||||
};
|
||||
|
||||
let task_http_server = tokio::spawn (async move {
|
||||
http_server.serve (make_svc).await?;
|
||||
Ok::<_, anyhow::Error> (())
|
||||
|
@ -193,6 +225,9 @@ impl App {
|
|||
_val = task_http_server => {
|
||||
eprintln! ("HTTP server exited, exiting");
|
||||
},
|
||||
_val = task_direc_server => {
|
||||
eprintln! ("PTTH_DIREC server exited, exiting");
|
||||
},
|
||||
}
|
||||
|
||||
Ok (())
|
||||
|
@ -221,6 +256,7 @@ async fn handle_http (_req: Request <Body>, relay_state: Arc <RelayState>)
|
|||
struct RelayState {
|
||||
config: arc_swap::ArcSwap <Config>,
|
||||
p4_server_proxies: Mutex <HashMap <PeerId, P4State>>,
|
||||
direc_cookies: Mutex <HashMap <Vec <u8>, DirecState>>,
|
||||
metrics: Arc <RwLock <Metrics>>,
|
||||
stats: Stats,
|
||||
http_client: reqwest::Client,
|
||||
|
@ -229,6 +265,7 @@ struct RelayState {
|
|||
#[derive (Default)]
|
||||
struct Config {
|
||||
ip_nicknames: BTreeMap <[u8; 4], String>,
|
||||
tcp_listen_port: Option <u16>,
|
||||
webhook_url: Option <String>,
|
||||
}
|
||||
|
||||
|
@ -236,6 +273,7 @@ impl From <ConfigFile> for Config {
|
|||
fn from (x: ConfigFile) -> Self {
|
||||
Self {
|
||||
ip_nicknames: x.ip_nicknames.into_iter ().collect (),
|
||||
tcp_listen_port: x.tcp_listen_port,
|
||||
webhook_url: x.webhook_url,
|
||||
}
|
||||
}
|
||||
|
@ -248,6 +286,12 @@ struct ConfigFile {
|
|||
webhook_url: Option <String>,
|
||||
}
|
||||
|
||||
struct DirecState {
|
||||
start_time: Instant,
|
||||
p2_id: PeerId,
|
||||
p2_addr: tokio::sync::oneshot::Sender <SocketAddr>,
|
||||
}
|
||||
|
||||
#[derive (Default)]
|
||||
struct Stats {
|
||||
quic: ConnectEvents,
|
||||
|
@ -469,7 +513,19 @@ async fn handle_p2_connection (
|
|||
recv
|
||||
).await?
|
||||
},
|
||||
_ => (),
|
||||
protocol::P2ToP3Stream::DirecP2ToP4 {
|
||||
server_id,
|
||||
cookie,
|
||||
} => {
|
||||
handle_direc_p2_to_p4 (
|
||||
relay_state,
|
||||
client_id,
|
||||
server_id,
|
||||
cookie,
|
||||
send,
|
||||
recv
|
||||
).await?
|
||||
},
|
||||
}
|
||||
|
||||
debug! ("Request ended for P2");
|
||||
|
@ -513,6 +569,41 @@ async fn handle_request_p2_to_p4 (
|
|||
Ok (())
|
||||
}
|
||||
|
||||
async fn handle_direc_p2_to_p4 (
|
||||
relay_state: Arc <RelayState>,
|
||||
client_id: String,
|
||||
server_id: PeerId,
|
||||
cookie: Vec <u8>,
|
||||
mut client_send: quinn::SendStream,
|
||||
client_recv: quinn::RecvStream,
|
||||
) -> anyhow::Result <()>
|
||||
{
|
||||
debug! ("P2 {} wants a P2P connection to P4 {}", client_id, server_id);
|
||||
|
||||
// TODO: Check authorization
|
||||
|
||||
protocol::p3_authorize_p2_to_p4_direc (&mut client_send).await?;
|
||||
|
||||
let (tx, rx) = tokio::sync::oneshot::channel ();
|
||||
|
||||
{
|
||||
let mut direc_cookies = relay_state.direc_cookies.lock ().await;
|
||||
direc_cookies.insert (cookie, DirecState {
|
||||
start_time: Instant::now (),
|
||||
p2_id: client_id.clone (),
|
||||
p2_addr: tx,
|
||||
});
|
||||
}
|
||||
|
||||
debug! ("Waiting to learn P2's WAN address...");
|
||||
|
||||
let wan_addr = rx.await?;
|
||||
|
||||
debug! ("And that WAN address is {}", wan_addr);
|
||||
|
||||
Ok (())
|
||||
}
|
||||
|
||||
async fn handle_p4_connection (
|
||||
relay_state: Arc <RelayState>,
|
||||
connection: quinn::Connection,
|
||||
|
|
|
@ -9,7 +9,7 @@ use std::{
|
|||
|
||||
use quinn::{
|
||||
ClientConfig, Endpoint,
|
||||
ServerConfig,
|
||||
ServerConfig, TransportConfig,
|
||||
};
|
||||
|
||||
/// Constructs a QUIC endpoint configured for use a client only.
|
||||
|
|
|
@ -109,6 +109,8 @@ pub struct Relay {
|
|||
#[derive (Clone, Default)]
|
||||
pub (crate) struct MonitoringCounters {
|
||||
pub (crate) requests_total: u64,
|
||||
pub (crate) requests_by_scraper_api: HashMap <String, u64>,
|
||||
pub (crate) requests_by_email: HashMap <String, u64>,
|
||||
}
|
||||
|
||||
#[derive (Clone)]
|
||||
|
@ -120,6 +122,7 @@ pub struct RejectedServer {
|
|||
|
||||
#[derive (Clone, Debug)]
|
||||
pub struct AuditEvent {
|
||||
time_monotonic: Instant,
|
||||
pub time_utc: DateTime <Utc>,
|
||||
pub data: AuditData,
|
||||
}
|
||||
|
@ -145,6 +148,7 @@ pub enum AuditData {
|
|||
impl AuditEvent {
|
||||
pub fn new (data: AuditData) -> Self {
|
||||
Self {
|
||||
time_monotonic: Instant::now (),
|
||||
time_utc: Utc::now (),
|
||||
data,
|
||||
}
|
||||
|
|
|
@ -192,7 +192,7 @@ async fn api_v1 (
|
|||
})).await;
|
||||
|
||||
if path_rest == "metrics" {
|
||||
Ok (metrics (state).await?)
|
||||
Ok (metrics (req, state).await?)
|
||||
}
|
||||
else if path_rest == "test" {
|
||||
Ok (error_reply (StatusCode::OK, "You're valid!")?)
|
||||
|
@ -223,8 +223,9 @@ async fn api_v1 (
|
|||
}
|
||||
}
|
||||
|
||||
#[instrument (level = "trace", skip (state))]
|
||||
#[instrument (level = "trace", skip (req, state))]
|
||||
async fn metrics (
|
||||
req: Request <Body>,
|
||||
state: &Relay,
|
||||
)
|
||||
-> Result <Response <Body>, RequestError>
|
||||
|
|
|
@ -57,7 +57,7 @@ pub async fn handle_listen (
|
|||
|
||||
let mut server_status = state.server_status.lock ().await;
|
||||
|
||||
let status = server_status.entry (watcher_code.clone ()).or_insert_with (Default::default);
|
||||
let mut status = server_status.entry (watcher_code.clone ()).or_insert_with (Default::default);
|
||||
|
||||
status.last_seen = now;
|
||||
}
|
||||
|
|
|
@ -419,6 +419,27 @@ impl FileServer {
|
|||
send_body,
|
||||
range,
|
||||
}) => serve_file (uri, file.into_inner (), send_body, range, headers.get ("if-none-match").map (|v| &v[..])).await?,
|
||||
MarkdownErr (e) => {
|
||||
#[cfg (feature = "markdown")]
|
||||
{
|
||||
use markdown::Error::*;
|
||||
let e = e.inner;
|
||||
let code = match &e {
|
||||
TooBig => StatusCode::InternalServerError,
|
||||
//NotMarkdown => serve_error (StatusCode::BadRequest, "File is not Markdown"),
|
||||
NotUtf8 => StatusCode::BadRequest,
|
||||
};
|
||||
|
||||
return Ok (serve_error (code, e.to_string ()));
|
||||
}
|
||||
|
||||
#[cfg (not (feature = "markdown"))]
|
||||
{
|
||||
let _e = e;
|
||||
serve_error (StatusCode::BadRequest, "Markdown feature is disabled")
|
||||
}
|
||||
},
|
||||
MarkdownPreview (s) => html::serve (s),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -76,6 +76,9 @@ pub enum Response {
|
|||
Root,
|
||||
ServeDir (ServeDirParams),
|
||||
ServeFile (ServeFileParams),
|
||||
|
||||
MarkdownErr (MarkdownErrWrapper),
|
||||
MarkdownPreview (String),
|
||||
}
|
||||
|
||||
#[cfg (feature = "markdown")]
|
||||
|
@ -128,7 +131,7 @@ fn serve_dir (
|
|||
|
||||
async fn serve_file (
|
||||
file: tokio::fs::File,
|
||||
_uri: &http::Uri,
|
||||
uri: &http::Uri,
|
||||
send_body: bool,
|
||||
headers: &HashMap <String, Vec <u8>>
|
||||
)
|
||||
|
|
|
@ -266,6 +266,8 @@ pub struct Config {
|
|||
|
||||
pub struct Builder {
|
||||
config_file: ConfigFile,
|
||||
hidden_path: Option <PathBuf>,
|
||||
asset_root: Option <PathBuf>,
|
||||
}
|
||||
|
||||
impl Builder {
|
||||
|
@ -287,13 +289,17 @@ impl Builder {
|
|||
|
||||
Self {
|
||||
config_file,
|
||||
hidden_path: None,
|
||||
asset_root: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn build (self) -> Result <State, ServerError>
|
||||
{
|
||||
State::new (
|
||||
self.config_file
|
||||
self.config_file,
|
||||
self.hidden_path,
|
||||
self.asset_root
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -334,8 +340,12 @@ pub async fn run_server (
|
|||
|
||||
let state = Arc::new (State::new (
|
||||
config_file,
|
||||
hidden_path,
|
||||
asset_root,
|
||||
)?);
|
||||
|
||||
let file_server_2 = Arc::clone (&file_server);
|
||||
|
||||
let mut spawn_handler = || {
|
||||
let file_server = Arc::clone (&file_server);
|
||||
let hit_counter = hit_counter.clone ();
|
||||
|
@ -343,7 +353,7 @@ pub async fn run_server (
|
|||
|req: http_serde::RequestParts| async move {
|
||||
if let Some (hit_tx) = &hit_counter {
|
||||
eprintln! ("hit_tx.send");
|
||||
hit_tx.send (()).await.ok ();
|
||||
hit_tx.send (()).await;
|
||||
}
|
||||
Ok (file_server.serve_all (req.method, &req.uri, &req.headers).await?)
|
||||
}
|
||||
|
@ -359,11 +369,15 @@ pub async fn run_server (
|
|||
impl State {
|
||||
pub fn new (
|
||||
config_file: ConfigFile,
|
||||
hidden_path: Option <PathBuf>,
|
||||
asset_root: Option <PathBuf>
|
||||
)
|
||||
-> Result <Self, ServerError>
|
||||
{
|
||||
use std::convert::TryInto;
|
||||
|
||||
let asset_root = asset_root.unwrap_or_else (PathBuf::new);
|
||||
|
||||
info! ("Server name is {}", config_file.name);
|
||||
info! ("Tripcode is {}", config_file.tripcode ());
|
||||
|
||||
|
|
|
@ -24,6 +24,18 @@ fn load_inner <
|
|||
Ok (toml::from_str (&config_s)?)
|
||||
}
|
||||
|
||||
/// For files that contain public-viewable information
|
||||
|
||||
fn load_public <
|
||||
T: DeserializeOwned,
|
||||
P: AsRef <Path> + Debug
|
||||
> (
|
||||
config_file_path: P
|
||||
) -> Result <T, LoadTomlError> {
|
||||
let f = File::open (&config_file_path)?;
|
||||
load_inner (f)
|
||||
}
|
||||
|
||||
/// For files that may contain secrets and should have permissions or other
|
||||
/// safeties checked
|
||||
|
||||
|
|
Loading…
Reference in New Issue