use crate::prelude::*; type Id = u64; pub(crate) struct Args { pub(crate) port: u16, } impl Default for Args { fn default() -> Self { Self { port: 9000 } } } pub(crate) struct App { clients: BTreeMap<Id, Client>, listener: TcpListener, next_client_id: Id, } impl App { pub(crate) async fn new(args: Args) -> Result<Self> { let listener = TcpListener::bind(("0.0.0.0", args.port)).await?; Ok(Self { clients: Default::default(), listener, next_client_id: 1, }) } pub(crate) fn poll_run(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { let previous_client_count = self.clients.len(); let mut new_messages = vec![]; let mut clients_to_delete = vec![]; for (id, client) in &mut self.clients { let id = *id; { let mut stream = pin!(&mut client.stream); match <_ as futures_sink::Sink<Bytes>>::poll_flush(stream.as_mut(), cx) { Poll::Pending => {} Poll::Ready(Err(err)) => return Poll::Ready(Err(err).context("poll_flush")), Poll::Ready(Ok(())) => {} } } if !client.outbox.is_empty() { let mut stream = pin!(&mut client.stream); match <_ as futures_sink::Sink<Bytes>>::poll_ready(stream.as_mut(), cx) { Poll::Pending => {} Poll::Ready(Err(err)) => { return Poll::Ready( Err(err).context("Can't check network write half for readiness"), ); } Poll::Ready(Ok(())) => { cx.waker().wake_by_ref(); let Some(msg) = client.outbox.pop_front() else { return Poll::Ready(Err(anyhow!( "Can't pop from outbox even though we just checked it was not empty" ))); }; let frame = rmp_serde::to_vec(msg.as_ref())?; if let Err(err) = stream.start_send(Bytes::from(frame)) { return Poll::Ready(Err(err).context("start_send")); } tracing::debug!("Started send"); } } } match client.poll_next(cx) { Poll::Pending => {} Poll::Ready(Err(err)) => { tracing::error!(?err, "client.poll_next error"); } Poll::Ready(Ok(None)) => clients_to_delete.push(id), Poll::Ready(Ok(Some(ToServer::ChatLine { line, sequence }))) => { tracing::debug!(id = id, ?line); if sequence != client.sequence { tracing::error!( expected = client.sequence, actual = sequence, "Sequence mismatch" ); return Poll::Ready(Err(anyhow!("Sequence mismatch"))); } tracing::info!(?sequence, id = id); client.sequence += 1; new_messages.push(Rc::new(ToClient::ChatLine { id, line })); } } } for id in clients_to_delete { tracing::info!(?id, "Closing client"); self.clients.remove(&id); } for client in &mut self.clients.values_mut() { for msg in &new_messages { client.outbox.push_back(Rc::clone(msg)); } } match self.listener.poll_accept(cx) { Poll::Pending => {} Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), Poll::Ready(Ok((stream, _addr))) => { cx.waker().wake_by_ref(); let stream = Framed::new(stream, LengthDelimitedCodec::new()); let client = Client { id: self.next_client_id, outbox: Default::default(), sequence: 0, stream, }; self.next_client_id += 1; tracing::info!(id = client.id, "Accepted client"); self.clients.insert(client.id, client); } } if self.clients.len() != previous_client_count { tracing::info!(client_count = self.clients.len()); } Poll::Pending } } struct Client { id: u64, outbox: VecDeque<Rc<ToClient>>, sequence: u64, stream: Framed<TcpStream, LengthDelimitedCodec>, } impl Client { fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<ToServer>>> { let stream = pin!(&mut self.stream); let Poll::Ready(frame_opt) = stream.poll_next(cx) else { return Poll::Pending; }; let Some(frame) = frame_opt else { return Poll::Ready(Ok(None)); }; cx.waker().wake_by_ref(); tracing::debug!("Got network data"); let frame = match frame { Err(err) => return Poll::Ready(Err(err).context("Network framing decode error")), Ok(x) => x, }; let msg = match rmp_serde::from_slice(&frame) { Err(err) => return Poll::Ready(Err(err).context("MsgPack decode error")), Ok(x) => x, }; Poll::Ready(Ok(Some(msg))) } }