Compare commits

...

8 Commits

Author SHA1 Message Date
_ c4de3ad00d add connect and disconnect messages 2025-02-23 21:36:43 -06:00
_ e3c23a0931 refactor 2025-02-23 21:29:31 -06:00
_ 2606946240 refactor 2025-02-23 21:11:08 -06:00
_ 7119603145 refactor 2025-02-23 20:29:05 -06:00
_ a2b286b01d refactor 2025-02-23 19:43:08 -06:00
_ 425503d66d refactor 2025-02-23 19:30:26 -06:00
_ 4d018413f8 refactor 2025-02-23 19:28:28 -06:00
_ fd93df42e1 clean up the code 2025-02-23 19:18:20 -06:00
4 changed files with 213 additions and 164 deletions

View File

@ -15,8 +15,7 @@ impl Default for Args {
} }
pub(crate) struct App { pub(crate) struct App {
outbox: VecDeque<ToServer>, client: Client,
sequence: u64,
stream: Framed<TcpStream, LengthDelimitedCodec>, stream: Framed<TcpStream, LengthDelimitedCodec>,
timer: Interval, timer: Interval,
} }
@ -28,69 +27,92 @@ impl App {
let mut timer = tokio::time::interval(Duration::from_millis(args.period_ms.into())); let mut timer = tokio::time::interval(Duration::from_millis(args.period_ms.into()));
timer.set_missed_tick_behavior(MissedTickBehavior::Skip); timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
Ok(Self { Ok(Self {
outbox: Default::default(), client: Default::default(),
sequence: 0,
stream, stream,
timer, timer,
}) })
} }
pub(crate) fn poll_run(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { pub(crate) fn poll_run(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
{ match self.step(cx) {
let mut stream = pin!(&mut self.stream); Ok(()) => Poll::Pending,
match <_ as futures_sink::Sink<Bytes>>::poll_flush(stream.as_mut(), cx) { Err(err) => Poll::Ready(Err(err)),
Poll::Pending => {}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err).context("poll_flush")),
Poll::Ready(Ok(())) => {}
} }
} }
if !self.outbox.is_empty() { fn step(&mut self, cx: &mut Context<'_>) -> Result<()> {
let mut stream = pin!(&mut self.stream); let mut stream = pin!(&mut self.stream);
match <_ as futures_sink::Sink<Bytes>>::poll_ready(stream.as_mut(), cx) { if let Poll::Ready(result) =
Poll::Pending => {} <_ as futures_sink::Sink<Bytes>>::poll_ready(stream.as_mut(), cx)
Poll::Ready(Err(err)) => { {
return Poll::Ready( result?;
Err(err).context("Can't check network write half for readiness"), if let Some(frame) = self.client.poll_send() {
); stream
} .as_mut()
Poll::Ready(Ok(())) => { .start_send(frame)
cx.waker().wake_by_ref(); .context("stream.start_send")?;
let frame = rmp_serde::to_vec(&self.outbox.pop_front())?;
if let Err(err) = stream.start_send(Bytes::from(frame)) {
return Poll::Ready(Err(err).context("start_send"));
}
tracing::debug!("Started send"); tracing::debug!("Started send");
} }
match <_ as futures_sink::Sink<Bytes>>::poll_flush(stream.as_mut(), cx) {
Poll::Pending => {}
Poll::Ready(result) => {
result.context("poll_flush")?;
}
} }
} }
let stream = pin!(&mut self.stream); if let Poll::Ready(frame_opt) = stream.as_mut().poll_next(cx) {
match stream.poll_next(cx) { let frame = frame_opt.context("Server closed cxn")?;
Poll::Pending => {}
Poll::Ready(None) => return Poll::Ready(Err(anyhow!("Server closed cxn"))),
Poll::Ready(Some(frame)) => {
cx.waker().wake_by_ref(); cx.waker().wake_by_ref();
let frame = frame.context("network framing decode")?; let frame = frame.context("network framing decode")?;
match rmp_serde::from_slice(&frame)? { self.client
ToClient::ChatLine { id, line } => { .handle_frame(frame.into())
tracing::info!(?id, ?line); .context("client.handle_frame")?;
}
}
}
} }
if self.timer.poll_tick(cx).is_ready() { if self.timer.poll_tick(cx).is_ready() {
cx.waker().wake_by_ref(); cx.waker().wake_by_ref();
if self.outbox.is_empty() { self.client
self.outbox.push_back(ToServer::ChatLine { .handle_timeout()
line: "There was a time, in the era of great chaos, when the Earth and the Moon were at war with each other. A daredevil from the Moon piloted a bizarre aircraft. It was feared, and because of its shape, called EINHANDER.".to_string(), .context("client.handle_timeout")?;
sequence: self.sequence,
});
self.sequence += 1;
}
} }
Poll::Pending Ok(())
}
}
#[derive(Default)]
struct Client {
outbox: VecDeque<Bytes>,
sequence: u64,
}
impl Client {
fn handle_frame(&self, frame: Bytes) -> Result<()> {
match rmp_serde::from_slice(&frame)? {
ToClient::ChatLine { id, line } => tracing::info!(?id, ?line),
ToClient::ClientConnected { id } => tracing::info!(?id, "Connected"),
ToClient::ClientDisconnected { id } => tracing::info!(?id, "Disconnected"),
}
Ok(())
}
fn handle_timeout(&mut self) -> Result<()> {
if !self.outbox.is_empty() {
bail!("Dropped message, outbox is full");
}
let msg = ToServer::ChatLine {
line: "There was a time, in the era of great chaos, when the Earth and the Moon were at war with each other. A daredevil from the Moon piloted a bizarre aircraft. It was feared, and because of its shape, called EINHANDER.".to_string(),
sequence: self.sequence,
};
let frame = rmp_serde::to_vec(&msg)?;
self.outbox.push_back(frame.into());
self.sequence += 1;
Ok(())
}
fn poll_send(&mut self) -> Option<Bytes> {
self.outbox.pop_front()
} }
} }

