Compare commits

...

2 Commits

Author SHA1 Message Date
_ 843fddb568 🚨 cargo check pass 2023-08-31 19:15:04 -05:00
_ 25a8a035b9 💥 remove un-finished stuff and clear up `cargo check` for the PTTH_QUIC GUI 2023-08-31 19:03:52 -05:00
20 changed files with 13 additions and 1408 deletions

72
Cargo.lock generated
View File

@ -281,7 +281,7 @@ version = "3.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1631ca6e3c59112501a9d87fd86f21591ff77acd31331e8a73f8d80a65bbdd71"
dependencies = [
"nix 0.26.1",
"nix",
"windows-sys 0.42.0",
]
@ -774,19 +774,6 @@ dependencies = [
"hashbrown",
]
[[package]]
name = "insecure_chat"
version = "0.1.0"
dependencies = [
"hyper",
"mac_address",
"nix 0.25.1",
"ptth_diceware",
"rand",
"thiserror",
"tokio",
]
[[package]]
name = "instant"
version = "0.1.12"
@ -866,16 +853,6 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "mac_address"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b238e3235c8382b7653c6408ed1b08dd379bdb9fdf990fb0bbae3db2cc0ae963"
dependencies = [
"nix 0.23.2",
"winapi",
]
[[package]]
name = "maplit"
version = "1.0.2"
@ -897,15 +874,6 @@ version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
[[package]]
name = "memoffset"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce"
dependencies = [
"autocfg",
]
[[package]]
name = "metrics_test"
version = "0.1.0"
@ -950,33 +918,6 @@ dependencies = [
"tempfile",
]
[[package]]
name = "nix"
version = "0.23.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f3790c00a0150112de0f4cd161e3d7fc4b2d8a5542ffc35f099a2562aecb35c"
dependencies = [
"bitflags",
"cc",
"cfg-if",
"libc",
"memoffset",
]
[[package]]
name = "nix"
version = "0.25.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f346ff70e7dbfd675fe90590b92d59ef2de15a8779ae305ebcbfd3f0caf59be4"
dependencies = [
"autocfg",
"bitflags",
"cfg-if",
"libc",
"memoffset",
"pin-utils",
]
[[package]]
name = "nix"
version = "0.26.1"
@ -1317,17 +1258,6 @@ dependencies = [
"uom",
]
[[package]]
name = "ptth_kv"
version = "0.1.0"
dependencies = [
"anyhow",
"base64 0.13.1",
"hyper",
"thiserror",
"tokio",
]
[[package]]
name = "ptth_multi_call_server"
version = "1.1.1"

View File

@ -1,15 +0,0 @@
[package]
name = "insecure_chat"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
hyper = { version = "0.14.20", features = ["http1", "server", "tcp"] }
mac_address = "1.1.4"
nix = "0.25.0"
ptth_diceware = { path = "../ptth_diceware" }
rand = "0.8.5"
thiserror = "1.0.37"
tokio = { version = "1.21.2", features = ["net", "rt-multi-thread", "fs"] }

View File

