diff --git a/src/prelude.rs b/src/prelude.rs index 94112b8..4be8dc7 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -1,5 +1,5 @@ pub(crate) use crate::messages::{ToClient, ToServer}; -pub use anyhow::{Context as _, Result, anyhow, bail}; +pub use anyhow::{Context as _, Result, bail}; pub use bytes::Bytes; pub use futures_core::stream::Stream; pub use futures_sink::Sink; diff --git a/src/server.rs b/src/server.rs index f6e24e3..65ecf78 100644 --- a/src/server.rs +++ b/src/server.rs @@ -48,16 +48,12 @@ impl App { <_ as futures_sink::Sink>::poll_ready(stream.as_mut(), cx) { result.context("Can't check network write half for readiness")?; - let client = self - .server - .clients - .get_mut(id) - .context("Logic error: Stream has no associated client")?; - if let Some(frame) = client.poll_send() { + if let Some(frame) = self.server.poll_send(*id)? { stream.as_mut().start_send(frame).context("start_send")?; - match <_ as futures_sink::Sink>::poll_flush(stream.as_mut(), cx) { - Poll::Pending => {} - Poll::Ready(result) => result.context("poll_flush")?, + if let Poll::Ready(result) = + <_ as futures_sink::Sink>::poll_flush(stream.as_mut(), cx) + { + result.context("poll_flush")?; } tracing::debug!("Started send"); } @@ -70,14 +66,7 @@ impl App { Poll::Ready(Some(result)) => { let frame = result.context("stream.poll_next")?; cx.waker().wake_by_ref(); - let client = self - .server - .clients - .get_mut(id) - .context("Logic error: Stream has no associated client")?; - client - .handle_frame(frame.into()) - .context("client.handle_frame")? + self.server.handle_client_frame(*id, frame.into())?; } } } @@ -86,33 +75,15 @@ impl App { for id in clients_to_remove { tracing::info!(?id, "Closing client"); self.client_streams.remove(&id); - self.server.clients.remove(&id); + self.server.handle_client_disconnected(id); } - // Broadcast chat lines across all clients - - let mut new_messages = vec![]; - for client in &mut self.server.clients.values_mut() { - if let Some(ChatLine { id, line }) = client.poll_inbox() { - new_messages.push(ToClient::ChatLine { id, line }); - } - } - - for client in &mut self.server.clients.values_mut() { - for msg in &new_messages { - client.handle_outgoing(msg)?; - } - } - - match self.listener.poll_accept(cx) { - Poll::Pending => {} - Poll::Ready(result) => { - let (stream, _addr) = result.context("listener.poll_accept")?; - cx.waker().wake_by_ref(); - let stream = Framed::new(stream, LengthDelimitedCodec::new()); - let id = self.server.handle_new_client(); - self.client_streams.insert(id, stream); - } + if let Poll::Ready(result) = self.listener.poll_accept(cx) { + let (stream, _addr) = result.context("listener.poll_accept")?; + cx.waker().wake_by_ref(); + let stream = Framed::new(stream, LengthDelimitedCodec::new()); + let id = self.server.handle_new_client(); + self.client_streams.insert(id, stream); } Ok(()) @@ -126,12 +97,32 @@ struct Server { } impl Server { + fn handle_client_disconnected(&mut self, id: Id) { + self.clients.remove(&id); + } + + fn handle_client_frame(&mut self, id: Id, frame: Bytes) -> Result<()> { + let msg = { + let client = self + .clients + .get_mut(&id) + .context("Logic error: Stream has no associated client")?; + let Some(msg) = client.handle_frame(frame).context("client.handle_frame")? else { + return Ok(()); + }; + msg + }; + for client in &mut self.clients.values_mut() { + client.handle_outgoing(&msg)?; + } + Ok(()) + } + fn handle_new_client(&mut self) -> Id { let id = self.next_client_id; self.next_client_id += 1; let client = Client { id, - inbox: Default::default(), outbox: Default::default(), sequence: 0, }; @@ -143,22 +134,24 @@ impl Server { self.clients.insert(id, client); id } -} -struct ChatLine { - id: Id, - line: String, + fn poll_send(&mut self, id: Id) -> Result> { + let client = self + .clients + .get_mut(&id) + .context("Logic error: Stream has no associated client")?; + Ok(client.poll_send()) + } } struct Client { id: u64, - inbox: VecDeque, outbox: VecDeque, sequence: u64, } impl Client { - fn handle_frame(&mut self, frame: Bytes) -> Result<()> { + fn handle_frame(&mut self, frame: Bytes) -> Result> { match rmp_serde::from_slice(&frame)? { ToServer::ChatLine { line, sequence } => { if sequence != self.sequence { @@ -170,10 +163,9 @@ impl Client { bail!("Sequence mismatch"); } self.sequence += 1; - self.inbox.push_back(ChatLine { id: self.id, line }); + Ok(Some(ToClient::ChatLine { id: self.id, line })) } } - Ok(()) } fn handle_outgoing(&mut self, msg: &ToClient) -> Result<()> { @@ -185,10 +177,6 @@ impl Client { Ok(()) } - fn poll_inbox(&mut self) -> Option { - self.inbox.pop_front() - } - fn poll_send(&mut self) -> Option { self.outbox.pop_front() }