View File

@ -1,8 +1,11 @@
use crate::prelude::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize)] #[derive(Deserialize, Serialize)]
pub(crate) enum ToClient { pub(crate) enum ToClient {
ChatLine { id: u64, line: String }, ChatLine { id: Id, line: String },
ClientConnected { id: Id },
ClientDisconnected { id: Id },
} }
#[derive(Deserialize, Serialize)] #[derive(Deserialize, Serialize)]

View File

@ -1,14 +1,13 @@
pub(crate) use crate::messages::{ToClient, ToServer}; pub(crate) use crate::messages::{ToClient, ToServer};
pub use anyhow::{Context as _, Result, anyhow, bail}; pub use anyhow::{Context as _, Result, bail};
pub use bytes::Bytes; pub use bytes::Bytes;
pub use futures_core::stream::Stream; pub use futures_core::stream::Stream;
pub use futures_sink::Sink; pub use futures_sink::Sink;
pub use std::{ pub use std::{
collections::VecDeque, collections::{BTreeMap, VecDeque},
future::poll_fn, future::poll_fn,
ops::ControlFlow, ops::ControlFlow,
pin::pin, pin::pin,
rc::Rc,
str::FromStr, str::FromStr,
task::{Context, Poll}, task::{Context, Poll},
time::Duration, time::Duration,
@ -20,3 +19,5 @@ pub use tokio::{
}; };
// Don't use BytesCodec, it is _nonsense_ // Don't use BytesCodec, it is _nonsense_
pub use tokio_util::codec::{Framed, LengthDelimitedCodec}; pub use tokio_util::codec::{Framed, LengthDelimitedCodec};
pub type Id = u64;

View File

@ -11,9 +11,9 @@ impl Default for Args {
} }
pub(crate) struct App { pub(crate) struct App {
clients: Vec<Client>, client_streams: BTreeMap<Id, Framed<TcpStream, LengthDelimitedCodec>>,
listener: TcpListener, listener: TcpListener,
next_client_id: u64, server: Server,
} }
impl App { impl App {
@ -21,147 +21,170 @@ impl App {
let listener = TcpListener::bind(("0.0.0.0", args.port)).await?; let listener = TcpListener::bind(("0.0.0.0", args.port)).await?;
Ok(Self { Ok(Self {
clients: Default::default(), client_streams: Default::default(),
listener, listener,
next_client_id: 1, server: Default::default(),
}) })
} }
pub(crate) fn poll_run(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { pub(crate) fn poll_run(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
let previous_client_count = self.clients.len(); match self.step(cx) {
Ok(()) => Poll::Pending,
Err(err) => Poll::Ready(Err(err)),
}
}
let mut new_messages = vec![]; fn step(&mut self, cx: &mut Context<'_>) -> Result<()> {
for client in &mut self.clients { let mut clients_to_remove = vec![];
// Pump the network streams
for (id, stream) in &mut self.client_streams {
let mut stream = pin!(stream);
// Try to send from the client's outbox
if let Poll::Ready(result) =
<_ as futures_sink::Sink<Bytes>>::poll_ready(stream.as_mut(), cx)
{ {
let mut stream = pin!(&mut client.stream); result.context("Can't check network write half for readiness")?;
match <_ as futures_sink::Sink<Bytes>>::poll_flush(stream.as_mut(), cx) { if let Some(frame) = self.server.poll_send(*id)? {
Poll::Pending => {} stream.as_mut().start_send(frame).context("start_send")?;
Poll::Ready(Err(err)) => return Poll::Ready(Err(err).context("poll_flush")), if let Poll::Ready(result) =
Poll::Ready(Ok(())) => {} <_ as futures_sink::Sink<Bytes>>::poll_flush(stream.as_mut(), cx)
} {
} result.context("poll_flush")?;
if !client.outbox.is_empty() {
let mut stream = pin!(&mut client.stream);
match <_ as futures_sink::Sink<Bytes>>::poll_ready(stream.as_mut(), cx) {
Poll::Pending => {}
Poll::Ready(Err(err)) => {
return Poll::Ready(
Err(err).context("Can't check network write half for readiness"),
);
}
Poll::Ready(Ok(())) => {
cx.waker().wake_by_ref();
let Some(msg) = client.outbox.pop_front() else {
return Poll::Ready(Err(anyhow!(
"Can't pop from outbox even though we just checked it was not empty"
)));
};
let frame = rmp_serde::to_vec(msg.as_ref())?;
if let Err(err) = stream.start_send(Bytes::from(frame)) {
return Poll::Ready(Err(err).context("start_send"));
} }
tracing::debug!("Started send"); tracing::debug!("Started send");
} }
} }
}
match client.poll_next(cx) { // Try to read data in
match stream.as_mut().poll_next(cx) {
Poll::Pending => {} Poll::Pending => {}
Poll::Ready(Err(err)) => { Poll::Ready(None) => clients_to_remove.push(*id),
tracing::error!(?err, "client.poll_next error"); Poll::Ready(Some(result)) => {
} let frame = result.context("stream.poll_next")?;
Poll::Ready(Ok(None)) => client.closed = true, cx.waker().wake_by_ref();
Poll::Ready(Ok(Some(ToServer::ChatLine { line, sequence }))) => { self.server.handle_client_frame(*id, frame.into())?;
tracing::debug!(id = client.id, ?line);
if sequence != client.sequence {
tracing::error!(
expected = client.sequence,
actual = sequence,
"Sequence mismatch"
);
return Poll::Ready(Err(anyhow!("Sequence mismatch")));
}
tracing::info!(?sequence, id = client.id);
client.sequence += 1;
new_messages.push(Rc::new(ToClient::ChatLine {
id: client.id,
line,
}));
} }
} }
} }
self.clients.retain(|client| { // Close out disconnected clients
if client.closed { for id in clients_to_remove {
tracing::info!(id = client.id, "Closing client"); cx.waker().wake_by_ref();
} tracing::info!(?id, "Closing client");
!client.closed self.client_streams.remove(&id);
}); self.server.handle_client_disconnected(id)?;
for client in &mut self.clients {
for msg in &new_messages {
client.outbox.push_back(Rc::clone(msg));
}
} }
match self.listener.poll_accept(cx) { if let Poll::Ready(result) = self.listener.poll_accept(cx) {
Poll::Pending => {} let (stream, _addr) = result.context("listener.poll_accept")?;
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
Poll::Ready(Ok((stream, _addr))) => {
cx.waker().wake_by_ref(); cx.waker().wake_by_ref();
let stream = Framed::new(stream, LengthDelimitedCodec::new()); let stream = Framed::new(stream, LengthDelimitedCodec::new());
let id = self.server.handle_new_client()?;
self.client_streams.insert(id, stream);
}
Ok(())
}
}
#[derive(Default)]
struct Server {
clients: BTreeMap<Id, Client>,
next_client_id: Id,
}
impl Server {
fn broadcast(&mut self, msg: &ToClient) -> Result<()> {
for client in &mut self.clients.values_mut() {
client.handle_outgoing(msg)?;
}
Ok(())
}
fn handle_client_disconnected(&mut self, id: Id) -> Result<()> {
self.clients.remove(&id);
self.broadcast(&ToClient::ClientDisconnected { id })?;
Ok(())
}
fn handle_client_frame(&mut self, id: Id, frame: Bytes) -> Result<()> {
let msg = {
let client = self
.clients
.get_mut(&id)
.context("Logic error: Stream has no associated client")?;
let Some(msg) = client.handle_frame(frame).context("client.handle_frame")? else {
return Ok(());
};
msg
};
self.broadcast(&msg)?;
Ok(())
}
fn handle_new_client(&mut self) -> Result<Id> {
let id = self.next_client_id;
self.next_client_id += 1;
let client = Client { let client = Client {
closed: false, id,
id: self.next_client_id,
outbox: Default::default(), outbox: Default::default(),
sequence: 0, sequence: 0,
stream,
}; };
self.next_client_id += 1; tracing::info!(
tracing::info!(id = client.id, "Accepted client"); id = client.id,
self.clients.push(client); total = self.clients.len(),
} "Accepted client"
);
self.clients.insert(id, client);
self.broadcast(&ToClient::ClientConnected { id })?;
Ok(id)
} }
if self.clients.len() != previous_client_count { fn poll_send(&mut self, id: Id) -> Result<Option<Bytes>> {
tracing::info!(client_count = self.clients.len()); let client = self
} .clients
.get_mut(&id)
Poll::Pending .context("Logic error: Stream has no associated client")?;
Ok(client.poll_send())
} }
} }
struct Client { struct Client {
closed: bool,
id: u64, id: u64,
outbox: VecDeque<Rc<ToClient>>, outbox: VecDeque<Bytes>,
sequence: u64, sequence: u64,
stream: Framed<TcpStream, LengthDelimitedCodec>,
} }
impl Client { impl Client {
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<ToServer>>> { fn handle_frame(&mut self, frame: Bytes) -> Result<Option<ToClient>> {
let stream = pin!(&mut self.stream); match rmp_serde::from_slice(&frame)? {
let Poll::Ready(frame_opt) = stream.poll_next(cx) else { ToServer::ChatLine { line, sequence } => {
return Poll::Pending; if sequence != self.sequence {
}; tracing::error!(
expected = self.sequence,
actual = sequence,
"Sequence mismatch"
);
bail!("Sequence mismatch");
}
self.sequence += 1;
Ok(Some(ToClient::ChatLine { id: self.id, line }))
}
}
}
let Some(frame) = frame_opt else { fn handle_outgoing(&mut self, msg: &ToClient) -> Result<()> {
return Poll::Ready(Ok(None)); if !self.outbox.is_empty() {
}; bail!("Outbox full");
}
let bytes = rmp_serde::to_vec(msg)?.into();
self.outbox.push_back(bytes);
Ok(())
}
cx.waker().wake_by_ref(); fn poll_send(&mut self) -> Option<Bytes> {
tracing::debug!("Got network data"); self.outbox.pop_front()
let frame = match frame {
Err(err) => return Poll::Ready(Err(err).context("Network framing decode error")),
Ok(x) => x,
};
let msg = match rmp_serde::from_slice(&frame) {
Err(err) => return Poll::Ready(Err(err).context("MsgPack decode error")),
Ok(x) => x,
};
Poll::Ready(Ok(Some(msg)))
} }
} }