From 71196031450aabcb63544e2036511e4f1be1d8b8 Mon Sep 17 00:00:00 2001 From: _ <_@_> Date: Sun, 23 Feb 2025 20:29:05 -0600 Subject: [PATCH] refactor --- src/prelude.rs | 1 - src/server.rs | 151 +++++++++++++++++++++++++++---------------------- 2 files changed, 82 insertions(+), 70 deletions(-) diff --git a/src/prelude.rs b/src/prelude.rs index f38e111..94112b8 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -8,7 +8,6 @@ pub use std::{ future::poll_fn, ops::ControlFlow, pin::pin, - rc::Rc, str::FromStr, task::{Context, Poll}, time::Duration, diff --git a/src/server.rs b/src/server.rs index eac0dea..045301b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -13,6 +13,7 @@ impl Default for Args { } pub(crate) struct App { + client_streams: BTreeMap>, clients: BTreeMap, listener: TcpListener, next_client_id: Id, @@ -23,6 +24,7 @@ impl App { let listener = TcpListener::bind(("0.0.0.0", args.port)).await?; Ok(Self { + client_streams: Default::default(), clients: Default::default(), listener, next_client_id: 1, @@ -36,59 +38,52 @@ impl App { let mut clients_to_delete = vec![]; for (id, client) in &mut self.clients { let id = *id; - { - let mut stream = pin!(&mut client.stream); - match <_ as futures_sink::Sink>::poll_flush(stream.as_mut(), cx) { - Poll::Pending => {} - Poll::Ready(Err(err)) => return Poll::Ready(Err(err).context("poll_flush")), - Poll::Ready(Ok(())) => {} - } - } + let stream = self + .client_streams + .get_mut(&id) + .context("Logic error: Client has no stream")?; + let mut stream = pin!(stream); - if !client.outbox.is_empty() { - let mut stream = pin!(&mut client.stream); - match <_ as futures_sink::Sink>::poll_ready(stream.as_mut(), cx) { - Poll::Pending => {} - Poll::Ready(Err(err)) => { - return Poll::Ready( - Err(err).context("Can't check network write half for readiness"), - ); - } - Poll::Ready(Ok(())) => { - cx.waker().wake_by_ref(); - let Some(msg) = client.outbox.pop_front() else { - return Poll::Ready(Err(anyhow!( - "Can't pop from outbox even though we just checked it was not empty" - ))); - }; - let frame = rmp_serde::to_vec(msg.as_ref())?; - if let Err(err) = stream.start_send(Bytes::from(frame)) { + match <_ as futures_sink::Sink>::poll_ready(stream.as_mut(), cx) { + Poll::Pending => {} + Poll::Ready(Err(err)) => { + return Poll::Ready( + Err(err).context("Can't check network write half for readiness"), + ); + } + Poll::Ready(Ok(())) => { + if let Some(frame) = client.poll_send() { + if let Err(err) = stream.as_mut().start_send(frame) { return Poll::Ready(Err(err).context("start_send")); } + match <_ as futures_sink::Sink>::poll_flush(stream.as_mut(), cx) { + Poll::Pending => {} + Poll::Ready(Err(err)) => { + return Poll::Ready(Err(err).context("poll_flush")); + } + Poll::Ready(Ok(())) => {} + } tracing::debug!("Started send"); } } } - match client.poll_next(cx) { + if let Some(ChatLine { id, line }) = client.poll_inbox() { + cx.waker().wake_by_ref(); + new_messages.push(ToClient::ChatLine { id, line }); + } + + match stream.as_mut().poll_next(cx) { Poll::Pending => {} - Poll::Ready(Err(err)) => { - tracing::error!(?err, "client.poll_next error"); + Poll::Ready(None) => clients_to_delete.push(id), + Poll::Ready(Some(Err(err))) => { + tracing::error!(?err, "stream.poll_next error"); } - Poll::Ready(Ok(None)) => clients_to_delete.push(id), - Poll::Ready(Ok(Some(ToServer::ChatLine { line, sequence }))) => { - tracing::debug!(id = id, ?line); - if sequence != client.sequence { - tracing::error!( - expected = client.sequence, - actual = sequence, - "Sequence mismatch" - ); - return Poll::Ready(Err(anyhow!("Sequence mismatch"))); + Poll::Ready(Some(Ok(frame))) => { + cx.waker().wake_by_ref(); + if let Err(err) = client.handle_frame(frame.into()) { + return Poll::Ready(Err(err).context("client.handle_frame")); } - tracing::info!(?sequence, id = id); - client.sequence += 1; - new_messages.push(Rc::new(ToClient::ChatLine { id, line })); } } } @@ -100,7 +95,7 @@ impl App { for client in &mut self.clients.values_mut() { for msg in &new_messages { - client.outbox.push_back(Rc::clone(msg)); + client.handle_outgoing(msg)?; } } @@ -109,16 +104,18 @@ impl App { Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), Poll::Ready(Ok((stream, _addr))) => { cx.waker().wake_by_ref(); - let stream = Framed::new(stream, LengthDelimitedCodec::new()); + let id = self.next_client_id; + self.next_client_id += 1; let client = Client { - id: self.next_client_id, + id, + inbox: Default::default(), outbox: Default::default(), sequence: 0, - stream, }; - self.next_client_id += 1; + let stream = Framed::new(stream, LengthDelimitedCodec::new()); tracing::info!(id = client.id, "Accepted client"); - self.clients.insert(client.id, client); + self.clients.insert(id, client); + self.client_streams.insert(id, stream); } } @@ -130,35 +127,51 @@ impl App { } } +struct ChatLine { + id: Id, + line: String, +} + struct Client { id: u64, - outbox: VecDeque>, + inbox: VecDeque, + outbox: VecDeque, sequence: u64, - stream: Framed, } impl Client { - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - let stream = pin!(&mut self.stream); - let Poll::Ready(frame_opt) = stream.poll_next(cx) else { - return Poll::Pending; - }; + fn handle_frame(&mut self, frame: Bytes) -> Result<()> { + match rmp_serde::from_slice(&frame)? { + ToServer::ChatLine { line, sequence } => { + if sequence != self.sequence { + tracing::error!( + expected = self.sequence, + actual = sequence, + "Sequence mismatch" + ); + bail!("Sequence mismatch"); + } + self.sequence += 1; + self.inbox.push_back(ChatLine { id: self.id, line }); + } + } + Ok(()) + } - let Some(frame) = frame_opt else { - return Poll::Ready(Ok(None)); - }; + fn handle_outgoing(&mut self, msg: &ToClient) -> Result<()> { + if !self.outbox.is_empty() { + bail!("Outbox full"); + } + let bytes = rmp_serde::to_vec(msg)?.into(); + self.outbox.push_back(bytes); + Ok(()) + } - cx.waker().wake_by_ref(); - tracing::debug!("Got network data"); + fn poll_inbox(&mut self) -> Option { + self.inbox.pop_front() + } - let frame = match frame { - Err(err) => return Poll::Ready(Err(err).context("Network framing decode error")), - Ok(x) => x, - }; - let msg = match rmp_serde::from_slice(&frame) { - Err(err) => return Poll::Ready(Err(err).context("MsgPack decode error")), - Ok(x) => x, - }; - Poll::Ready(Ok(Some(msg))) + fn poll_send(&mut self) -> Option { + self.outbox.pop_front() } }