diff --git a/src/prelude.rs b/src/prelude.rs index 381abf9..f38e111 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -11,7 +11,7 @@ pub use std::{ rc::Rc, str::FromStr, task::{Context, Poll}, - time::{Duration, Instant}, + time::Duration, }; pub use tokio::{ net::{TcpListener, TcpStream}, diff --git a/src/server.rs b/src/server.rs index c939173..eac0dea 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,5 +1,7 @@ use crate::prelude::*; +type Id = u64; + pub(crate) struct Args { pub(crate) port: u16, } @@ -11,9 +13,9 @@ impl Default for Args { } pub(crate) struct App { - clients: Vec, + clients: BTreeMap, listener: TcpListener, - next_client_id: u64, + next_client_id: Id, } impl App { @@ -31,7 +33,9 @@ impl App { let previous_client_count = self.clients.len(); let mut new_messages = vec![]; - for client in &mut self.clients { + let mut clients_to_delete = vec![]; + for (id, client) in &mut self.clients { + let id = *id; { let mut stream = pin!(&mut client.stream); match <_ as futures_sink::Sink>::poll_flush(stream.as_mut(), cx) { @@ -71,9 +75,9 @@ impl App { Poll::Ready(Err(err)) => { tracing::error!(?err, "client.poll_next error"); } - Poll::Ready(Ok(None)) => client.closed = true, + Poll::Ready(Ok(None)) => clients_to_delete.push(id), Poll::Ready(Ok(Some(ToServer::ChatLine { line, sequence }))) => { - tracing::debug!(id = client.id, ?line); + tracing::debug!(id = id, ?line); if sequence != client.sequence { tracing::error!( expected = client.sequence, @@ -82,24 +86,19 @@ impl App { ); return Poll::Ready(Err(anyhow!("Sequence mismatch"))); } - tracing::info!(?sequence, id = client.id); + tracing::info!(?sequence, id = id); client.sequence += 1; - new_messages.push(Rc::new(ToClient::ChatLine { - id: client.id, - line, - })); + new_messages.push(Rc::new(ToClient::ChatLine { id, line })); } } } - self.clients.retain(|client| { - if client.closed { - tracing::info!(id = client.id, "Closing client"); - } - !client.closed - }); + for id in clients_to_delete { + tracing::info!(?id, "Closing client"); + self.clients.remove(&id); + } - for client in &mut self.clients { + for client in &mut self.clients.values_mut() { for msg in &new_messages { client.outbox.push_back(Rc::clone(msg)); } @@ -112,7 +111,6 @@ impl App { cx.waker().wake_by_ref(); let stream = Framed::new(stream, LengthDelimitedCodec::new()); let client = Client { - closed: false, id: self.next_client_id, outbox: Default::default(), sequence: 0, @@ -120,7 +118,7 @@ impl App { }; self.next_client_id += 1; tracing::info!(id = client.id, "Accepted client"); - self.clients.push(client); + self.clients.insert(client.id, client); } } @@ -133,7 +131,6 @@ impl App { } struct Client { - closed: bool, id: u64, outbox: VecDeque>, sequence: u64,