@ -1,125 +0,0 @@
// IP address module
// Copied from the `lookaround` project
use std::{
net::Ipv4Addr,
process::Command,
str::FromStr,
};
#[derive (Debug, thiserror::Error)]
pub enum Error {
#[error (transparent)]
Io (#[from] std::io::Error),
#[error (transparent)]
FromUtf8 (#[from] std::string::FromUtf8Error),
#[error ("Self-IP detection is not implemented on Mac OS")]
NotImplementedOnMac,
}
#[cfg(target_os = "linux")]
pub fn get_ips () -> Result <Vec <Ipv4Addr>, Error> {
let output = linux::get_ip_addr_output ()?;
Ok (linux::parse_ip_addr_output (&output))
}
#[cfg(target_os = "macos")]
pub fn get_ips () -> Result <Vec <Ipv4Addr>, Error> {
Err (Error::NotImplementedOnMac)
}
#[cfg(target_os = "windows")]
pub fn get_ips () -> Result <Vec <Ipv4Addr>, Error> {
let output = windows::get_ip_config_output ()?;
Ok (windows::parse_ip_config_output (&output))
}
#[cfg(target_os = "linux")]
pub mod linux {
use super::*;
pub fn get_ip_addr_output () -> Result <String, Error> {
let output = Command::new ("ip")
.arg ("addr")
.output ()?;
let output = output.stdout.as_slice ();
let output = String::from_utf8 (output.to_vec ())?;
Ok (output)
}
pub fn parse_ip_addr_output (output: &str) -> Vec <Ipv4Addr> {
// I wrote this in FP style because I was bored.
output.lines ()
.map (|l| l.trim_start ())
.filter_map (|l| l.strip_prefix ("inet "))
.filter_map (|l| l.find ('/').map (|x| &l [0..x]))
.filter_map (|l| Ipv4Addr::from_str (l).ok ())
.filter (|a| ! a.is_loopback ())
.collect ()
}
}
#[cfg(target_os = "windows")]
pub mod windows {
use super::*;
pub fn get_ip_config_output () -> Result <String, Error> {
let output = Command::new ("ipconfig")
.output ()?;
let output = output.stdout.as_slice ();
let output = String::from_utf8 (output.to_vec ())?;
Ok (output)
}
pub fn parse_ip_config_output (output: &str) -> Vec <Ipv4Addr> {
let mut addrs = vec! [];
for line in output.lines () {
let line = line.trim_start ();
// Maybe only works on English locales?
if ! line.starts_with ("IPv4 Address") {
continue;
}
let colon_pos = match line.find (':') {
None => continue,
Some (x) => x,
};
let line = &line [colon_pos + 2..];
let addr = match Ipv4Addr::from_str (line) {
Err (_) => continue,
Ok (x) => x,
};
addrs.push (addr);
}
addrs
}
#[cfg (test)]
mod test {
use super::*;
#[test]
fn test () {
for (input, expected) in [
(
r"
IPv4 Address . . .. . . . : 192.168.1.1
",
vec! [
Ipv4Addr::new (192, 168, 1, 1),
]
),
] {
let actual = parse_ip_config_output (input);
assert_eq! (actual, expected);
}
}
}
}

View File

@ -1,375 +0,0 @@
use std::{
collections::*,
net::{
Ipv4Addr,
SocketAddrV4,
},
sync::Arc,
time::Duration,
};
use hyper::{
Body,
Method,
Request,
Response,
Server,
StatusCode,
service::{
make_service_fn,
service_fn,
},
};
use tokio::{
net::UdpSocket,
sync::RwLock,
};
mod ip;
mod tlv;
fn main () -> Result <(), Error>
{
let mut args = std::env::args ();
let mut bail_unknown = true;
let mut last_unknown = None;
let mut name = ptth_diceware::passphrase ("_", 3);
let mut subcommand_count = 0;
let mut subcommand = None;
args.next ();
while let Some (arg) = args.next () {
if arg == "--ignore-unknown" {
bail_unknown = false;
}
if arg == "--name" {
name = args.next ().unwrap ().to_string ();
}
else if arg == "peer" {
subcommand = Some (Subcommand::Peer);
subcommand_count += 1;
}
else if arg == "receiver" {
subcommand = Some (Subcommand::Receiver);
subcommand_count += 1;
}
else if arg == "sender" {
subcommand = Some (Subcommand::Sender);
subcommand_count += 1;
}
else if arg == "spy" {
subcommand = Some (Subcommand::Spy);
subcommand_count += 1;
}
else {
last_unknown = Some (arg);
}
}
if bail_unknown {
if let Some (last_unknown) = last_unknown {
eprintln! ("Unknown argument `{}`", last_unknown);
return Err (Error::Args);
}
}
if subcommand_count >= 2 {
eprintln! ("Detected {} subcommands in arguments", subcommand_count);
return Err (Error::Args)
}
let rt = tokio::runtime::Runtime::new ()?;
let params = Params::default ();
rt.block_on (async {
if let Some (cmd) = subcommand {
return match cmd {
Subcommand::Peer => peer (params).await,
Subcommand::Receiver => receiver (params).await,
Subcommand::Sender => sender (params).await,
Subcommand::Spy => spy (params),
};
}
println! ("Name is `{}`", name);
Ok::<_, Error> (())
})?;
Ok (())
}
enum Subcommand {
Peer,
Receiver,
Sender,
Spy,
}
struct Params {
multicast_group: (Ipv4Addr, u16),
}
impl Default for Params {
fn default () -> Self {
let multicast_group = (Ipv4Addr::new (225, 100, 99, 98), 9041);
Self {
multicast_group,
}
}
}
async fn peer (params: Params) -> Result <(), Error>
{
use rand::Rng;
let mut id = [0];
rand::thread_rng ().try_fill (&mut id).or (Err (Error::Rand))?;
let (multicast_addr, multicast_port) = params.multicast_group;
let socket = tokio::net::UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, multicast_port)).await?;
socket.join_multicast_v4 (multicast_addr, Ipv4Addr::UNSPECIFIED)?;
eprintln! ("Multicast group is {:?}", params.multicast_group);
eprintln! ("Local addr is {}", socket.local_addr ()?);
let peer = Peer {
id: id [0],
outbox: Outbox {
index: 1000,
messages: Default::default (),
}.into (),
params,
socket,
};
eprintln! ("Random peer ID is {}", peer.id);
let state = Arc::new (peer);
{
let state = Arc::clone (&state);
tokio::spawn (async move {
let mut interval = tokio::time::interval (Duration::from_secs (25));
interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick ().await;
state.send_multicast (&tlv::Message::IAmOnline { peer_id: state.id }).await.ok ();
}
});
}
{
let state = Arc::clone (&state);
tokio::spawn (async move {
loop {
let mut buf = vec! [0u8; 2048];
let (bytes_recved, addr) = match state.socket.recv_from (&mut buf).await
{
Err (_) => {
tokio::time::sleep (Duration::from_secs (10)).await;
continue;
},
Ok (x) => x,
};
let buf = &buf [0..bytes_recved];
let msg = match tlv::decode (buf) {
Err (_) => {
eprintln! ("ZAT4ERXR Couldn't decode message");
continue;
},
Ok (x) => x,
};
println! ("Received {:?}", msg);
}
});
}
let make_svc = make_service_fn (|_conn| {
let state = Arc::clone (&state);
async {
Ok::<_, String> (service_fn (move |req| {
let state = state.clone ();
peer_handle_all (req, state)
}))
}
});
let addr = std::net::SocketAddr::from (([127, 0, 0, 1], multicast_port));
let server = Server::bind (&addr)
.serve (make_svc);
eprintln! ("Local UI on {}", addr);
server.await?;
Ok (())
}
struct Peer {
id: u32,
outbox: RwLock <Outbox>,
params: Params,
socket: UdpSocket,
}
impl Peer {
async fn send_multicast (&self, msg: &tlv::Message) -> Result <(), Error>
{
let msg = tlv::encode (&msg)?;
self.socket.send_to (&msg, self.params.multicast_group).await?;
Ok (())
}
}
struct Outbox {
index: u32,
messages: VecDeque <SentMessage>,
}
struct SentMessage {
index: u32,
body: Vec <u8>,
}
async fn peer_handle_all (req: Request <Body>, state: Arc <Peer>)
-> Result <Response <Body>, Error>
{
if req.method () == Method::POST {
if req.uri () == "/paste" {
let body = hyper::body::to_bytes (req.into_body ()).await?;
if body.len () > 1024 {
let resp = Response::builder ()
.status (StatusCode::BAD_REQUEST)
.body (Body::from ("Message body must be <= 1024 bytes"))?;
return Ok (resp);
}
let body = body.to_vec ();
let msg_index;
{
let mut outbox = state.outbox.write ().await;
let msg = SentMessage {
index: outbox.index,
body: body.clone (),
};
msg_index = msg.index;
outbox.messages.push_back (msg);
if outbox.messages.len () > 10 {
outbox.messages.pop_front ();
}
outbox.index += 1;
}
match state.send_multicast (&tlv::Message::IHaveMessage {
peer_id: state.id,
msg_index,
body,
}).await {
Ok (_) => (),
Err (_) => return Ok (
Response::builder ()
.status (StatusCode::BAD_REQUEST)
.body (Body::from ("Can't encode message"))?
),
}
return Ok (Response::new (format! ("Pasted message {}\n", msg_index).into ()));
}
}
Ok (Response::new (":V\n".into ()))
}
async fn receiver (params: Params) -> Result <(), Error>
{
let (multicast_addr, multicast_port) = params.multicast_group;
let socket = tokio::net::UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, multicast_port)).await?;
socket.join_multicast_v4 (multicast_addr, Ipv4Addr::UNSPECIFIED)?;
eprintln! ("Multicast group is {:?}", params.multicast_group);
eprintln! ("Local addr is {}", socket.local_addr ()?);
loop {
let mut buf = vec! [0u8; 2048];
let (bytes_recved, remote_addr) = socket.recv_from (&mut buf).await?;
buf.truncate (bytes_recved);
println! ("Received {} bytes from {}", bytes_recved, remote_addr);
}
}
async fn sender (params: Params) -> Result <(), Error>
{
let (multicast_addr, multicast_port) = params.multicast_group;
let socket = tokio::net::UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, 0)).await?;
socket.join_multicast_v4 (multicast_addr, Ipv4Addr::UNSPECIFIED)?;
eprintln! ("Multicast group is {:?}", params.multicast_group);
eprintln! ("Local addr is {}", socket.local_addr ()?);
socket.send_to (&[], params.multicast_group).await?;
Ok (())
}
fn spy (params: Params) -> Result <(), Error>
{
let (multicast_addr, multicast_port) = params.multicast_group;
let socket = match std::net::UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, multicast_port)) {
Ok (x) => x,
Err (e) => if e.kind () == std::io::ErrorKind::AddrInUse {
eprintln! ("Address in use. You can only run 1 instance of Insecure Chat at a time, even in spy mode.");
return Err (Error::AddrInUse);
}
else {
return Err (e.into ());
}
};
for bind_addr in ip::get_ips ()? {
socket.join_multicast_v4 (&multicast_addr, &bind_addr)?;
// eprintln! ("Joined multicast with {}", bind_addr);
}
eprintln! ("Multicast addr is {}", multicast_addr);
eprintln! ("Local addr is {}", socket.local_addr ()?);
loop {
let mut buf = vec! [0u8; 2048];
eprintln! ("Listening for UDP packets...");
let (bytes_recved, remote_addr) = socket.recv_from (&mut buf)?;
buf.truncate (bytes_recved);
println! ("Received {} bytes from {}", bytes_recved, remote_addr);
}
}
#[derive (Debug, thiserror::Error)]
enum Error {
#[error ("Address in use")]
AddrInUse,
#[error ("CLI args")]
Args,
#[error (transparent)]
Hyper (#[from] hyper::Error),
#[error (transparent)]
HyperHttp (#[from] hyper::http::Error),
#[error (transparent)]
Io (#[from] std::io::Error),
#[error (transparent)]
Ip (#[from] ip::Error),
#[error ("Randomness")]
Rand,
#[error (transparent)]
Tlv (#[from] tlv::Error),
}

View File

@ -1,122 +0,0 @@
// A crummy ad-hoc TLV cause I'm lazy and these are fun
#[derive (Debug, PartialEq)]
pub enum Message {
IAmOnline {
peer_id: u32,
},
IHaveMessage {
peer_id: u32,
msg_index: u32,
body: Vec <u8>,
},
IAcknowledgeYourMessage {
msg_index: u32,
},
}
#[derive (Debug, thiserror::Error)]
pub enum Error {
#[error ("Bad magic number")]
BadMagic,
#[error ("Data is too big to encode / decode")]
DataTooBig,
#[error ("Required field missing while decoding")]
MissingField,
#[error ("No type in TLV")]
NoType,
#[error ("Unrecognized TLV type")]
UnrecognizedType,
}
const MAGIC: [u8; 4] = [55, 11, 101, 38];
pub fn decode (src: &[u8]) -> Result <Message, Error>
{
let (magic, src) = take_bytes (src, 4).ok_or (Error::BadMagic)?;
if magic != MAGIC {
return Err (Error::BadMagic);
}
let (msg_type, src) = take_bytes(src, 1).ok_or(Error::NoType)?;
Ok (match msg_type [0] {
1 => {
let (peer_id, src) = take_u32 (src).ok_or (Error::MissingField)?;
Message::IAmOnline { peer_id, }
},
2 => {
let (peer_id, src) = take_u32 (src).ok_or (Error::MissingField)?;
let (msg_index, src) = take_u32 (src).ok_or (Error::MissingField)?;
let (body_len, src) = take_u32 (src).ok_or (Error::MissingField)?;
let (body_bytes, src) = take_bytes(src, body_len as usize).ok_or(Error::MissingField)?;
let body = body_bytes.into ();
Message::IHaveMessage { peer_id, msg_index, body, }
},
3 => {
let (msg_index, src) = take_u32 (src).ok_or (Error::MissingField)?;
Message::IAcknowledgeYourMessage { msg_index, }
},
_ => return Err (Error::UnrecognizedType),
})
}
fn take_u32 (src: &[u8]) -> Option <(u32, &[u8])>
{
let (a, b) = take_bytes (src, 4)?;
Some ((u32::from_le_bytes([a [0], a [1], a [2], a [3]]), b))
}
fn take_bytes (src: &[u8], n: usize) -> Option <(&[u8], &[u8])>
{
if src.len () < n {
return None;
}
Some (src.split_at (n))
}
pub fn encode (msg: &Message) -> Result <Vec <u8>, Error>
{
let mut buf = Vec::from (MAGIC);
match msg {
Message::IAmOnline { peer_id } => {
buf.push (1);
buf.extend_from_slice (&peer_id.to_le_bytes());
},
Message::IHaveMessage { peer_id, msg_index, body } => {
buf.push (2);
buf.extend_from_slice (&peer_id.to_le_bytes());
buf.extend_from_slice (&msg_index.to_le_bytes());
let body_len = u32::try_from (body.len ()).or (Err (Error::DataTooBig))?;
buf.extend_from_slice (&body_len.to_le_bytes());
buf.extend_from_slice(body.as_slice());
},
Message::IAcknowledgeYourMessage { msg_index } => {
buf.push (3);
buf.extend_from_slice (&msg_index.to_le_bytes());
},
}
Ok (buf)
}
#[cfg (test)]
mod test {
use super::*;
#[test]
fn roundtrip () {
for msg in [
Message::IAmOnline { peer_id: 93 },
Message::IHaveMessage { peer_id: 93, msg_index: 1000, body: Vec::from (b":V".as_slice()) },
Message::IAcknowledgeYourMessage { msg_index: 1000 },
] {
let encoded = encode (&msg).unwrap ();
let roundtripped = decode (&encoded).unwrap ();
assert_eq!(roundtripped, msg);
}
}
}

View File

@ -31,7 +31,6 @@ use ptth_core::{
use ptth_server::{
file_server::{
self,
metrics,
FileServer,
},
load_toml,

View File

@ -1,12 +0,0 @@
[package]
name = "ptth_kv"
version = "0.1.0"
authors = ["Trish"]
edition = "2018"
[dependencies]
anyhow = "1.0.38"
base64 = "0.13.0"
hyper = { version = "0.14.4", features = ["full"] }
thiserror = "1.0.22"
tokio = { version = "1.8.1", features = ["full"] }

View File

@ -1,499 +0,0 @@
use std::{
collections::{
HashMap,
},
iter::FromIterator,
sync::{
Arc,
},
};
use hyper::{
Body,
Request,
Response,
StatusCode,
};
use tokio::{
sync::Mutex,
};
pub struct HttpService {
store: Arc <Store>,
}
pub struct Store {
status_dirs: HashMap <Vec <u8>, StatusKeyDirectory>,
}
#[derive (thiserror::Error, Debug, PartialEq)]
pub enum Error {
#[error ("key too long")]
KeyTooLong,
#[error ("no such key dir")]
NoSuchKeyDir,
#[error ("value too long")]
ValueTooLong,
}
pub struct StatusQuotas {
pub max_keys: usize,
pub max_key_bytes: usize,
pub max_value_bytes: usize,
pub max_payload_bytes: usize,
}
pub struct GetAfter {
pub tuples: Vec <(Vec <u8>, Vec <u8>)>,
pub sequence: u64,
}
impl HttpService {
pub fn new (s: Store) -> Self {
Self {
store: Arc::new (s),
}
}
pub fn inner (&self) -> &Store {
&*self.store
}
pub async fn serve (&self, port: u16) -> Result <(), hyper::Error> {
use std::net::SocketAddr;
use hyper::{
Server,
service::{
make_service_fn,
service_fn,
},
};
let make_svc = make_service_fn (|_conn| {
let state = self.store.clone ();
async {
Ok::<_, String> (service_fn (move |req| {
let state = state.clone ();
Self::handle_all (req, state)
}))
}
});
let addr = SocketAddr::from(([127, 0, 0, 1], port));
let server = Server::bind (&addr)
.serve (make_svc)
;
server.await
}
}
impl Store {
pub fn new <I> (status_dirs: I)
-> Self
where I: Iterator <Item = (Vec <u8>, StatusQuotas)>
{
let status_dirs = status_dirs
.map (|(name, quotas)| (name, StatusKeyDirectory::new (quotas)))
.collect ();
Self {
status_dirs,
}
}
pub fn list_key_dirs (&self) -> Vec <Vec <u8>> {
self.status_dirs.iter ()
.map (|(k, _)| k.clone ())
.collect ()
}
pub async fn set (&self, name: &[u8], key: &[u8], value: Vec <u8>)
-> Result <(), Error>
{
let dir = self.status_dirs.get (name)
.ok_or (Error::NoSuchKeyDir)?;
dir.set (key, value).await
}
async fn set_multi (&self, name: &[u8], tuples: Vec <(&[u8], Vec <u8>)>)
-> Result <(), Error>
{
let dir = self.status_dirs.get (name)
.ok_or (Error::NoSuchKeyDir)?;
dir.set_multi (tuples).await
}
pub async fn get_after (&self, name: &[u8], thresh: Option <u64>)
-> Result <GetAfter, Error>
{
let dir = self.status_dirs.get (name)
.ok_or (Error::NoSuchKeyDir)?;
dir.get_after (thresh).await
}
}
// End of public interface
const SET_BATCH_SIZE: usize = 32;
enum StoreCommand {
SetStatus (SetStatusCommand),
Multi (Vec <StoreCommand>),
}
struct StatusKeyDirectory {
quotas: StatusQuotas,
// TODO: Make this tokio::sync::Mutex.
table: Mutex <StatusTable>,
}
#[derive (Default)]
struct StatusTable {
map: HashMap <Vec <u8>, StatusValue>,
sequence: u64,
}
struct StatusValue {
value: Vec <u8>,
sequence: u64,
}
struct SetStatusCommand {
key_dir: Vec <u8>,
key: Vec <u8>,
value: Vec <u8>,
}
impl HttpService {
async fn handle_all (req: Request <Body>, store: Arc <Store>)
-> Result <Response <Body>, anyhow::Error>
{
Ok (Response::builder ()
.body (Body::from ("hello\n"))?)
}
}
impl StatusKeyDirectory {
fn new (quotas: StatusQuotas) -> Self {
Self {
quotas,
table: Mutex::new (Default::default ()),
}
}
async fn set (&self, key: &[u8], value: Vec <u8>) -> Result <(), Error>
{
if key.len () > self.quotas.max_key_bytes {
return Err (Error::KeyTooLong);
}
if value.len () > self.quotas.max_value_bytes {
return Err (Error::ValueTooLong);
}
{
let mut guard = self.table.lock ().await;
guard.set (&self.quotas, key, value);
}
Ok (())
}
async fn set_multi (&self, tuples: Vec <(&[u8], Vec <u8>)>) -> Result <(), Error>
{
{
let mut guard = self.table.lock ().await;
for (key, value) in tuples {
guard.set (&self.quotas, key, value);
}
}
Ok (())
}
async fn get_after (&self, thresh: Option <u64>) -> Result <GetAfter, Error> {
let guard = self.table.lock ().await;
Ok (guard.get_after (thresh))
}
}
impl StatusTable {
fn payload_bytes (&self) -> usize {
self.map.iter ()
.map (|(k, v)| k.len () + v.len ())
.sum ()
}
fn set (&mut self, quotas: &StatusQuotas, key: &[u8], value: Vec <u8>) {
self.sequence += 1;
if self.map.len () > quotas.max_keys {
self.map.clear ();
}
let new_bytes = key.len () + value.len ();
if self.payload_bytes () + new_bytes > quotas.max_payload_bytes {
self.map.clear ();
}
let value = StatusValue {
value,
sequence: self.sequence,
};
// self.map.insert (key, value);
match self.map.get_mut (key) {
None => {
self.map.insert (key.to_vec (), value);
},
Some (v) => *v = value,
}
}
fn get_after (&self, thresh: Option <u64>) -> GetAfter {
let thresh = thresh.unwrap_or (0);
let tuples = self.map.iter ()
.filter_map (|(key, value)| {
if value.sequence <= thresh {
return None;
}
Some ((key.clone (), value.value.clone ()))
})
.collect ();
GetAfter {
tuples,
sequence: self.sequence,
}
}
}
impl StatusValue {
fn len (&self) -> usize {
self.value.len ()
}
}
#[tokio::main]
async fn main () -> Result <(), hyper::Error> {
use std::time::Duration;
use tokio::{
spawn,
time::interval,
};
let service = HttpService::new (Store::new (vec! [
(b"key_dir".to_vec (), StatusQuotas {
max_keys: 4,
max_key_bytes: 16,
max_value_bytes: 16,
max_payload_bytes: 128,
}),
].into_iter ()));
service.serve (4003).await
}
#[cfg (test)]
mod tests {
use tokio::runtime::Runtime;
use super::*;
fn get_after_eq (a: &GetAfter, b: &GetAfter) {
assert_eq! (a.sequence, b.sequence);
let a = a.tuples.clone ();
let b = b.tuples.clone ();
let a = HashMap::<Vec <u8>, Vec <u8>>::from_iter (a.into_iter ());
let b = HashMap::from_iter (b.into_iter ());
assert_eq! (a, b);
}
#[test]
fn store () {
let rt = Runtime::new ().unwrap ();
rt.block_on (async {
let s = Store::new (vec! [
(b"key_dir".to_vec (), StatusQuotas {
max_keys: 4,
max_key_bytes: 16,
max_value_bytes: 16,
max_payload_bytes: 128,
}),
].into_iter ());
let mut expected_sequence = 0;
assert_eq! (s.list_key_dirs (), vec! [
b"key_dir".to_vec (),
]);
assert_eq! (
s.set (b"key_dir", b"this key is too long and will cause an error", b"bar".to_vec ()).await,
Err (Error::KeyTooLong)
);
assert_eq! (
s.set (b"key_dir", b"foo", b"this value is too long and will cause an error".to_vec ()).await,
Err (Error::ValueTooLong)
);
assert_eq! (
s.set (b"invalid_key_dir", b"foo", b"bar".to_vec ()).await,
Err (Error::NoSuchKeyDir)
);
let ga = s.get_after (b"key_dir", None).await.unwrap ();
assert_eq! (ga.sequence, expected_sequence);
assert_eq! (ga.tuples, vec! []);
s.set (b"key_dir", b"foo_1", b"bar_1".to_vec ()).await.unwrap ();
expected_sequence += 1;
let ga = s.get_after (b"key_dir", None).await.unwrap ();
assert_eq! (ga.sequence, expected_sequence);
assert_eq! (ga.tuples, vec! [
(b"foo_1".to_vec (), b"bar_1".to_vec ()),
]);
get_after_eq (&ga, &GetAfter {
sequence: expected_sequence,
tuples: vec! [
(b"foo_1".to_vec (), b"bar_1".to_vec ()),
]
});
s.set (b"key_dir", b"foo_2", b"bar_2".to_vec ()).await.unwrap ();
expected_sequence += 1;
let ga = s.get_after (b"key_dir", None).await.unwrap ();
get_after_eq (&ga, &GetAfter {
sequence: expected_sequence,
tuples: vec! [
(b"foo_1".to_vec (), b"bar_1".to_vec ()),
(b"foo_2".to_vec (), b"bar_2".to_vec ()),
]
});
s.set (b"key_dir", b"foo_1", b"bar_3".to_vec ()).await.unwrap ();
expected_sequence += 1;
let ga = s.get_after (b"key_dir", None).await.unwrap ();
get_after_eq (&ga, &GetAfter {
sequence: expected_sequence,
tuples: vec! [
(b"foo_1".to_vec (), b"bar_3".to_vec ()),
(b"foo_2".to_vec (), b"bar_2".to_vec ()),
]
});
let ga = s.get_after (b"key_dir", Some (2)).await.unwrap ();
get_after_eq (&ga, &GetAfter {
sequence: expected_sequence,
tuples: vec! [
(b"foo_1".to_vec (), b"bar_3".to_vec ()),
]
});
let ga = s.get_after (b"key_dir", Some (3)).await.unwrap ();
get_after_eq (&ga, &GetAfter {
sequence: expected_sequence,
tuples: vec! []
});
});
}
#[test]
#[cfg (not (debug_assertions))]
fn perf () {
use std::time::Instant;
let rt = Runtime::new ().unwrap ();
rt.block_on (async {
let s = Store::new (vec! [
(b"key_dir".to_vec (), StatusQuotas {
max_keys: 4,
max_key_bytes: 16,
max_value_bytes: 16,
max_payload_bytes: 128,
}),
].into_iter ());
let num_iters = 1_000_000;
let key = b"foo";
let start_time = Instant::now ();
for i in 0..num_iters {
let value = format! ("{}", i);
s.set (b"key_dir", key, value.into ()).await.unwrap ();
}
let end_time = Instant::now ();
let total_dur = end_time - start_time;
let avg_nanos = total_dur.as_nanos () / num_iters;
assert! (avg_nanos < 250, dbg! (avg_nanos));
});
}
#[test]
#[cfg (not (debug_assertions))]
fn perf_multi () {
use std::time::Instant;
let rt = Runtime::new ().unwrap ();
rt.block_on (async {
let s = Store::new (vec! [
(b"key_dir".to_vec (), StatusQuotas {
max_keys: 8,
max_key_bytes: 16,
max_value_bytes: 16,
max_payload_bytes: 128,
}),
].into_iter ());
let num_iters = 1_000_000;
let start_time = Instant::now ();
for i in 0..num_iters {
let value = Vec::<u8>::from (format! ("{}", i));
let tuples = vec! [
(&b"foo_0"[..], value.clone ()),
(b"foo_1", value.clone ()),
(b"foo_2", value.clone ()),
(b"foo_3", value.clone ()),
(b"foo_4", value.clone ()),
(b"foo_5", value.clone ()),
(b"foo_6", value.clone ()),
(b"foo_7", value.clone ()),
];
s.set_multi (b"key_dir", tuples).await.unwrap ();
}
let end_time = Instant::now ();
let total_dur = end_time - start_time;
let avg_nanos = total_dur.as_nanos () / (num_iters * 8);
assert! (avg_nanos < 150, dbg! (avg_nanos));
});
}
}

View File

@ -1,6 +1,5 @@
use structopt::StructOpt;
use tokio::{
net::UdpSocket,
sync::watch,
};
@ -96,34 +95,7 @@ impl P2Client {
})
};
if false {
let task_direc_connect = {
let connection = connection.clone ();
tokio::spawn (async move {
let cookie = protocol::p2_direc_to_p4 (
&connection,
"bogus_server",
).await?;
let sock = UdpSocket::bind ("0.0.0.0:0").await?;
let mut interval = tokio::time::interval (Duration::from_millis (1000));
interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Delay);
loop {
interval.tick ().await;
sock.send_to(&cookie [..], "127.0.0.1:30379").await?;
debug! ("P2 sent cookie to P3 over plain UDP");
}
Ok::<_, anyhow::Error> (())
})
};
}
task_tcp_server.await??;
//task_direc_connect.await??;
Ok (())
}

View File

@ -26,10 +26,10 @@ async fn main () -> anyhow::Result <()> {
tokio::fs::write ("ptth_quic_output/quic_server.crt", app.server_cert ()).await?;
tokio::select! {
val = app.run () => {
_val = app.run () => {
},
val = running_rx.changed () => {
_val = running_rx.changed () => {
},
}

View File

@ -94,7 +94,6 @@ impl Opt {
pub struct P4EndServer {
conf: Config,
conn: quinn::Connection,
endpoint: quinn::Endpoint,
shutdown_rx: watch::Receiver <bool>,
}
@ -142,7 +141,6 @@ impl P4EndServer {
Ok ((P4EndServer {
conf,
conn,
endpoint,
shutdown_rx,
}, shutdown_tx))
}

View File

@ -10,9 +10,6 @@ use hyper::{
StatusCode,
};
use structopt::StructOpt;
use tokio::{
net::UdpSocket,
};
use crate::prelude::*;
use protocol::PeerId;
@ -87,9 +84,9 @@ impl App {
pub async fn run (self) -> anyhow::Result <()> {
let Self {
endpoint,
listen_addr,
listen_addr: _,
metrics,
server_cert,
server_cert: _,
tcp_listener,
} = self;
@ -158,35 +155,6 @@ impl App {
})
};
let task_direc_server = {
let relay_state = Arc::clone (&relay_state);
tokio::spawn (async move {
let sock = UdpSocket::bind("0.0.0.0:30379").await?;
let mut buf = [0; 2048];
loop {
let (len, addr) = sock.recv_from (&mut buf).await?;
debug! ("{:?} bytes received from {:?}", len, addr);
let packet = Vec::from_iter ((&buf [0..len]).into_iter ().map (|x| *x));
{
let mut direc_cookies = relay_state.direc_cookies.lock ().await;
if let Some (direc_state) = direc_cookies.remove (&packet) {
debug! ("Got PTTH_DIREC cookie for {}", direc_state.p2_id);
direc_state.p2_addr.send (addr).ok ();
}
else {
debug! ("UDP packet didn't match any PTTH_DIREC cookie");
}
}
}
Ok::<_, anyhow::Error> (())
})
};
let task_http_server = tokio::spawn (async move {
http_server.serve (make_svc).await?;
Ok::<_, anyhow::Error> (())
@ -225,9 +193,6 @@ impl App {
_val = task_http_server => {
eprintln! ("HTTP server exited, exiting");
},
_val = task_direc_server => {
eprintln! ("PTTH_DIREC server exited, exiting");
},
}
Ok (())
@ -256,7 +221,6 @@ async fn handle_http (_req: Request <Body>, relay_state: Arc <RelayState>)
struct RelayState {
config: arc_swap::ArcSwap <Config>,
p4_server_proxies: Mutex <HashMap <PeerId, P4State>>,
direc_cookies: Mutex <HashMap <Vec <u8>, DirecState>>,
metrics: Arc <RwLock <Metrics>>,
stats: Stats,
http_client: reqwest::Client,
@ -265,7 +229,6 @@ struct RelayState {
#[derive (Default)]
struct Config {
ip_nicknames: BTreeMap <[u8; 4], String>,
tcp_listen_port: Option <u16>,
webhook_url: Option <String>,
}
@ -273,7 +236,6 @@ impl From <ConfigFile> for Config {
fn from (x: ConfigFile) -> Self {
Self {
ip_nicknames: x.ip_nicknames.into_iter ().collect (),
tcp_listen_port: x.tcp_listen_port,
webhook_url: x.webhook_url,
}
}
@ -286,12 +248,6 @@ struct ConfigFile {
webhook_url: Option <String>,
}
struct DirecState {
start_time: Instant,
p2_id: PeerId,
p2_addr: tokio::sync::oneshot::Sender <SocketAddr>,
}
#[derive (Default)]
struct Stats {
quic: ConnectEvents,
@ -513,19 +469,7 @@ async fn handle_p2_connection (
recv
).await?
},
protocol::P2ToP3Stream::DirecP2ToP4 {
server_id,
cookie,
} => {
handle_direc_p2_to_p4 (
relay_state,
client_id,
server_id,
cookie,
send,
recv
).await?
},
_ => (),
}
debug! ("Request ended for P2");
@ -569,41 +513,6 @@ async fn handle_request_p2_to_p4 (
Ok (())
}
async fn handle_direc_p2_to_p4 (
relay_state: Arc <RelayState>,
client_id: String,
server_id: PeerId,
cookie: Vec <u8>,
mut client_send: quinn::SendStream,
client_recv: quinn::RecvStream,
) -> anyhow::Result <()>
{
debug! ("P2 {} wants a P2P connection to P4 {}", client_id, server_id);
// TODO: Check authorization
protocol::p3_authorize_p2_to_p4_direc (&mut client_send).await?;
let (tx, rx) = tokio::sync::oneshot::channel ();
{
let mut direc_cookies = relay_state.direc_cookies.lock ().await;
direc_cookies.insert (cookie, DirecState {
start_time: Instant::now (),
p2_id: client_id.clone (),
p2_addr: tx,
});
}
debug! ("Waiting to learn P2's WAN address...");
let wan_addr = rx.await?;
debug! ("And that WAN address is {}", wan_addr);
Ok (())
}
async fn handle_p4_connection (
relay_state: Arc <RelayState>,
connection: quinn::Connection,

View File

@ -9,7 +9,7 @@ use std::{
use quinn::{
ClientConfig, Endpoint,
ServerConfig, TransportConfig,
ServerConfig,
};
/// Constructs a QUIC endpoint configured for use a client only.

View File

@ -109,8 +109,6 @@ pub struct Relay {
#[derive (Clone, Default)]
pub (crate) struct MonitoringCounters {
pub (crate) requests_total: u64,
pub (crate) requests_by_scraper_api: HashMap <String, u64>,
pub (crate) requests_by_email: HashMap <String, u64>,
}
#[derive (Clone)]
@ -122,7 +120,6 @@ pub struct RejectedServer {
#[derive (Clone, Debug)]
pub struct AuditEvent {
time_monotonic: Instant,
pub time_utc: DateTime <Utc>,
pub data: AuditData,
}
@ -148,7 +145,6 @@ pub enum AuditData {
impl AuditEvent {
pub fn new (data: AuditData) -> Self {
Self {
time_monotonic: Instant::now (),
time_utc: Utc::now (),
data,
}

View File

@ -192,7 +192,7 @@ async fn api_v1 (
})).await;
if path_rest == "metrics" {
Ok (metrics (req, state).await?)
Ok (metrics (state).await?)
}
else if path_rest == "test" {
Ok (error_reply (StatusCode::OK, "You're valid!")?)
@ -223,9 +223,8 @@ async fn api_v1 (
}
}
#[instrument (level = "trace", skip (req, state))]
#[instrument (level = "trace", skip (state))]
async fn metrics (
req: Request <Body>,
state: &Relay,
)
-> Result <Response <Body>, RequestError>

View File

@ -57,7 +57,7 @@ pub async fn handle_listen (
let mut server_status = state.server_status.lock ().await;
let mut status = server_status.entry (watcher_code.clone ()).or_insert_with (Default::default);
let status = server_status.entry (watcher_code.clone ()).or_insert_with (Default::default);
status.last_seen = now;
}

View File

@ -419,27 +419,6 @@ impl FileServer {
send_body,
range,
}) => serve_file (uri, file.into_inner (), send_body, range, headers.get ("if-none-match").map (|v| &v[..])).await?,
MarkdownErr (e) => {
#[cfg (feature = "markdown")]
{
use markdown::Error::*;
let e = e.inner;
let code = match &e {
TooBig => StatusCode::InternalServerError,
//NotMarkdown => serve_error (StatusCode::BadRequest, "File is not Markdown"),
NotUtf8 => StatusCode::BadRequest,
};
return Ok (serve_error (code, e.to_string ()));
}
#[cfg (not (feature = "markdown"))]
{
let _e = e;
serve_error (StatusCode::BadRequest, "Markdown feature is disabled")
}
},
MarkdownPreview (s) => html::serve (s),
})
}
}

View File

@ -76,9 +76,6 @@ pub enum Response {
Root,
ServeDir (ServeDirParams),
ServeFile (ServeFileParams),
MarkdownErr (MarkdownErrWrapper),
MarkdownPreview (String),
}
#[cfg (feature = "markdown")]
@ -131,7 +128,7 @@ fn serve_dir (
async fn serve_file (
file: tokio::fs::File,
uri: &http::Uri,
_uri: &http::Uri,
send_body: bool,
headers: &HashMap <String, Vec <u8>>
)

View File

@ -266,8 +266,6 @@ pub struct Config {
pub struct Builder {
config_file: ConfigFile,
hidden_path: Option <PathBuf>,
asset_root: Option <PathBuf>,
}
impl Builder {
@ -289,17 +287,13 @@ impl Builder {
Self {
config_file,
hidden_path: None,
asset_root: None,
}
}
pub fn build (self) -> Result <State, ServerError>
{
State::new (
self.config_file,
self.hidden_path,
self.asset_root
self.config_file
)
}
@ -340,12 +334,8 @@ pub async fn run_server (
let state = Arc::new (State::new (
config_file,
hidden_path,
asset_root,
)?);
let file_server_2 = Arc::clone (&file_server);
let mut spawn_handler = || {
let file_server = Arc::clone (&file_server);
let hit_counter = hit_counter.clone ();
@ -353,7 +343,7 @@ pub async fn run_server (
|req: http_serde::RequestParts| async move {
if let Some (hit_tx) = &hit_counter {
eprintln! ("hit_tx.send");
hit_tx.send (()).await;
hit_tx.send (()).await.ok ();
}
Ok (file_server.serve_all (req.method, &req.uri, &req.headers).await?)
}
@ -369,15 +359,11 @@ pub async fn run_server (
impl State {
pub fn new (
config_file: ConfigFile,
hidden_path: Option <PathBuf>,
asset_root: Option <PathBuf>
)
-> Result <Self, ServerError>
{
use std::convert::TryInto;
let asset_root = asset_root.unwrap_or_else (PathBuf::new);
info! ("Server name is {}", config_file.name);
info! ("Tripcode is {}", config_file.tripcode ());

View File

@ -24,18 +24,6 @@ fn load_inner <
Ok (toml::from_str (&config_s)?)
}
/// For files that contain public-viewable information
fn load_public <
T: DeserializeOwned,
P: AsRef <Path> + Debug
> (
config_file_path: P
) -> Result <T, LoadTomlError> {
let f = File::open (&config_file_path)?;
load_inner (f)
}
/// For files that may contain secrets and should have permissions or other
/// safeties checked