2025-02-23 17:07:09 +00:00
|
|
|
use crate::prelude::*;
|
|
|
|
|
2025-02-23 18:06:07 +00:00
|
|
|
/// Configuration consumed by [`App::new`].
pub(crate) struct Args {
    /// TCP port the server listens on.
    pub(crate) port: u16,
}
|
|
|
|
|
|
|
|
impl Default for Args {
|
|
|
|
fn default() -> Self {
|
|
|
|
Self { port: 9000 }
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2025-02-23 17:07:09 +00:00
|
|
|
pub(crate) struct App {
|
|
|
|
clients: Vec<Client>,
|
|
|
|
listener: TcpListener,
|
|
|
|
next_client_id: u64,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl App {
|
2025-02-23 18:06:07 +00:00
|
|
|
pub(crate) async fn new(args: Args) -> Result<Self> {
|
|
|
|
let listener = TcpListener::bind(("0.0.0.0", args.port)).await?;
|
2025-02-23 17:07:09 +00:00
|
|
|
|
|
|
|
Ok(Self {
|
|
|
|
clients: Default::default(),
|
|
|
|
listener,
|
|
|
|
next_client_id: 1,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
pub(crate) fn poll_run(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
|
|
|
|
let previous_client_count = self.clients.len();
|
|
|
|
|
|
|
|
let mut new_messages = vec![];
|
|
|
|
for client in &mut self.clients {
|
|
|
|
{
|
|
|
|
let mut stream = pin!(&mut client.stream);
|
|
|
|
match <_ as futures_sink::Sink<Bytes>>::poll_flush(stream.as_mut(), cx) {
|
|
|
|
Poll::Pending => {}
|
|
|
|
Poll::Ready(Err(err)) => return Poll::Ready(Err(err).context("poll_flush")),
|
|
|
|
Poll::Ready(Ok(())) => {}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2025-02-23 18:06:07 +00:00
|
|
|
if !client.outbox.is_empty() {
|
2025-02-23 17:07:09 +00:00
|
|
|
let mut stream = pin!(&mut client.stream);
|
|
|
|
match <_ as futures_sink::Sink<Bytes>>::poll_ready(stream.as_mut(), cx) {
|
|
|
|
Poll::Pending => {}
|
|
|
|
Poll::Ready(Err(err)) => {
|
2025-02-23 18:06:07 +00:00
|
|
|
return Poll::Ready(
|
|
|
|
Err(err).context("Can't check network write half for readiness"),
|
|
|
|
);
|
2025-02-23 17:07:09 +00:00
|
|
|
}
|
|
|
|
Poll::Ready(Ok(())) => {
|
|
|
|
cx.waker().wake_by_ref();
|
|
|
|
let Some(msg) = client.outbox.pop_front() else {
|
2025-02-23 18:06:07 +00:00
|
|
|
return Poll::Ready(Err(anyhow!(
|
|
|
|
"Can't pop from outbox even though we just checked it was not empty"
|
|
|
|
)));
|
2025-02-23 17:07:09 +00:00
|
|
|
};
|
|
|
|
let frame = rmp_serde::to_vec(msg.as_ref())?;
|
|
|
|
if let Err(err) = stream.start_send(Bytes::from(frame)) {
|
|
|
|
return Poll::Ready(Err(err).context("start_send"));
|
|
|
|
}
|
|
|
|
tracing::debug!("Started send");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2025-02-23 18:06:07 +00:00
|
|
|
match client.poll_next(cx) {
|
2025-02-23 17:07:09 +00:00
|
|
|
Poll::Pending => {}
|
2025-02-23 18:06:07 +00:00
|
|
|
Poll::Ready(Err(err)) => {
|
|
|
|
tracing::error!(?err, "client.poll_next error");
|
2025-02-23 17:07:09 +00:00
|
|
|
}
|
2025-02-23 18:06:07 +00:00
|
|
|
Poll::Ready(Ok(None)) => client.closed = true,
|
|
|
|
Poll::Ready(Ok(Some(ToServer::ChatLine { line, sequence }))) => {
|
2025-02-23 20:55:29 +00:00
|
|
|
tracing::debug!(id = client.id, ?line);
|
2025-02-23 18:06:07 +00:00
|
|
|
if sequence != client.sequence {
|
|
|
|
tracing::error!(
|
|
|
|
expected = client.sequence,
|
|
|
|
actual = sequence,
|
|
|
|
"Sequence mismatch"
|
|
|
|
);
|
|
|
|
return Poll::Ready(Err(anyhow!("Sequence mismatch")));
|
2025-02-23 17:07:09 +00:00
|
|
|
}
|
2025-02-23 20:55:29 +00:00
|
|
|
tracing::info!(?sequence, id = client.id);
|
2025-02-23 18:06:07 +00:00
|
|
|
client.sequence += 1;
|
|
|
|
new_messages.push(Rc::new(ToClient::ChatLine {
|
|
|
|
id: client.id,
|
|
|
|
line,
|
|
|
|
}));
|
2025-02-23 17:07:09 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
self.clients.retain(|client| {
|
|
|
|
if client.closed {
|
|
|
|
tracing::info!(id = client.id, "Closing client");
|
|
|
|
}
|
|
|
|
!client.closed
|
|
|
|
});
|
|
|
|
|
|
|
|
for client in &mut self.clients {
|
|
|
|
for msg in &new_messages {
|
|
|
|
client.outbox.push_back(Rc::clone(msg));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
match self.listener.poll_accept(cx) {
|
|
|
|
Poll::Pending => {}
|
|
|
|
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
|
|
|
|
Poll::Ready(Ok((stream, _addr))) => {
|
|
|
|
cx.waker().wake_by_ref();
|
2025-02-23 18:06:07 +00:00
|
|
|
let stream = Framed::new(stream, LengthDelimitedCodec::new());
|
2025-02-23 17:07:09 +00:00
|
|
|
let client = Client {
|
|
|
|
closed: false,
|
|
|
|
id: self.next_client_id,
|
|
|
|
outbox: Default::default(),
|
2025-02-23 18:06:07 +00:00
|
|
|
sequence: 0,
|
2025-02-23 17:07:09 +00:00
|
|
|
stream,
|
|
|
|
};
|
|
|
|
self.next_client_id += 1;
|
|
|
|
tracing::info!(id = client.id, "Accepted client");
|
|
|
|
self.clients.push(client);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if self.clients.len() != previous_client_count {
|
|
|
|
tracing::info!(client_count = self.clients.len());
|
|
|
|
}
|
|
|
|
|
|
|
|
Poll::Pending
|
|
|
|
}
|
|
|
|
}
|
2025-02-23 18:06:07 +00:00
|
|
|
|
|
|
|
struct Client {
|
|
|
|
closed: bool,
|
|
|
|
id: u64,
|
|
|
|
outbox: VecDeque<Rc<ToClient>>,
|
|
|
|
sequence: u64,
|
|
|
|
stream: Framed<TcpStream, LengthDelimitedCodec>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Client {
|
|
|
|
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<ToServer>>> {
|
|
|
|
let stream = pin!(&mut self.stream);
|
|
|
|
let Poll::Ready(frame_opt) = stream.poll_next(cx) else {
|
|
|
|
return Poll::Pending;
|
|
|
|
};
|
|
|
|
|
|
|
|
let Some(frame) = frame_opt else {
|
|
|
|
return Poll::Ready(Ok(None));
|
|
|
|
};
|
|
|
|
|
|
|
|
cx.waker().wake_by_ref();
|
2025-02-23 20:55:29 +00:00
|
|
|
tracing::debug!("Got network data");
|
2025-02-23 18:06:07 +00:00
|
|
|
|
|
|
|
let frame = match frame {
|
|
|
|
Err(err) => return Poll::Ready(Err(err).context("Network framing decode error")),
|
|
|
|
Ok(x) => x,
|
|
|
|
};
|
|
|
|
let msg = match rmp_serde::from_slice(&frame) {
|
|
|
|
Err(err) => return Poll::Ready(Err(err).context("MsgPack decode error")),
|
|
|
|
Ok(x) => x,
|
|
|
|
};
|
|
|
|
Poll::Ready(Ok(Some(msg)))
|
|
|
|
}
|
|
|
|
}
|