use crate::prelude::*; pub(crate) struct App { clients: Vec, listener: TcpListener, next_client_id: u64, } struct Client { closed: bool, id: u64, outbox: VecDeque>, stream: Framed, } impl App { pub(crate) async fn new() -> Result { let listener = TcpListener::bind("0.0.0.0:9000").await?; Ok(Self { clients: Default::default(), listener, next_client_id: 1, }) } pub(crate) fn poll_run(&mut self, cx: &mut Context<'_>) -> Poll> { let previous_client_count = self.clients.len(); let mut new_messages = vec![]; for client in &mut self.clients { { let mut stream = pin!(&mut client.stream); match <_ as futures_sink::Sink>::poll_flush(stream.as_mut(), cx) { Poll::Pending => {} Poll::Ready(Err(err)) => return Poll::Ready(Err(err).context("poll_flush")), Poll::Ready(Ok(())) => {} } } if ! client.outbox.is_empty() { let mut stream = pin!(&mut client.stream); match <_ as futures_sink::Sink>::poll_ready(stream.as_mut(), cx) { Poll::Pending => {} Poll::Ready(Err(err)) => { return Poll::Ready(Err(err).context("Can't check network write half for readiness")); } Poll::Ready(Ok(())) => { cx.waker().wake_by_ref(); let Some(msg) = client.outbox.pop_front() else { return Poll::Ready(Err(anyhow!("Can't pop from outbox even though we just checked it was not empty"))); }; let frame = rmp_serde::to_vec(msg.as_ref())?; if let Err(err) = stream.start_send(Bytes::from(frame)) { return Poll::Ready(Err(err).context("start_send")); } tracing::debug!("Started send"); } } } let stream = pin!(&mut client.stream); match stream.poll_next(cx) { Poll::Pending => {} Poll::Ready(None) => { cx.waker().wake_by_ref(); client.closed = true; } Poll::Ready(Some(frame)) => { tracing::info!("Got network data"); cx.waker().wake_by_ref(); let frame = frame.context("network framing decode")?; let msg: ToServer = rmp_serde::from_slice(&frame)?; match msg { ToServer::ChatLine { line } => { tracing::info!(id = client.id, ?line); new_messages.push(Rc::new(ToClient::ChatLine { id: client.id, line, })); } } } } } self.clients.retain(|client| { if client.closed { tracing::info!(id = client.id, "Closing client"); } !client.closed }); for client in &mut self.clients { for msg in &new_messages { client.outbox.push_back(Rc::clone(msg)); } } match self.listener.poll_accept(cx) { Poll::Pending => {} Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), Poll::Ready(Ok((stream, _addr))) => { cx.waker().wake_by_ref(); let stream = Framed::new(stream, BytesCodec::new()); let client = Client { closed: false, id: self.next_client_id, outbox: Default::default(), stream, }; self.next_client_id += 1; tracing::info!(id = client.id, "Accepted client"); self.clients.push(client); } } if self.clients.len() != previous_client_count { tracing::info!(client_count = self.clients.len()); } Poll::Pending } }