clean up the code

main
_ 2025-02-23 19:18:20 -06:00
parent 4548f9daee
commit fd93df42e1
2 changed files with 61 additions and 40 deletions

View File

@ -15,8 +15,7 @@ impl Default for Args {
}
pub(crate) struct App {
outbox: VecDeque<ToServer>,
sequence: u64,
client: Client,
stream: Framed<TcpStream, LengthDelimitedCodec>,
timer: Interval,
}
@ -28,24 +27,13 @@ impl App {
let mut timer = tokio::time::interval(Duration::from_millis(args.period_ms.into()));
timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
Ok(Self {
outbox: Default::default(),
sequence: 0,
client: Default::default(),
stream,
timer,
})
}
pub(crate) fn poll_run(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
{
let mut stream = pin!(&mut self.stream);
match <_ as futures_sink::Sink<Bytes>>::poll_flush(stream.as_mut(), cx) {
Poll::Pending => {}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err).context("poll_flush")),
Poll::Ready(Ok(())) => {}
}
}
if !self.outbox.is_empty() {
let mut stream = pin!(&mut self.stream);
match <_ as futures_sink::Sink<Bytes>>::poll_ready(stream.as_mut(), cx) {
Poll::Pending => {}
@ -55,42 +43,75 @@ impl App {
);
}
Poll::Ready(Ok(())) => {
cx.waker().wake_by_ref();
let frame = rmp_serde::to_vec(&self.outbox.pop_front())?;
if let Err(err) = stream.start_send(Bytes::from(frame)) {
return Poll::Ready(Err(err).context("start_send"));
if let Some(frame) = self.client.poll_send() {
if let Err(err) = stream.as_mut().start_send(frame) {
return Poll::Ready(Err(err).context("stream.start_send"));
}
tracing::debug!("Started send");
}
match <_ as futures_sink::Sink<Bytes>>::poll_flush(stream.as_mut(), cx) {
Poll::Pending => {}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err).context("poll_flush")),
Poll::Ready(Ok(())) => {}
}
}
}
let stream = pin!(&mut self.stream);
match stream.poll_next(cx) {
match stream.as_mut().poll_next(cx) {
Poll::Pending => {}
Poll::Ready(None) => return Poll::Ready(Err(anyhow!("Server closed cxn"))),
Poll::Ready(Some(frame)) => {
cx.waker().wake_by_ref();
let frame = frame.context("network framing decode")?;
match rmp_serde::from_slice(&frame)? {
ToClient::ChatLine { id, line } => {
tracing::info!(?id, ?line);
}
if let Err(err) = self.client.handle_frame(frame.into()) {
return Poll::Ready(Err(err).context("client.handle_frame"));
}
}
}
if self.timer.poll_tick(cx).is_ready() {
cx.waker().wake_by_ref();
if self.outbox.is_empty() {
self.outbox.push_back(ToServer::ChatLine {
line: "There was a time, in the era of great chaos, when the Earth and the Moon were at war with each other. A daredevil from the Moon piloted a bizarre aircraft. It was feared, and because of its shape, called EINHANDER.".to_string(),
sequence: self.sequence,
});
self.sequence += 1;
if let Err(err) = self.client.handle_timeout() {
return Poll::Ready(Err(err).context("client.handle_timeout"));
}
}
Poll::Pending
}
}
#[derive(Default)]
struct Client {
outbox: VecDeque<Bytes>,
sequence: u64,
}
impl Client {
fn handle_frame(&self, frame: Bytes) -> Result<()> {
match rmp_serde::from_slice(&frame)? {
ToClient::ChatLine { id, line } => {
tracing::info!(?id, ?line);
}
}
Ok(())
}
fn handle_timeout(&mut self) -> Result<()> {
if !self.outbox.is_empty() {
bail!("Dropped message, outbox is full");
}
let msg = ToServer::ChatLine {
line: "There was a time, in the era of great chaos, when the Earth and the Moon were at war with each other. A daredevil from the Moon piloted a bizarre aircraft. It was feared, and because of its shape, called EINHANDER.".to_string(),
sequence: self.sequence,
};
let frame = rmp_serde::to_vec(&msg)?;
self.outbox.push_back(frame.into());
self.sequence += 1;
Ok(())
}
fn poll_send(&mut self) -> Option<Bytes> {
self.outbox.pop_front()
}
}

View File

@ -4,14 +4,14 @@ pub use bytes::Bytes;
pub use futures_core::stream::Stream;
pub use futures_sink::Sink;
pub use std::{
collections::VecDeque,
collections::{BTreeMap, VecDeque},
future::poll_fn,
ops::ControlFlow,
pin::pin,
rc::Rc,
str::FromStr,
task::{Context, Poll},
time::Duration,
time::{Duration, Instant},
};
pub use tokio::{
net::{TcpListener, TcpStream},