Compare commits
8 Commits
4548f9daee
...
c4de3ad00d
Author | SHA1 | Date |
---|---|---|
![]() |
c4de3ad00d | |
![]() |
e3c23a0931 | |
![]() |
2606946240 | |
![]() |
7119603145 | |
![]() |
a2b286b01d | |
![]() |
425503d66d | |
![]() |
4d018413f8 | |
![]() |
fd93df42e1 |
106
src/client.rs
106
src/client.rs
|
@ -15,8 +15,7 @@ impl Default for Args {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) struct App {
|
pub(crate) struct App {
|
||||||
outbox: VecDeque<ToServer>,
|
client: Client,
|
||||||
sequence: u64,
|
|
||||||
stream: Framed<TcpStream, LengthDelimitedCodec>,
|
stream: Framed<TcpStream, LengthDelimitedCodec>,
|
||||||
timer: Interval,
|
timer: Interval,
|
||||||
}
|
}
|
||||||
|
@ -28,69 +27,92 @@ impl App {
|
||||||
let mut timer = tokio::time::interval(Duration::from_millis(args.period_ms.into()));
|
let mut timer = tokio::time::interval(Duration::from_millis(args.period_ms.into()));
|
||||||
timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
|
timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
outbox: Default::default(),
|
client: Default::default(),
|
||||||
sequence: 0,
|
|
||||||
stream,
|
stream,
|
||||||
timer,
|
timer,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn poll_run(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
|
pub(crate) fn poll_run(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
|
||||||
{
|
match self.step(cx) {
|
||||||
let mut stream = pin!(&mut self.stream);
|
Ok(()) => Poll::Pending,
|
||||||
match <_ as futures_sink::Sink<Bytes>>::poll_flush(stream.as_mut(), cx) {
|
Err(err) => Poll::Ready(Err(err)),
|
||||||
Poll::Pending => {}
|
|
||||||
Poll::Ready(Err(err)) => return Poll::Ready(Err(err).context("poll_flush")),
|
|
||||||
Poll::Ready(Ok(())) => {}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !self.outbox.is_empty() {
|
fn step(&mut self, cx: &mut Context<'_>) -> Result<()> {
|
||||||
let mut stream = pin!(&mut self.stream);
|
let mut stream = pin!(&mut self.stream);
|
||||||
match <_ as futures_sink::Sink<Bytes>>::poll_ready(stream.as_mut(), cx) {
|
if let Poll::Ready(result) =
|
||||||
Poll::Pending => {}
|
<_ as futures_sink::Sink<Bytes>>::poll_ready(stream.as_mut(), cx)
|
||||||
Poll::Ready(Err(err)) => {
|
{
|
||||||
return Poll::Ready(
|
result?;
|
||||||
Err(err).context("Can't check network write half for readiness"),
|
if let Some(frame) = self.client.poll_send() {
|
||||||
);
|
stream
|
||||||
}
|
.as_mut()
|
||||||
Poll::Ready(Ok(())) => {
|
.start_send(frame)
|
||||||
cx.waker().wake_by_ref();
|
.context("stream.start_send")?;
|
||||||
let frame = rmp_serde::to_vec(&self.outbox.pop_front())?;
|
|
||||||
if let Err(err) = stream.start_send(Bytes::from(frame)) {
|
|
||||||
return Poll::Ready(Err(err).context("start_send"));
|
|
||||||
}
|
|
||||||
tracing::debug!("Started send");
|
tracing::debug!("Started send");
|
||||||
}
|
}
|
||||||
|
match <_ as futures_sink::Sink<Bytes>>::poll_flush(stream.as_mut(), cx) {
|
||||||
|
Poll::Pending => {}
|
||||||
|
Poll::Ready(result) => {
|
||||||
|
result.context("poll_flush")?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let stream = pin!(&mut self.stream);
|
if let Poll::Ready(frame_opt) = stream.as_mut().poll_next(cx) {
|
||||||
match stream.poll_next(cx) {
|
let frame = frame_opt.context("Server closed cxn")?;
|
||||||
Poll::Pending => {}
|
|
||||||
Poll::Ready(None) => return Poll::Ready(Err(anyhow!("Server closed cxn"))),
|
|
||||||
Poll::Ready(Some(frame)) => {
|
|
||||||
cx.waker().wake_by_ref();
|
cx.waker().wake_by_ref();
|
||||||
let frame = frame.context("network framing decode")?;
|
let frame = frame.context("network framing decode")?;
|
||||||
match rmp_serde::from_slice(&frame)? {
|
self.client
|
||||||
ToClient::ChatLine { id, line } => {
|
.handle_frame(frame.into())
|
||||||
tracing::info!(?id, ?line);
|
.context("client.handle_frame")?;
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.timer.poll_tick(cx).is_ready() {
|
if self.timer.poll_tick(cx).is_ready() {
|
||||||
cx.waker().wake_by_ref();
|
cx.waker().wake_by_ref();
|
||||||
if self.outbox.is_empty() {
|
self.client
|
||||||
self.outbox.push_back(ToServer::ChatLine {
|
.handle_timeout()
|
||||||
line: "There was a time, in the era of great chaos, when the Earth and the Moon were at war with each other. A daredevil from the Moon piloted a bizarre aircraft. It was feared, and because of its shape, called EINHANDER.".to_string(),
|
.context("client.handle_timeout")?;
|
||||||
sequence: self.sequence,
|
}
|
||||||
});
|
|
||||||
self.sequence += 1;
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Poll::Pending
|
#[derive(Default)]
|
||||||
|
struct Client {
|
||||||
|
outbox: VecDeque<Bytes>,
|
||||||
|
sequence: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Client {
|
||||||
|
fn handle_frame(&self, frame: Bytes) -> Result<()> {
|
||||||
|
match rmp_serde::from_slice(&frame)? {
|
||||||
|
ToClient::ChatLine { id, line } => tracing::info!(?id, ?line),
|
||||||
|
ToClient::ClientConnected { id } => tracing::info!(?id, "Connected"),
|
||||||
|
ToClient::ClientDisconnected { id } => tracing::info!(?id, "Disconnected"),
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_timeout(&mut self) -> Result<()> {
|
||||||
|
if !self.outbox.is_empty() {
|
||||||
|
bail!("Dropped message, outbox is full");
|
||||||
|
}
|
||||||
|
|
||||||
|
let msg = ToServer::ChatLine {
|
||||||
|
line: "There was a time, in the era of great chaos, when the Earth and the Moon were at war with each other. A daredevil from the Moon piloted a bizarre aircraft. It was feared, and because of its shape, called EINHANDER.".to_string(),
|
||||||
|
sequence: self.sequence,
|
||||||
|
};
|
||||||
|
let frame = rmp_serde::to_vec(&msg)?;
|
||||||
|
self.outbox.push_back(frame.into());
|
||||||
|
self.sequence += 1;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_send(&mut self) -> Option<Bytes> {
|
||||||
|
self.outbox.pop_front()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,11 @@
|
||||||
|
use crate::prelude::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
#[derive(Deserialize, Serialize)]
|
#[derive(Deserialize, Serialize)]
|
||||||
pub(crate) enum ToClient {
|
pub(crate) enum ToClient {
|
||||||
ChatLine { id: u64, line: String },
|
ChatLine { id: Id, line: String },
|
||||||
|
ClientConnected { id: Id },
|
||||||
|
ClientDisconnected { id: Id },
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Deserialize, Serialize)]
|
#[derive(Deserialize, Serialize)]
|
||||||
|
|
|
@ -1,14 +1,13 @@
|
||||||
pub(crate) use crate::messages::{ToClient, ToServer};
|
pub(crate) use crate::messages::{ToClient, ToServer};
|
||||||
pub use anyhow::{Context as _, Result, anyhow, bail};
|
pub use anyhow::{Context as _, Result, bail};
|
||||||
pub use bytes::Bytes;
|
pub use bytes::Bytes;
|
||||||
pub use futures_core::stream::Stream;
|
pub use futures_core::stream::Stream;
|
||||||
pub use futures_sink::Sink;
|
pub use futures_sink::Sink;
|
||||||
pub use std::{
|
pub use std::{
|
||||||
collections::VecDeque,
|
collections::{BTreeMap, VecDeque},
|
||||||
future::poll_fn,
|
future::poll_fn,
|
||||||
ops::ControlFlow,
|
ops::ControlFlow,
|
||||||
pin::pin,
|
pin::pin,
|
||||||
rc::Rc,
|
|
||||||
str::FromStr,
|
str::FromStr,
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
time::Duration,
|
time::Duration,
|
||||||
|
@ -20,3 +19,5 @@ pub use tokio::{
|
||||||
};
|
};
|
||||||
// Don't use BytesCodec, it is _nonsense_
|
// Don't use BytesCodec, it is _nonsense_
|
||||||
pub use tokio_util::codec::{Framed, LengthDelimitedCodec};
|
pub use tokio_util::codec::{Framed, LengthDelimitedCodec};
|
||||||
|
|
||||||
|
pub type Id = u64;
|
||||||
|
|
239
src/server.rs
239
src/server.rs
|
@ -11,9 +11,9 @@ impl Default for Args {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) struct App {
|
pub(crate) struct App {
|
||||||
clients: Vec<Client>,
|
client_streams: BTreeMap<Id, Framed<TcpStream, LengthDelimitedCodec>>,
|
||||||
listener: TcpListener,
|
listener: TcpListener,
|
||||||
next_client_id: u64,
|
server: Server,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl App {
|
impl App {
|
||||||
|
@ -21,147 +21,170 @@ impl App {
|
||||||
let listener = TcpListener::bind(("0.0.0.0", args.port)).await?;
|
let listener = TcpListener::bind(("0.0.0.0", args.port)).await?;
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
clients: Default::default(),
|
client_streams: Default::default(),
|
||||||
listener,
|
listener,
|
||||||
next_client_id: 1,
|
server: Default::default(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn poll_run(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
|
pub(crate) fn poll_run(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
|
||||||
let previous_client_count = self.clients.len();
|
match self.step(cx) {
|
||||||
|
Ok(()) => Poll::Pending,
|
||||||
|
Err(err) => Poll::Ready(Err(err)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let mut new_messages = vec![];
|
fn step(&mut self, cx: &mut Context<'_>) -> Result<()> {
|
||||||
for client in &mut self.clients {
|
let mut clients_to_remove = vec![];
|
||||||
|
|
||||||
|
// Pump the network streams
|
||||||
|
for (id, stream) in &mut self.client_streams {
|
||||||
|
let mut stream = pin!(stream);
|
||||||
|
|
||||||
|
// Try to send from the client's outbox
|
||||||
|
if let Poll::Ready(result) =
|
||||||
|
<_ as futures_sink::Sink<Bytes>>::poll_ready(stream.as_mut(), cx)
|
||||||
{
|
{
|
||||||
let mut stream = pin!(&mut client.stream);
|
result.context("Can't check network write half for readiness")?;
|
||||||
match <_ as futures_sink::Sink<Bytes>>::poll_flush(stream.as_mut(), cx) {
|
if let Some(frame) = self.server.poll_send(*id)? {
|
||||||
Poll::Pending => {}
|
stream.as_mut().start_send(frame).context("start_send")?;
|
||||||
Poll::Ready(Err(err)) => return Poll::Ready(Err(err).context("poll_flush")),
|
if let Poll::Ready(result) =
|
||||||
Poll::Ready(Ok(())) => {}
|
<_ as futures_sink::Sink<Bytes>>::poll_flush(stream.as_mut(), cx)
|
||||||
}
|
{
|
||||||
}
|
result.context("poll_flush")?;
|
||||||
|
|
||||||
if !client.outbox.is_empty() {
|
|
||||||
let mut stream = pin!(&mut client.stream);
|
|
||||||
match <_ as futures_sink::Sink<Bytes>>::poll_ready(stream.as_mut(), cx) {
|
|
||||||
Poll::Pending => {}
|
|
||||||
Poll::Ready(Err(err)) => {
|
|
||||||
return Poll::Ready(
|
|
||||||
Err(err).context("Can't check network write half for readiness"),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Poll::Ready(Ok(())) => {
|
|
||||||
cx.waker().wake_by_ref();
|
|
||||||
let Some(msg) = client.outbox.pop_front() else {
|
|
||||||
return Poll::Ready(Err(anyhow!(
|
|
||||||
"Can't pop from outbox even though we just checked it was not empty"
|
|
||||||
)));
|
|
||||||
};
|
|
||||||
let frame = rmp_serde::to_vec(msg.as_ref())?;
|
|
||||||
if let Err(err) = stream.start_send(Bytes::from(frame)) {
|
|
||||||
return Poll::Ready(Err(err).context("start_send"));
|
|
||||||
}
|
}
|
||||||
tracing::debug!("Started send");
|
tracing::debug!("Started send");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
match client.poll_next(cx) {
|
// Try to read data in
|
||||||
|
match stream.as_mut().poll_next(cx) {
|
||||||
Poll::Pending => {}
|
Poll::Pending => {}
|
||||||
Poll::Ready(Err(err)) => {
|
Poll::Ready(None) => clients_to_remove.push(*id),
|
||||||
tracing::error!(?err, "client.poll_next error");
|
Poll::Ready(Some(result)) => {
|
||||||
}
|
let frame = result.context("stream.poll_next")?;
|
||||||
Poll::Ready(Ok(None)) => client.closed = true,
|
cx.waker().wake_by_ref();
|
||||||
Poll::Ready(Ok(Some(ToServer::ChatLine { line, sequence }))) => {
|
self.server.handle_client_frame(*id, frame.into())?;
|
||||||
tracing::debug!(id = client.id, ?line);
|
|
||||||
if sequence != client.sequence {
|
|
||||||
tracing::error!(
|
|
||||||
expected = client.sequence,
|
|
||||||
actual = sequence,
|
|
||||||
"Sequence mismatch"
|
|
||||||
);
|
|
||||||
return Poll::Ready(Err(anyhow!("Sequence mismatch")));
|
|
||||||
}
|
|
||||||
tracing::info!(?sequence, id = client.id);
|
|
||||||
client.sequence += 1;
|
|
||||||
new_messages.push(Rc::new(ToClient::ChatLine {
|
|
||||||
id: client.id,
|
|
||||||
line,
|
|
||||||
}));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.clients.retain(|client| {
|
// Close out disconnected clients
|
||||||
if client.closed {
|
for id in clients_to_remove {
|
||||||
tracing::info!(id = client.id, "Closing client");
|
cx.waker().wake_by_ref();
|
||||||
}
|
tracing::info!(?id, "Closing client");
|
||||||
!client.closed
|
self.client_streams.remove(&id);
|
||||||
});
|
self.server.handle_client_disconnected(id)?;
|
||||||
|
|
||||||
for client in &mut self.clients {
|
|
||||||
for msg in &new_messages {
|
|
||||||
client.outbox.push_back(Rc::clone(msg));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
match self.listener.poll_accept(cx) {
|
if let Poll::Ready(result) = self.listener.poll_accept(cx) {
|
||||||
Poll::Pending => {}
|
let (stream, _addr) = result.context("listener.poll_accept")?;
|
||||||
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
|
|
||||||
Poll::Ready(Ok((stream, _addr))) => {
|
|
||||||
cx.waker().wake_by_ref();
|
cx.waker().wake_by_ref();
|
||||||
let stream = Framed::new(stream, LengthDelimitedCodec::new());
|
let stream = Framed::new(stream, LengthDelimitedCodec::new());
|
||||||
|
let id = self.server.handle_new_client()?;
|
||||||
|
self.client_streams.insert(id, stream);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct Server {
|
||||||
|
clients: BTreeMap<Id, Client>,
|
||||||
|
next_client_id: Id,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Server {
|
||||||
|
fn broadcast(&mut self, msg: &ToClient) -> Result<()> {
|
||||||
|
for client in &mut self.clients.values_mut() {
|
||||||
|
client.handle_outgoing(msg)?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_client_disconnected(&mut self, id: Id) -> Result<()> {
|
||||||
|
self.clients.remove(&id);
|
||||||
|
self.broadcast(&ToClient::ClientDisconnected { id })?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_client_frame(&mut self, id: Id, frame: Bytes) -> Result<()> {
|
||||||
|
let msg = {
|
||||||
|
let client = self
|
||||||
|
.clients
|
||||||
|
.get_mut(&id)
|
||||||
|
.context("Logic error: Stream has no associated client")?;
|
||||||
|
let Some(msg) = client.handle_frame(frame).context("client.handle_frame")? else {
|
||||||
|
return Ok(());
|
||||||
|
};
|
||||||
|
msg
|
||||||
|
};
|
||||||
|
self.broadcast(&msg)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_new_client(&mut self) -> Result<Id> {
|
||||||
|
let id = self.next_client_id;
|
||||||
|
self.next_client_id += 1;
|
||||||
let client = Client {
|
let client = Client {
|
||||||
closed: false,
|
id,
|
||||||
id: self.next_client_id,
|
|
||||||
outbox: Default::default(),
|
outbox: Default::default(),
|
||||||
sequence: 0,
|
sequence: 0,
|
||||||
stream,
|
|
||||||
};
|
};
|
||||||
self.next_client_id += 1;
|
tracing::info!(
|
||||||
tracing::info!(id = client.id, "Accepted client");
|
id = client.id,
|
||||||
self.clients.push(client);
|
total = self.clients.len(),
|
||||||
}
|
"Accepted client"
|
||||||
|
);
|
||||||
|
self.clients.insert(id, client);
|
||||||
|
self.broadcast(&ToClient::ClientConnected { id })?;
|
||||||
|
Ok(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.clients.len() != previous_client_count {
|
fn poll_send(&mut self, id: Id) -> Result<Option<Bytes>> {
|
||||||
tracing::info!(client_count = self.clients.len());
|
let client = self
|
||||||
}
|
.clients
|
||||||
|
.get_mut(&id)
|
||||||
Poll::Pending
|
.context("Logic error: Stream has no associated client")?;
|
||||||
|
Ok(client.poll_send())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct Client {
|
struct Client {
|
||||||
closed: bool,
|
|
||||||
id: u64,
|
id: u64,
|
||||||
outbox: VecDeque<Rc<ToClient>>,
|
outbox: VecDeque<Bytes>,
|
||||||
sequence: u64,
|
sequence: u64,
|
||||||
stream: Framed<TcpStream, LengthDelimitedCodec>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Client {
|
impl Client {
|
||||||
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<ToServer>>> {
|
fn handle_frame(&mut self, frame: Bytes) -> Result<Option<ToClient>> {
|
||||||
let stream = pin!(&mut self.stream);
|
match rmp_serde::from_slice(&frame)? {
|
||||||
let Poll::Ready(frame_opt) = stream.poll_next(cx) else {
|
ToServer::ChatLine { line, sequence } => {
|
||||||
return Poll::Pending;
|
if sequence != self.sequence {
|
||||||
};
|
tracing::error!(
|
||||||
|
expected = self.sequence,
|
||||||
let Some(frame) = frame_opt else {
|
actual = sequence,
|
||||||
return Poll::Ready(Ok(None));
|
"Sequence mismatch"
|
||||||
};
|
);
|
||||||
|
bail!("Sequence mismatch");
|
||||||
cx.waker().wake_by_ref();
|
}
|
||||||
tracing::debug!("Got network data");
|
self.sequence += 1;
|
||||||
|
Ok(Some(ToClient::ChatLine { id: self.id, line }))
|
||||||
let frame = match frame {
|
}
|
||||||
Err(err) => return Poll::Ready(Err(err).context("Network framing decode error")),
|
}
|
||||||
Ok(x) => x,
|
}
|
||||||
};
|
|
||||||
let msg = match rmp_serde::from_slice(&frame) {
|
fn handle_outgoing(&mut self, msg: &ToClient) -> Result<()> {
|
||||||
Err(err) => return Poll::Ready(Err(err).context("MsgPack decode error")),
|
if !self.outbox.is_empty() {
|
||||||
Ok(x) => x,
|
bail!("Outbox full");
|
||||||
};
|
}
|
||||||
Poll::Ready(Ok(Some(msg)))
|
let bytes = rmp_serde::to_vec(msg)?.into();
|
||||||
|
self.outbox.push_back(bytes);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_send(&mut self) -> Option<Bytes> {
|
||||||
|
self.outbox.pop_front()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue