main
_ 2025-02-23 20:29:05 -06:00
parent a2b286b01d
commit 7119603145
2 changed files with 82 additions and 70 deletions

View File

@ -8,7 +8,6 @@ pub use std::{
future::poll_fn, future::poll_fn,
ops::ControlFlow, ops::ControlFlow,
pin::pin, pin::pin,
rc::Rc,
str::FromStr, str::FromStr,
task::{Context, Poll}, task::{Context, Poll},
time::Duration, time::Duration,

View File

@ -13,6 +13,7 @@ impl Default for Args {
} }
pub(crate) struct App { pub(crate) struct App {
client_streams: BTreeMap<Id, Framed<TcpStream, LengthDelimitedCodec>>,
clients: BTreeMap<Id, Client>, clients: BTreeMap<Id, Client>,
listener: TcpListener, listener: TcpListener,
next_client_id: Id, next_client_id: Id,
@ -23,6 +24,7 @@ impl App {
let listener = TcpListener::bind(("0.0.0.0", args.port)).await?; let listener = TcpListener::bind(("0.0.0.0", args.port)).await?;
Ok(Self { Ok(Self {
client_streams: Default::default(),
clients: Default::default(), clients: Default::default(),
listener, listener,
next_client_id: 1, next_client_id: 1,
@ -36,17 +38,12 @@ impl App {
let mut clients_to_delete = vec![]; let mut clients_to_delete = vec![];
for (id, client) in &mut self.clients { for (id, client) in &mut self.clients {
let id = *id; let id = *id;
{ let stream = self
let mut stream = pin!(&mut client.stream); .client_streams
match <_ as futures_sink::Sink<Bytes>>::poll_flush(stream.as_mut(), cx) { .get_mut(&id)
Poll::Pending => {} .context("Logic error: Client has no stream")?;
Poll::Ready(Err(err)) => return Poll::Ready(Err(err).context("poll_flush")), let mut stream = pin!(stream);
Poll::Ready(Ok(())) => {}
}
}
if !client.outbox.is_empty() {
let mut stream = pin!(&mut client.stream);
match <_ as futures_sink::Sink<Bytes>>::poll_ready(stream.as_mut(), cx) { match <_ as futures_sink::Sink<Bytes>>::poll_ready(stream.as_mut(), cx) {
Poll::Pending => {} Poll::Pending => {}
Poll::Ready(Err(err)) => { Poll::Ready(Err(err)) => {
@ -55,40 +52,38 @@ impl App {
); );
} }
Poll::Ready(Ok(())) => { Poll::Ready(Ok(())) => {
cx.waker().wake_by_ref(); if let Some(frame) = client.poll_send() {
let Some(msg) = client.outbox.pop_front() else { if let Err(err) = stream.as_mut().start_send(frame) {
return Poll::Ready(Err(anyhow!(
"Can't pop from outbox even though we just checked it was not empty"
)));
};
let frame = rmp_serde::to_vec(msg.as_ref())?;
if let Err(err) = stream.start_send(Bytes::from(frame)) {
return Poll::Ready(Err(err).context("start_send")); return Poll::Ready(Err(err).context("start_send"));
} }
match <_ as futures_sink::Sink<Bytes>>::poll_flush(stream.as_mut(), cx) {
Poll::Pending => {}
Poll::Ready(Err(err)) => {
return Poll::Ready(Err(err).context("poll_flush"));
}
Poll::Ready(Ok(())) => {}
}
tracing::debug!("Started send"); tracing::debug!("Started send");
} }
} }
} }
match client.poll_next(cx) { if let Some(ChatLine { id, line }) = client.poll_inbox() {
cx.waker().wake_by_ref();
new_messages.push(ToClient::ChatLine { id, line });
}
match stream.as_mut().poll_next(cx) {
Poll::Pending => {} Poll::Pending => {}
Poll::Ready(Err(err)) => { Poll::Ready(None) => clients_to_delete.push(id),
tracing::error!(?err, "client.poll_next error"); Poll::Ready(Some(Err(err))) => {
tracing::error!(?err, "stream.poll_next error");
} }
Poll::Ready(Ok(None)) => clients_to_delete.push(id), Poll::Ready(Some(Ok(frame))) => {
Poll::Ready(Ok(Some(ToServer::ChatLine { line, sequence }))) => { cx.waker().wake_by_ref();
tracing::debug!(id = id, ?line); if let Err(err) = client.handle_frame(frame.into()) {
if sequence != client.sequence { return Poll::Ready(Err(err).context("client.handle_frame"));
tracing::error!(
expected = client.sequence,
actual = sequence,
"Sequence mismatch"
);
return Poll::Ready(Err(anyhow!("Sequence mismatch")));
} }
tracing::info!(?sequence, id = id);
client.sequence += 1;
new_messages.push(Rc::new(ToClient::ChatLine { id, line }));
} }
} }
} }
@ -100,7 +95,7 @@ impl App {
for client in &mut self.clients.values_mut() { for client in &mut self.clients.values_mut() {
for msg in &new_messages { for msg in &new_messages {
client.outbox.push_back(Rc::clone(msg)); client.handle_outgoing(msg)?;
} }
} }
@ -109,16 +104,18 @@ impl App {
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
Poll::Ready(Ok((stream, _addr))) => { Poll::Ready(Ok((stream, _addr))) => {
cx.waker().wake_by_ref(); cx.waker().wake_by_ref();
let stream = Framed::new(stream, LengthDelimitedCodec::new()); let id = self.next_client_id;
self.next_client_id += 1;
let client = Client { let client = Client {
id: self.next_client_id, id,
inbox: Default::default(),
outbox: Default::default(), outbox: Default::default(),
sequence: 0, sequence: 0,
stream,
}; };
self.next_client_id += 1; let stream = Framed::new(stream, LengthDelimitedCodec::new());
tracing::info!(id = client.id, "Accepted client"); tracing::info!(id = client.id, "Accepted client");
self.clients.insert(client.id, client); self.clients.insert(id, client);
self.client_streams.insert(id, stream);
} }
} }
@ -130,35 +127,51 @@ impl App {
} }
} }
struct ChatLine {
id: Id,
line: String,
}
struct Client { struct Client {
id: u64, id: u64,
outbox: VecDeque<Rc<ToClient>>, inbox: VecDeque<ChatLine>,
outbox: VecDeque<Bytes>,
sequence: u64, sequence: u64,
stream: Framed<TcpStream, LengthDelimitedCodec>,
} }
impl Client { impl Client {
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<ToServer>>> { fn handle_frame(&mut self, frame: Bytes) -> Result<()> {
let stream = pin!(&mut self.stream); match rmp_serde::from_slice(&frame)? {
let Poll::Ready(frame_opt) = stream.poll_next(cx) else { ToServer::ChatLine { line, sequence } => {
return Poll::Pending; if sequence != self.sequence {
}; tracing::error!(
expected = self.sequence,
let Some(frame) = frame_opt else { actual = sequence,
return Poll::Ready(Ok(None)); "Sequence mismatch"
}; );
bail!("Sequence mismatch");
cx.waker().wake_by_ref(); }
tracing::debug!("Got network data"); self.sequence += 1;
self.inbox.push_back(ChatLine { id: self.id, line });
let frame = match frame { }
Err(err) => return Poll::Ready(Err(err).context("Network framing decode error")), }
Ok(x) => x, Ok(())
}; }
let msg = match rmp_serde::from_slice(&frame) {
Err(err) => return Poll::Ready(Err(err).context("MsgPack decode error")), fn handle_outgoing(&mut self, msg: &ToClient) -> Result<()> {
Ok(x) => x, if !self.outbox.is_empty() {
}; bail!("Outbox full");
Poll::Ready(Ok(Some(msg))) }
let bytes = rmp_serde::to_vec(msg)?.into();
self.outbox.push_back(bytes);
Ok(())
}
fn poll_inbox(&mut self) -> Option<ChatLine> {
self.inbox.pop_front()
}
fn poll_send(&mut self) -> Option<Bytes> {
self.outbox.pop_front()
} }
} }