From e3c23a0931731a09e490ef661ae3d003d2e8f699 Mon Sep 17 00:00:00 2001
From: _ <_@_>
Date: Sun, 23 Feb 2025 21:29:31 -0600
Subject: [PATCH] refactor

---
 src/prelude.rs |  2 +-
 src/server.rs  | 98 ++++++++++++++++++++++----------------------------
 2 files changed, 44 insertions(+), 56 deletions(-)

diff --git a/src/prelude.rs b/src/prelude.rs
index 94112b8..4be8dc7 100644
--- a/src/prelude.rs
+++ b/src/prelude.rs
@@ -1,5 +1,5 @@
 pub(crate) use crate::messages::{ToClient, ToServer};
-pub use anyhow::{Context as _, Result, anyhow, bail};
+pub use anyhow::{Context as _, Result, bail};
 pub use bytes::Bytes;
 pub use futures_core::stream::Stream;
 pub use futures_sink::Sink;
diff --git a/src/server.rs b/src/server.rs
index f6e24e3..65ecf78 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -48,16 +48,12 @@ impl App {
                 <_ as futures_sink::Sink<Bytes>>::poll_ready(stream.as_mut(), cx)
             {
                 result.context("Can't check network write half for readiness")?;
-                let client = self
-                    .server
-                    .clients
-                    .get_mut(id)
-                    .context("Logic error: Stream has no associated client")?;
-                if let Some(frame) = client.poll_send() {
+                if let Some(frame) = self.server.poll_send(*id)? {
                     stream.as_mut().start_send(frame).context("start_send")?;
-                    match <_ as futures_sink::Sink<Bytes>>::poll_flush(stream.as_mut(), cx) {
-                        Poll::Pending => {}
-                        Poll::Ready(result) => result.context("poll_flush")?,
+                    if let Poll::Ready(result) =
+                        <_ as futures_sink::Sink<Bytes>>::poll_flush(stream.as_mut(), cx)
+                    {
+                        result.context("poll_flush")?;
                     }
                     tracing::debug!("Started send");
                 }
@@ -70,14 +66,7 @@ impl App {
                 Poll::Ready(Some(result)) => {
                     let frame = result.context("stream.poll_next")?;
                     cx.waker().wake_by_ref();
-                    let client = self
-                        .server
-                        .clients
-                        .get_mut(id)
-                        .context("Logic error: Stream has no associated client")?;
-                    client
-                        .handle_frame(frame.into())
-                        .context("client.handle_frame")?
+                    self.server.handle_client_frame(*id, frame.into())?;
                 }
             }
         }
@@ -86,33 +75,15 @@ impl App {
         for id in clients_to_remove {
             tracing::info!(?id, "Closing client");
             self.client_streams.remove(&id);
-            self.server.clients.remove(&id);
+            self.server.handle_client_disconnected(id);
         }
 
-        // Broadcast chat lines across all clients
-
-        let mut new_messages = vec![];
-        for client in &mut self.server.clients.values_mut() {
-            if let Some(ChatLine { id, line }) = client.poll_inbox() {
-                new_messages.push(ToClient::ChatLine { id, line });
-            }
-        }
-
-        for client in &mut self.server.clients.values_mut() {
-            for msg in &new_messages {
-                client.handle_outgoing(msg)?;
-            }
-        }
-
-        match self.listener.poll_accept(cx) {
-            Poll::Pending => {}
-            Poll::Ready(result) => {
-                let (stream, _addr) = result.context("listener.poll_accept")?;
-                cx.waker().wake_by_ref();
-                let stream = Framed::new(stream, LengthDelimitedCodec::new());
-                let id = self.server.handle_new_client();
-                self.client_streams.insert(id, stream);
-            }
+        if let Poll::Ready(result) = self.listener.poll_accept(cx) {
+            let (stream, _addr) = result.context("listener.poll_accept")?;
+            cx.waker().wake_by_ref();
+            let stream = Framed::new(stream, LengthDelimitedCodec::new());
+            let id = self.server.handle_new_client();
+            self.client_streams.insert(id, stream);
         }
 
         Ok(())
@@ -126,12 +97,32 @@ struct Server {
 }
 
 impl Server {
+    fn handle_client_disconnected(&mut self, id: Id) {
+        self.clients.remove(&id);
+    }
+
+    fn handle_client_frame(&mut self, id: Id, frame: Bytes) -> Result<()> {
+        let msg = {
+            let client = self
+                .clients
+                .get_mut(&id)
+                .context("Logic error: Stream has no associated client")?;
+            let Some(msg) = client.handle_frame(frame).context("client.handle_frame")? else {
+                return Ok(());
+            };
+            msg
+        };
+        for client in &mut self.clients.values_mut() {
+            client.handle_outgoing(&msg)?;
+        }
+        Ok(())
+    }
+
     fn handle_new_client(&mut self) -> Id {
         let id = self.next_client_id;
         self.next_client_id += 1;
         let client = Client {
             id,
-            inbox: Default::default(),
             outbox: Default::default(),
             sequence: 0,
         };
@@ -143,22 +134,24 @@ impl Server {
         self.clients.insert(id, client);
         id
     }
-}
 
-struct ChatLine {
-    id: Id,
-    line: String,
+    fn poll_send(&mut self, id: Id) -> Result<Option<Bytes>> {
+        let client = self
+            .clients
+            .get_mut(&id)
+            .context("Logic error: Stream has no associated client")?;
+        Ok(client.poll_send())
+    }
 }
 
 struct Client {
     id: u64,
-    inbox: VecDeque<ChatLine>,
     outbox: VecDeque<Bytes>,
     sequence: u64,
 }
 
 impl Client {
-    fn handle_frame(&mut self, frame: Bytes) -> Result<()> {
+    fn handle_frame(&mut self, frame: Bytes) -> Result<Option<ToClient>> {
         match rmp_serde::from_slice(&frame)? {
             ToServer::ChatLine { line, sequence } => {
                 if sequence != self.sequence {
@@ -170,10 +163,9 @@ impl Client {
                     bail!("Sequence mismatch");
                 }
                 self.sequence += 1;
-                self.inbox.push_back(ChatLine { id: self.id, line });
+                Ok(Some(ToClient::ChatLine { id: self.id, line }))
             }
         }
-        Ok(())
     }
 
     fn handle_outgoing(&mut self, msg: &ToClient) -> Result<()> {
@@ -185,10 +177,6 @@ impl Client {
         Ok(())
     }
 
-    fn poll_inbox(&mut self) -> Option<ChatLine> {
-        self.inbox.pop_front()
-    }
-
     fn poll_send(&mut self) -> Option<Bytes> {
         self.outbox.pop_front()
     }