diff --git a/src/server.rs b/src/server.rs index 045301b..f6e24e3 100644 --- a/src/server.rs +++ b/src/server.rs @@ -14,9 +14,8 @@ impl Default for Args { pub(crate) struct App { client_streams: BTreeMap>, - clients: BTreeMap, listener: TcpListener, - next_client_id: Id, + server: Server, } impl App { @@ -25,75 +24,81 @@ impl App { Ok(Self { client_streams: Default::default(), - clients: Default::default(), listener, - next_client_id: 1, + server: Default::default(), }) } pub(crate) fn poll_run(&mut self, cx: &mut Context<'_>) -> Poll> { - let previous_client_count = self.clients.len(); + match self.step(cx) { + Ok(()) => Poll::Pending, + Err(err) => Poll::Ready(Err(err)), + } + } - let mut new_messages = vec![]; - let mut clients_to_delete = vec![]; - for (id, client) in &mut self.clients { - let id = *id; - let stream = self - .client_streams - .get_mut(&id) - .context("Logic error: Client has no stream")?; + fn step(&mut self, cx: &mut Context<'_>) -> Result<()> { + let mut clients_to_remove = vec![]; + + // Pump the network streams + for (id, stream) in &mut self.client_streams { let mut stream = pin!(stream); - match <_ as futures_sink::Sink>::poll_ready(stream.as_mut(), cx) { - Poll::Pending => {} - Poll::Ready(Err(err)) => { - return Poll::Ready( - Err(err).context("Can't check network write half for readiness"), - ); - } - Poll::Ready(Ok(())) => { - if let Some(frame) = client.poll_send() { - if let Err(err) = stream.as_mut().start_send(frame) { - return Poll::Ready(Err(err).context("start_send")); - } - match <_ as futures_sink::Sink>::poll_flush(stream.as_mut(), cx) { - Poll::Pending => {} - Poll::Ready(Err(err)) => { - return Poll::Ready(Err(err).context("poll_flush")); - } - Poll::Ready(Ok(())) => {} - } - tracing::debug!("Started send"); + // Try to send from the client's outbox + if let Poll::Ready(result) = + <_ as futures_sink::Sink>::poll_ready(stream.as_mut(), cx) + { + result.context("Can't check network write half for readiness")?; + let client = self + .server + .clients + .get_mut(id) + .context("Logic error: Stream has no associated client")?; + if let Some(frame) = client.poll_send() { + stream.as_mut().start_send(frame).context("start_send")?; + match <_ as futures_sink::Sink>::poll_flush(stream.as_mut(), cx) { + Poll::Pending => {} + Poll::Ready(result) => result.context("poll_flush")?, } + tracing::debug!("Started send"); } } - if let Some(ChatLine { id, line }) = client.poll_inbox() { - cx.waker().wake_by_ref(); - new_messages.push(ToClient::ChatLine { id, line }); - } - + // Try to read into the client's inbox match stream.as_mut().poll_next(cx) { Poll::Pending => {} - Poll::Ready(None) => clients_to_delete.push(id), - Poll::Ready(Some(Err(err))) => { - tracing::error!(?err, "stream.poll_next error"); - } - Poll::Ready(Some(Ok(frame))) => { + Poll::Ready(None) => clients_to_remove.push(*id), + Poll::Ready(Some(result)) => { + let frame = result.context("stream.poll_next")?; cx.waker().wake_by_ref(); - if let Err(err) = client.handle_frame(frame.into()) { - return Poll::Ready(Err(err).context("client.handle_frame")); - } + let client = self + .server + .clients + .get_mut(id) + .context("Logic error: Stream has no associated client")?; + client + .handle_frame(frame.into()) + .context("client.handle_frame")? } } } - for id in clients_to_delete { + // Close out disconnected clients + for id in clients_to_remove { tracing::info!(?id, "Closing client"); - self.clients.remove(&id); + self.client_streams.remove(&id); + self.server.clients.remove(&id); } - for client in &mut self.clients.values_mut() { + // Broadcast chat lines across all clients + + let mut new_messages = vec![]; + for client in &mut self.server.clients.values_mut() { + if let Some(ChatLine { id, line }) = client.poll_inbox() { + new_messages.push(ToClient::ChatLine { id, line }); + } + } + + for client in &mut self.server.clients.values_mut() { for msg in &new_messages { client.handle_outgoing(msg)?; } @@ -101,29 +106,42 @@ impl App { match self.listener.poll_accept(cx) { Poll::Pending => {} - Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), - Poll::Ready(Ok((stream, _addr))) => { + Poll::Ready(result) => { + let (stream, _addr) = result.context("listener.poll_accept")?; cx.waker().wake_by_ref(); - let id = self.next_client_id; - self.next_client_id += 1; - let client = Client { - id, - inbox: Default::default(), - outbox: Default::default(), - sequence: 0, - }; let stream = Framed::new(stream, LengthDelimitedCodec::new()); - tracing::info!(id = client.id, "Accepted client"); - self.clients.insert(id, client); + let id = self.server.handle_new_client(); self.client_streams.insert(id, stream); } } - if self.clients.len() != previous_client_count { - tracing::info!(client_count = self.clients.len()); - } + Ok(()) + } +} - Poll::Pending +#[derive(Default)] +struct Server { + clients: BTreeMap, + next_client_id: Id, +} + +impl Server { + fn handle_new_client(&mut self) -> Id { + let id = self.next_client_id; + self.next_client_id += 1; + let client = Client { + id, + inbox: Default::default(), + outbox: Default::default(), + sequence: 0, + }; + tracing::info!( + id = client.id, + total = self.clients.len(), + "Accepted client" + ); + self.clients.insert(id, client); + id } }