diff --git a/src/client.rs b/src/client.rs index ffe1938..70865ab 100644 --- a/src/client.rs +++ b/src/client.rs @@ -15,8 +15,7 @@ impl Default for Args { } pub(crate) struct App { - outbox: VecDeque, - sequence: u64, + client: Client, stream: Framed, timer: Interval, } @@ -28,69 +27,91 @@ impl App { let mut timer = tokio::time::interval(Duration::from_millis(args.period_ms.into())); timer.set_missed_tick_behavior(MissedTickBehavior::Skip); Ok(Self { - outbox: Default::default(), - sequence: 0, + client: Default::default(), stream, timer, }) } pub(crate) fn poll_run(&mut self, cx: &mut Context<'_>) -> Poll> { - { - let mut stream = pin!(&mut self.stream); - match <_ as futures_sink::Sink>::poll_flush(stream.as_mut(), cx) { - Poll::Pending => {} - Poll::Ready(Err(err)) => return Poll::Ready(Err(err).context("poll_flush")), - Poll::Ready(Ok(())) => {} + let mut stream = pin!(&mut self.stream); + match <_ as futures_sink::Sink>::poll_ready(stream.as_mut(), cx) { + Poll::Pending => {} + Poll::Ready(Err(err)) => { + return Poll::Ready( + Err(err).context("Can't check network write half for readiness"), + ); } - } - - if !self.outbox.is_empty() { - let mut stream = pin!(&mut self.stream); - match <_ as futures_sink::Sink>::poll_ready(stream.as_mut(), cx) { - Poll::Pending => {} - Poll::Ready(Err(err)) => { - return Poll::Ready( - Err(err).context("Can't check network write half for readiness"), - ); - } - Poll::Ready(Ok(())) => { - cx.waker().wake_by_ref(); - let frame = rmp_serde::to_vec(&self.outbox.pop_front())?; - if let Err(err) = stream.start_send(Bytes::from(frame)) { - return Poll::Ready(Err(err).context("start_send")); + Poll::Ready(Ok(())) => { + if let Some(frame) = self.client.poll_send() { + if let Err(err) = stream.as_mut().start_send(frame) { + return Poll::Ready(Err(err).context("stream.start_send")); } tracing::debug!("Started send"); } + match <_ as futures_sink::Sink>::poll_flush(stream.as_mut(), cx) { + Poll::Pending => {} + Poll::Ready(Err(err)) => return Poll::Ready(Err(err).context("poll_flush")), + Poll::Ready(Ok(())) => {} + } } } - let stream = pin!(&mut self.stream); - match stream.poll_next(cx) { + match stream.as_mut().poll_next(cx) { Poll::Pending => {} Poll::Ready(None) => return Poll::Ready(Err(anyhow!("Server closed cxn"))), Poll::Ready(Some(frame)) => { cx.waker().wake_by_ref(); let frame = frame.context("network framing decode")?; - match rmp_serde::from_slice(&frame)? { - ToClient::ChatLine { id, line } => { - tracing::info!(?id, ?line); - } + if let Err(err) = self.client.handle_frame(frame.into()) { + return Poll::Ready(Err(err).context("client.handle_frame")); } } } if self.timer.poll_tick(cx).is_ready() { cx.waker().wake_by_ref(); - if self.outbox.is_empty() { - self.outbox.push_back(ToServer::ChatLine { - line: "There was a time, in the era of great chaos, when the Earth and the Moon were at war with each other. A daredevil from the Moon piloted a bizarre aircraft. It was feared, and because of its shape, called EINHANDER.".to_string(), - sequence: self.sequence, - }); - self.sequence += 1; + if let Err(err) = self.client.handle_timeout() { + return Poll::Ready(Err(err).context("client.handle_timeout")); } } Poll::Pending } } + +#[derive(Default)] +struct Client { + outbox: VecDeque, + sequence: u64, +} + +impl Client { + fn handle_frame(&self, frame: Bytes) -> Result<()> { + match rmp_serde::from_slice(&frame)? { + ToClient::ChatLine { id, line } => { + tracing::info!(?id, ?line); + } + } + Ok(()) + } + + fn handle_timeout(&mut self) -> Result<()> { + if !self.outbox.is_empty() { + bail!("Dropped message, outbox is full"); + } + + let msg = ToServer::ChatLine { + line: "There was a time, in the era of great chaos, when the Earth and the Moon were at war with each other. A daredevil from the Moon piloted a bizarre aircraft. It was feared, and because of its shape, called EINHANDER.".to_string(), + sequence: self.sequence, + }; + let frame = rmp_serde::to_vec(&msg)?; + self.outbox.push_back(frame.into()); + self.sequence += 1; + Ok(()) + } + + fn poll_send(&mut self) -> Option { + self.outbox.pop_front() + } +} diff --git a/src/prelude.rs b/src/prelude.rs index 88d0ab1..381abf9 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -4,14 +4,14 @@ pub use bytes::Bytes; pub use futures_core::stream::Stream; pub use futures_sink::Sink; pub use std::{ - collections::VecDeque, + collections::{BTreeMap, VecDeque}, future::poll_fn, ops::ControlFlow, pin::pin, rc::Rc, str::FromStr, task::{Context, Poll}, - time::Duration, + time::{Duration, Instant}, }; pub use tokio::{ net::{TcpListener, TcpStream},