From 7b4eeea12c1ebf3d8635605c07aeef62ff30efc9 Mon Sep 17 00:00:00 2001 From: _ <_@_> Date: Tue, 18 Oct 2022 01:53:55 +0000 Subject: [PATCH] :construction: wip: messing with UDP multicast --- Cargo.lock | 11 ++- crates/insecure_chat/Cargo.toml | 1 + crates/insecure_chat/src/main.rs | 157 ++++++++++++++++++++++++++++++- 3 files changed, 160 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bb955e5..835d3d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -636,9 +636,9 @@ dependencies = [ [[package]] name = "httparse" -version = "1.5.1" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acd94fdbe1d4ff688b67b04eee2e17bd50995534a61539e45adfefb45e5e5503" +checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" [[package]] name = "httpdate" @@ -648,9 +648,9 @@ checksum = "6456b8a6c8f33fee7d958fcd1b60d55b11940a79e63ae87013e6d22e26034440" [[package]] name = "hyper" -version = "0.14.13" +version = "0.14.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15d1cfb9e4f68655fa04c01f59edb405b6074a0f7118ea881e5026e4a1cd8593" +checksum = "02c929dc5c39e335a03c405292728118860721b10190d98c2a0f0efd5baafbac" dependencies = [ "bytes", "futures-channel", @@ -661,7 +661,7 @@ dependencies = [ "http-body", "httparse", "httpdate", - "itoa 0.4.8", + "itoa 1.0.1", "pin-project-lite", "socket2", "tokio", @@ -721,6 +721,7 @@ dependencies = [ name = "insecure_chat" version = "0.1.0" dependencies = [ + "hyper", "mac_address", "nix 0.25.0", "ptth_diceware", diff --git a/crates/insecure_chat/Cargo.toml b/crates/insecure_chat/Cargo.toml index c3de3af..00b6328 100644 --- a/crates/insecure_chat/Cargo.toml +++ b/crates/insecure_chat/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +hyper = { version = "0.14.20", features = ["http1", "server", "tcp"] } mac_address = "1.1.4" nix = "0.25.0" ptth_diceware = { path = "../ptth_diceware" } diff --git a/crates/insecure_chat/src/main.rs b/crates/insecure_chat/src/main.rs index 48c440e..5b38d10 100644 --- a/crates/insecure_chat/src/main.rs +++ b/crates/insecure_chat/src/main.rs @@ -1,9 +1,28 @@ use std::{ + collections::*, net::{ Ipv4Addr, SocketAddrV4, - UdpSocket, }, + sync::Arc, +}; + +use hyper::{ + Body, + Method, + Request, + Response, + Server, + StatusCode, + service::{ + make_service_fn, + service_fn, + }, +}; + +use tokio::{ + net::UdpSocket, + sync::RwLock, }; mod ip; @@ -25,6 +44,14 @@ fn main () -> Result <(), Error> if arg == "--name" { name = args.next ().unwrap ().to_string (); } + else if arg == "peer" { + subcommand = Some (Subcommand::Peer); + subcommand_count += 1; + } + else if arg == "receiver" { + subcommand = Some (Subcommand::Receiver); + subcommand_count += 1; + } else if arg == "sender" { subcommand = Some (Subcommand::Sender); subcommand_count += 1; @@ -57,6 +84,8 @@ fn main () -> Result <(), Error> rt.block_on (async { if let Some (cmd) = subcommand { return match cmd { + Subcommand::Peer => peer (params).await, + Subcommand::Receiver => receiver (params).await, Subcommand::Sender => sender (params).await, Subcommand::Spy => spy (params), }; @@ -71,6 +100,8 @@ fn main () -> Result <(), Error> } enum Subcommand { + Peer, + Receiver, Sender, Spy, } @@ -89,11 +120,125 @@ impl Default for Params { } } +async fn peer (params: Params) -> Result <(), Error> +{ + let (multicast_addr, multicast_port) = params.multicast_group; + let socket = tokio::net::UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, multicast_port)).await?; + + socket.join_multicast_v4 (multicast_addr, Ipv4Addr::UNSPECIFIED)?; + eprintln! ("Multicast group is {:?}", params.multicast_group); + eprintln! ("Local addr is {}", socket.local_addr ()?); + + let peer = Peer { + outbox: Outbox { + index: 1000, + messages: Default::default (), + }.into (), + params, + socket, + }; + + let state = Arc::new (peer); + + let make_svc = make_service_fn (|_conn| { + let state = state.clone (); + + async { + Ok::<_, String> (service_fn (move |req| { + let state = state.clone (); + peer_handle_all (req, state) + })) + } + }); + + let addr = std::net::SocketAddr::from (([127, 0, 0, 1], multicast_port)); + let server = Server::bind (&addr) + .serve (make_svc); + + eprintln! ("Local UI on {}", addr); + + server.await?; + + Ok (()) +} + +struct Peer { + outbox: RwLock , + params: Params, + socket: UdpSocket, +} + +struct Outbox { + index: u32, + messages: VecDeque , +} + +struct SentMessage { + index: u32, + body: Vec , +} + +async fn peer_handle_all (req: Request , state: Arc ) +-> Result , Error> +{ + if req.method () == Method::POST { + if req.uri () == "/paste" { + let body = hyper::body::to_bytes (req.into_body ()).await?; + if body.len () > 1024 { + let resp = Response::builder () + .status (StatusCode::BAD_REQUEST) + .body (Body::from ("Message body must be <= 1024 bytes"))?; + return Ok (resp); + } + let body = body.to_vec (); + + let msg_index; + { + let mut outbox = state.outbox.write ().await; + let msg = SentMessage { + index: outbox.index, + body: body.clone (), + }; + msg_index = msg.index; + outbox.messages.push_back (msg); + if outbox.messages.len () > 10 { + outbox.messages.pop_front (); + } + outbox.index += 1; + } + + state.socket.send_to (&body, state.params.multicast_group).await?; + + return Ok (Response::new (format! ("Pasted message {}\n", msg_index).into ())); + } + } + + Ok (Response::new (":V\n".into ())) +} + +async fn receiver (params: Params) -> Result <(), Error> +{ + let (multicast_addr, multicast_port) = params.multicast_group; + let socket = tokio::net::UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, multicast_port)).await?; + + socket.join_multicast_v4 (multicast_addr, Ipv4Addr::UNSPECIFIED)?; + eprintln! ("Multicast group is {:?}", params.multicast_group); + eprintln! ("Local addr is {}", socket.local_addr ()?); + + loop { + let mut buf = vec! [0u8; 2048]; + + let (bytes_recved, remote_addr) = socket.recv_from (&mut buf).await?; + buf.truncate (bytes_recved); + + println! ("Received {} bytes from {}", bytes_recved, remote_addr); + } +} + async fn sender (params: Params) -> Result <(), Error> { - let socket = tokio::net::UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, 0)).await?; - let (multicast_addr, multicast_port) = params.multicast_group; + let socket = tokio::net::UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, 0)).await?; socket.join_multicast_v4 (multicast_addr, Ipv4Addr::UNSPECIFIED)?; eprintln! ("Multicast group is {:?}", params.multicast_group); @@ -108,7 +253,7 @@ fn spy (params: Params) -> Result <(), Error> { let (multicast_addr, multicast_port) = params.multicast_group; - let socket = match UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, multicast_port)) { + let socket = match std::net::UdpSocket::bind (SocketAddrV4::new (Ipv4Addr::UNSPECIFIED, multicast_port)) { Ok (x) => x, Err (e) => if e.kind () == std::io::ErrorKind::AddrInUse { eprintln! ("Address in use. You can only run 1 instance of Insecure Chat at a time, even in spy mode."); @@ -144,6 +289,10 @@ enum Error { #[error ("CLI args")] Args, #[error (transparent)] + Hyper (#[from] hyper::Error), + #[error (transparent)] + HyperHttp (#[from] hyper::http::Error), + #[error (transparent)] Io (#[from] std::io::Error), #[error (transparent)] Ip (#[from] ip::Error),