networking idea

main
_ 2023-09-14 23:02:27 -05:00
parent 9405a2d10b
commit 865f09f865
1 changed files with 121 additions and 6 deletions

View File

@ -7,7 +7,11 @@ use std::{
use eframe::egui; use eframe::egui;
use tokio::sync::mpsc; use tokio::sync::
{
mpsc,
watch,
};
mod capture; mod capture;
mod controller; mod controller;
@ -208,28 +212,139 @@ fn main_network ()
AsyncWriteExt, AsyncWriteExt,
}; };
enum MsgToController
{
AcceptedClient ((u64, mpsc::Sender <MsgToServer>)),
DisconnectedClient (u64),
GeneratedData (u64),
Request ((u64, u8)),
}
struct MsgToServer (Vec <u8>);
struct Client
{
sender: mpsc::Sender <MsgToServer>,
pause_time: Instant,
}
let rt = tokio::runtime::Runtime::new ().unwrap (); let rt = tokio::runtime::Runtime::new ().unwrap ();
rt.block_on (async rt.block_on (async
{ {
let (send_to_ctl, mut recv_at_ctl) = mpsc::channel (8);
let (send_to_gen, mut recv_at_gen) = watch::channel (Instant::now ());
// Control task
tokio::spawn (async move
{
let mut clients = std::collections::BTreeMap::default ();
while let Some (msg) = recv_at_ctl.recv ().await
{
match msg
{
MsgToController::AcceptedClient ((id, sender)) =>
{
let client = Client
{
sender,
pause_time: Instant::now (),
};
clients.insert (id, client);
},
MsgToController::DisconnectedClient(id) =>
{
clients.remove (&id);
},
MsgToController::GeneratedData (data) =>
{
let now = Instant::now ();
for (_id, client) in clients.iter_mut()
.filter(|(_id, client)| client.pause_time > now)
{
client.sender.send (MsgToServer (format! ("Data {data}\n").into_bytes())).await.unwrap ();
}
},
MsgToController::Request ((id, _req)) =>
{
let client = clients.get_mut (&id).unwrap ();
client.pause_time = Instant::now () + Duration::from_secs (2);
client.sender.send (MsgToServer (format! (":V {id}\n").into_bytes())).await.unwrap ();
send_to_gen.send (client.pause_time).unwrap ();
},
}
}
});
// Bogus data generator task
let send_to_ctl_2 = send_to_ctl.clone ();
tokio::spawn (async move
{
let mut i = 0;
while recv_at_gen.changed ().await.is_ok ()
{
let pause_time = *recv_at_gen.borrow_and_update ();
while Instant::now () < pause_time
{
println! ("Generating...");
tokio::time::sleep (Duration::from_millis(200)).await;
send_to_ctl_2.send (MsgToController::GeneratedData (i)).await.unwrap ();
i += 1;
}
};
});
let server = tokio::net::TcpListener::bind ("127.0.0.1:9001").await.unwrap (); let server = tokio::net::TcpListener::bind ("127.0.0.1:9001").await.unwrap ();
// Accept clients
tokio::spawn (async move tokio::spawn (async move
{ {
while let Ok ((mut stream, _remote_addr)) = server.accept ().await let mut client_id = 0u64;
while let Ok ((stream, _remote_addr)) = server.accept ().await
{ {
let (send_to_server, mut recv_at_server) = mpsc::channel (8);
send_to_ctl.send (MsgToController::AcceptedClient ((client_id, send_to_server))).await.unwrap ();
let (mut net_read, mut net_write) = stream.into_split ();
// Read from clients
let send_to_ctl_2 = send_to_ctl.clone ();
tokio::spawn (async move tokio::spawn (async move
{ {
for i in 0.. loop
{ {
let mut buf = vec! [0u8]; let mut buf = vec! [0u8, 0u8];
if let Err (_) = stream.read_exact (&mut buf).await if let Err (_) = net_read.read_exact (&mut buf).await
{ {
break; break;
} }
stream.write_all (format! ("{i}\n").as_bytes ()).await.unwrap (); send_to_ctl_2.send (MsgToController::Request ((client_id, buf [0]))).await.unwrap ();
} }
send_to_ctl_2.send (MsgToController::DisconnectedClient (client_id)).await.ok ();
}); });
// Write to clients
let send_to_ctl_2 = send_to_ctl.clone ();
tokio::spawn (async move
{
while let Some (msg) = recv_at_server.recv ().await
{
net_write.write_all (&msg.0).await.unwrap ();
}
send_to_ctl_2.send (MsgToController::DisconnectedClient (client_id)).await.ok ();
});
client_id += 1;
} }
}).await.unwrap (); }).await.unwrap ();
}); });