use crate::prelude::*; type Id = u64; pub(crate) struct Args { pub(crate) port: u16, } impl Default for Args { fn default() -> Self { Self { port: 9000 } } } pub(crate) struct App { client_streams: BTreeMap>, listener: TcpListener, server: Server, } impl App { pub(crate) async fn new(args: Args) -> Result { let listener = TcpListener::bind(("0.0.0.0", args.port)).await?; Ok(Self { client_streams: Default::default(), listener, server: Default::default(), }) } pub(crate) fn poll_run(&mut self, cx: &mut Context<'_>) -> Poll> { match self.step(cx) { Ok(()) => Poll::Pending, Err(err) => Poll::Ready(Err(err)), } } fn step(&mut self, cx: &mut Context<'_>) -> Result<()> { let mut clients_to_remove = vec![]; // Pump the network streams for (id, stream) in &mut self.client_streams { let mut stream = pin!(stream); // Try to send from the client's outbox if let Poll::Ready(result) = <_ as futures_sink::Sink>::poll_ready(stream.as_mut(), cx) { result.context("Can't check network write half for readiness")?; let client = self .server .clients .get_mut(id) .context("Logic error: Stream has no associated client")?; if let Some(frame) = client.poll_send() { stream.as_mut().start_send(frame).context("start_send")?; match <_ as futures_sink::Sink>::poll_flush(stream.as_mut(), cx) { Poll::Pending => {} Poll::Ready(result) => result.context("poll_flush")?, } tracing::debug!("Started send"); } } // Try to read into the client's inbox match stream.as_mut().poll_next(cx) { Poll::Pending => {} Poll::Ready(None) => clients_to_remove.push(*id), Poll::Ready(Some(result)) => { let frame = result.context("stream.poll_next")?; cx.waker().wake_by_ref(); let client = self .server .clients .get_mut(id) .context("Logic error: Stream has no associated client")?; client .handle_frame(frame.into()) .context("client.handle_frame")? } } } // Close out disconnected clients for id in clients_to_remove { tracing::info!(?id, "Closing client"); self.client_streams.remove(&id); self.server.clients.remove(&id); } // Broadcast chat lines across all clients let mut new_messages = vec![]; for client in &mut self.server.clients.values_mut() { if let Some(ChatLine { id, line }) = client.poll_inbox() { new_messages.push(ToClient::ChatLine { id, line }); } } for client in &mut self.server.clients.values_mut() { for msg in &new_messages { client.handle_outgoing(msg)?; } } match self.listener.poll_accept(cx) { Poll::Pending => {} Poll::Ready(result) => { let (stream, _addr) = result.context("listener.poll_accept")?; cx.waker().wake_by_ref(); let stream = Framed::new(stream, LengthDelimitedCodec::new()); let id = self.server.handle_new_client(); self.client_streams.insert(id, stream); } } Ok(()) } } #[derive(Default)] struct Server { clients: BTreeMap, next_client_id: Id, } impl Server { fn handle_new_client(&mut self) -> Id { let id = self.next_client_id; self.next_client_id += 1; let client = Client { id, inbox: Default::default(), outbox: Default::default(), sequence: 0, }; tracing::info!( id = client.id, total = self.clients.len(), "Accepted client" ); self.clients.insert(id, client); id } } struct ChatLine { id: Id, line: String, } struct Client { id: u64, inbox: VecDeque, outbox: VecDeque, sequence: u64, } impl Client { fn handle_frame(&mut self, frame: Bytes) -> Result<()> { match rmp_serde::from_slice(&frame)? { ToServer::ChatLine { line, sequence } => { if sequence != self.sequence { tracing::error!( expected = self.sequence, actual = sequence, "Sequence mismatch" ); bail!("Sequence mismatch"); } self.sequence += 1; self.inbox.push_back(ChatLine { id: self.id, line }); } } Ok(()) } fn handle_outgoing(&mut self, msg: &ToClient) -> Result<()> { if !self.outbox.is_empty() { bail!("Outbox full"); } let bytes = rmp_serde::to_vec(msg)?.into(); self.outbox.push_back(bytes); Ok(()) } fn poll_inbox(&mut self) -> Option { self.inbox.pop_front() } fn poll_send(&mut self) -> Option { self.outbox.pop_front() } }