diff --git a/src/main.rs b/src/main.rs index bd931c3..2aab6bc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,7 +7,11 @@ use std::{ use eframe::egui; -use tokio::sync::mpsc; +use tokio::sync:: +{ + mpsc, + watch, +}; mod capture; mod controller; @@ -208,28 +212,139 @@ fn main_network () AsyncWriteExt, }; + enum MsgToController + { + AcceptedClient ((u64, mpsc::Sender )), + DisconnectedClient (u64), + GeneratedData (u64), + Request ((u64, u8)), + } + + struct MsgToServer (Vec ); + + struct Client + { + sender: mpsc::Sender , + pause_time: Instant, + } + let rt = tokio::runtime::Runtime::new ().unwrap (); rt.block_on (async { + let (send_to_ctl, mut recv_at_ctl) = mpsc::channel (8); + let (send_to_gen, mut recv_at_gen) = watch::channel (Instant::now ()); + + // Control task + tokio::spawn (async move + { + let mut clients = std::collections::BTreeMap::default (); + + while let Some (msg) = recv_at_ctl.recv ().await + { + match msg + { + MsgToController::AcceptedClient ((id, sender)) => + { + let client = Client + { + sender, + pause_time: Instant::now (), + }; + clients.insert (id, client); + }, + MsgToController::DisconnectedClient(id) => + { + clients.remove (&id); + }, + MsgToController::GeneratedData (data) => + { + let now = Instant::now (); + for (_id, client) in clients.iter_mut() + .filter(|(_id, client)| client.pause_time > now) + { + client.sender.send (MsgToServer (format! ("Data {data}\n").into_bytes())).await.unwrap (); + } + }, + MsgToController::Request ((id, _req)) => + { + let client = clients.get_mut (&id).unwrap (); + + client.pause_time = Instant::now () + Duration::from_secs (2); + client.sender.send (MsgToServer (format! (":V {id}\n").into_bytes())).await.unwrap (); + send_to_gen.send (client.pause_time).unwrap (); + }, + } + } + }); + + // Bogus data generator task + + let send_to_ctl_2 = send_to_ctl.clone (); + tokio::spawn (async move + { + let mut i = 0; + + while recv_at_gen.changed ().await.is_ok () + { + let pause_time = *recv_at_gen.borrow_and_update (); + while Instant::now () < pause_time + { + println! ("Generating..."); + tokio::time::sleep (Duration::from_millis(200)).await; + + send_to_ctl_2.send (MsgToController::GeneratedData (i)).await.unwrap (); + i += 1; + } + }; + }); + let server = tokio::net::TcpListener::bind ("127.0.0.1:9001").await.unwrap (); + // Accept clients + tokio::spawn (async move { - while let Ok ((mut stream, _remote_addr)) = server.accept ().await + let mut client_id = 0u64; + while let Ok ((stream, _remote_addr)) = server.accept ().await { + let (send_to_server, mut recv_at_server) = mpsc::channel (8); + send_to_ctl.send (MsgToController::AcceptedClient ((client_id, send_to_server))).await.unwrap (); + + let (mut net_read, mut net_write) = stream.into_split (); + + // Read from clients + + let send_to_ctl_2 = send_to_ctl.clone (); tokio::spawn (async move { - for i in 0.. + loop { - let mut buf = vec! [0u8]; - if let Err (_) = stream.read_exact (&mut buf).await + let mut buf = vec! [0u8, 0u8]; + if let Err (_) = net_read.read_exact (&mut buf).await { break; } - stream.write_all (format! ("{i}\n").as_bytes ()).await.unwrap (); + send_to_ctl_2.send (MsgToController::Request ((client_id, buf [0]))).await.unwrap (); } + + send_to_ctl_2.send (MsgToController::DisconnectedClient (client_id)).await.ok (); }); + + // Write to clients + + let send_to_ctl_2 = send_to_ctl.clone (); + tokio::spawn (async move + { + while let Some (msg) = recv_at_server.recv ().await + { + net_write.write_all (&msg.0).await.unwrap (); + } + + send_to_ctl_2.send (MsgToController::DisconnectedClient (client_id)).await.ok (); + }); + + client_id += 1; } }).await.unwrap (); });