diff --git a/Cargo.lock b/Cargo.lock index 835d3d8..c138489 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -725,6 +725,7 @@ dependencies = [ "mac_address", "nix 0.25.0", "ptth_diceware", + "rand", "thiserror", "tokio", ] diff --git a/crates/insecure_chat/Cargo.toml b/crates/insecure_chat/Cargo.toml index 00b6328..1a015d5 100644 --- a/crates/insecure_chat/Cargo.toml +++ b/crates/insecure_chat/Cargo.toml @@ -10,5 +10,6 @@ hyper = { version = "0.14.20", features = ["http1", "server", "tcp"] } mac_address = "1.1.4" nix = "0.25.0" ptth_diceware = { path = "../ptth_diceware" } +rand = "0.8.5" thiserror = "1.0.37" tokio = { version = "1.21.2", features = ["net", "rt-multi-thread", "fs"] } diff --git a/crates/insecure_chat/src/main.rs b/crates/insecure_chat/src/main.rs index 5b38d10..c7ac46b 100644 --- a/crates/insecure_chat/src/main.rs +++ b/crates/insecure_chat/src/main.rs @@ -5,6 +5,7 @@ use std::{ SocketAddrV4, }, sync::Arc, + time::Duration, }; use hyper::{ @@ -26,6 +27,7 @@ use tokio::{ }; mod ip; +mod tlv; fn main () -> Result <(), Error> { @@ -122,6 +124,10 @@ impl Default for Params { async fn peer (params: Params) -> Result <(), Error> { + use rand::Rng; + + let mut id = [0]; + rand::thread_rng ().try_fill (&mut id).or (Err (Error::Rand))?; let (multicast_addr, multicast_port) = params.multicast_group; let socket = tokio::net::UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, multicast_port)).await?; @@ -130,6 +136,7 @@ async fn peer (params: Params) -> Result <(), Error> eprintln! ("Local addr is {}", socket.local_addr ()?); let peer = Peer { + id: id [0], outbox: Outbox { index: 1000, messages: Default::default (), @@ -138,10 +145,54 @@ async fn peer (params: Params) -> Result <(), Error> socket, }; + eprintln! ("Random peer ID is {}", peer.id); + let state = Arc::new (peer); + { + let state = Arc::clone (&state); + tokio::spawn (async move { + let mut interval = tokio::time::interval (Duration::from_secs (25)); + interval.set_missed_tick_behavior (tokio::time::MissedTickBehavior::Skip); + + loop { + interval.tick ().await; + + state.send_multicast (&tlv::Message::IAmOnline { peer_id: state.id }).await.ok (); + } + }); + } + + { + let state = Arc::clone (&state); + tokio::spawn (async move { + loop { + let mut buf = vec! [0u8; 2048]; + let (bytes_recved, addr) = match state.socket.recv_from (&mut buf).await + { + Err (_) => { + tokio::time::sleep (Duration::from_secs (10)).await; + continue; + }, + Ok (x) => x, + }; + let buf = &buf [0..bytes_recved]; + + let msg = match tlv::decode (buf) { + Err (_) => { + eprintln! ("ZAT4ERXR Couldn't decode message"); + continue; + }, + Ok (x) => x, + }; + + println! ("Received {:?}", msg); + } + }); + } + let make_svc = make_service_fn (|_conn| { - let state = state.clone (); + let state = Arc::clone (&state); async { Ok::<_, String> (service_fn (move |req| { @@ -163,11 +214,21 @@ async fn peer (params: Params) -> Result <(), Error> } struct Peer { + id: u32, outbox: RwLock , params: Params, socket: UdpSocket, } +impl Peer { + async fn send_multicast (&self, msg: &tlv::Message) -> Result <(), Error> + { + let msg = tlv::encode (&msg)?; + self.socket.send_to (&msg, self.params.multicast_group).await?; + Ok (()) + } +} + struct Outbox { index: u32, messages: VecDeque , @@ -207,7 +268,18 @@ async fn peer_handle_all (req: Request , state: Arc ) outbox.index += 1; } - state.socket.send_to (&body, state.params.multicast_group).await?; + match state.send_multicast (&tlv::Message::IHaveMessage { + peer_id: state.id, + msg_index, + body, + }).await { + Ok (_) => (), + Err (_) => return Ok ( + Response::builder () + .status (StatusCode::BAD_REQUEST) + .body (Body::from ("Can't encode message"))? + ), + } return Ok (Response::new (format! ("Pasted message {}\n", msg_index).into ())); } @@ -296,4 +368,8 @@ enum Error { Io (#[from] std::io::Error), #[error (transparent)] Ip (#[from] ip::Error), + #[error ("Randomness")] + Rand, + #[error (transparent)] + Tlv (#[from] tlv::Error), } diff --git a/crates/insecure_chat/src/tlv.rs b/crates/insecure_chat/src/tlv.rs new file mode 100644 index 0000000..8f5ae6d --- /dev/null +++ b/crates/insecure_chat/src/tlv.rs @@ -0,0 +1,122 @@ +// A crummy ad-hoc TLV cause I'm lazy and these are fun + +#[derive (Debug, PartialEq)] +pub enum Message { + IAmOnline { + peer_id: u32, + }, + IHaveMessage { + peer_id: u32, + msg_index: u32, + body: Vec , + }, + IAcknowledgeYourMessage { + msg_index: u32, + }, +} + +#[derive (Debug, thiserror::Error)] +pub enum Error { + #[error ("Bad magic number")] + BadMagic, + #[error ("Data is too big to encode / decode")] + DataTooBig, + #[error ("Required field missing while decoding")] + MissingField, + #[error ("No type in TLV")] + NoType, + #[error ("Unrecognized TLV type")] + UnrecognizedType, +} + +const MAGIC: [u8; 4] = [55, 11, 101, 38]; + +pub fn decode (src: &[u8]) -> Result +{ + let (magic, src) = take_bytes (src, 4).ok_or (Error::BadMagic)?; + if magic != MAGIC { + return Err (Error::BadMagic); + } + + let (msg_type, src) = take_bytes(src, 1).ok_or(Error::NoType)?; + + Ok (match msg_type [0] { + 1 => { + let (peer_id, src) = take_u32 (src).ok_or (Error::MissingField)?; + Message::IAmOnline { peer_id, } + }, + 2 => { + let (peer_id, src) = take_u32 (src).ok_or (Error::MissingField)?; + let (msg_index, src) = take_u32 (src).ok_or (Error::MissingField)?; + let (body_len, src) = take_u32 (src).ok_or (Error::MissingField)?; + let (body_bytes, src) = take_bytes(src, body_len as usize).ok_or(Error::MissingField)?; + let body = body_bytes.into (); + Message::IHaveMessage { peer_id, msg_index, body, } + }, + 3 => { + let (msg_index, src) = take_u32 (src).ok_or (Error::MissingField)?; + Message::IAcknowledgeYourMessage { msg_index, } + }, + _ => return Err (Error::UnrecognizedType), + }) +} + +fn take_u32 (src: &[u8]) -> Option <(u32, &[u8])> +{ + let (a, b) = take_bytes (src, 4)?; + Some ((u32::from_le_bytes([a [0], a [1], a [2], a [3]]), b)) +} + +fn take_bytes (src: &[u8], n: usize) -> Option <(&[u8], &[u8])> +{ + if src.len () < n { + return None; + } + + Some (src.split_at (n)) +} + +pub fn encode (msg: &Message) -> Result , Error> +{ + let mut buf = Vec::from (MAGIC); + + match msg { + Message::IAmOnline { peer_id } => { + buf.push (1); + buf.extend_from_slice (&peer_id.to_le_bytes()); + }, + Message::IHaveMessage { peer_id, msg_index, body } => { + buf.push (2); + buf.extend_from_slice (&peer_id.to_le_bytes()); + buf.extend_from_slice (&msg_index.to_le_bytes()); + + let body_len = u32::try_from (body.len ()).or (Err (Error::DataTooBig))?; + buf.extend_from_slice (&body_len.to_le_bytes()); + buf.extend_from_slice(body.as_slice()); + }, + Message::IAcknowledgeYourMessage { msg_index } => { + buf.push (3); + buf.extend_from_slice (&msg_index.to_le_bytes()); + }, + } + + Ok (buf) +} + +#[cfg (test)] +mod test { + use super::*; + + #[test] + fn roundtrip () { + for msg in [ + Message::IAmOnline { peer_id: 93 }, + Message::IHaveMessage { peer_id: 93, msg_index: 1000, body: Vec::from (b":V".as_slice()) }, + Message::IAcknowledgeYourMessage { msg_index: 1000 }, + ] { + let encoded = encode (&msg).unwrap (); + let roundtripped = decode (&encoded).unwrap (); + assert_eq!(roundtripped, msg); + } + } +}