refactor
parent
2606946240
commit
e3c23a0931
|
@ -1,5 +1,5 @@
|
||||||
pub(crate) use crate::messages::{ToClient, ToServer};
|
pub(crate) use crate::messages::{ToClient, ToServer};
|
||||||
pub use anyhow::{Context as _, Result, anyhow, bail};
|
pub use anyhow::{Context as _, Result, bail};
|
||||||
pub use bytes::Bytes;
|
pub use bytes::Bytes;
|
||||||
pub use futures_core::stream::Stream;
|
pub use futures_core::stream::Stream;
|
||||||
pub use futures_sink::Sink;
|
pub use futures_sink::Sink;
|
||||||
|
|
|
@ -48,16 +48,12 @@ impl App {
|
||||||
<_ as futures_sink::Sink<Bytes>>::poll_ready(stream.as_mut(), cx)
|
<_ as futures_sink::Sink<Bytes>>::poll_ready(stream.as_mut(), cx)
|
||||||
{
|
{
|
||||||
result.context("Can't check network write half for readiness")?;
|
result.context("Can't check network write half for readiness")?;
|
||||||
let client = self
|
if let Some(frame) = self.server.poll_send(*id)? {
|
||||||
.server
|
|
||||||
.clients
|
|
||||||
.get_mut(id)
|
|
||||||
.context("Logic error: Stream has no associated client")?;
|
|
||||||
if let Some(frame) = client.poll_send() {
|
|
||||||
stream.as_mut().start_send(frame).context("start_send")?;
|
stream.as_mut().start_send(frame).context("start_send")?;
|
||||||
match <_ as futures_sink::Sink<Bytes>>::poll_flush(stream.as_mut(), cx) {
|
if let Poll::Ready(result) =
|
||||||
Poll::Pending => {}
|
<_ as futures_sink::Sink<Bytes>>::poll_flush(stream.as_mut(), cx)
|
||||||
Poll::Ready(result) => result.context("poll_flush")?,
|
{
|
||||||
|
result.context("poll_flush")?;
|
||||||
}
|
}
|
||||||
tracing::debug!("Started send");
|
tracing::debug!("Started send");
|
||||||
}
|
}
|
||||||
|
@ -70,14 +66,7 @@ impl App {
|
||||||
Poll::Ready(Some(result)) => {
|
Poll::Ready(Some(result)) => {
|
||||||
let frame = result.context("stream.poll_next")?;
|
let frame = result.context("stream.poll_next")?;
|
||||||
cx.waker().wake_by_ref();
|
cx.waker().wake_by_ref();
|
||||||
let client = self
|
self.server.handle_client_frame(*id, frame.into())?;
|
||||||
.server
|
|
||||||
.clients
|
|
||||||
.get_mut(id)
|
|
||||||
.context("Logic error: Stream has no associated client")?;
|
|
||||||
client
|
|
||||||
.handle_frame(frame.into())
|
|
||||||
.context("client.handle_frame")?
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -86,33 +75,15 @@ impl App {
|
||||||
for id in clients_to_remove {
|
for id in clients_to_remove {
|
||||||
tracing::info!(?id, "Closing client");
|
tracing::info!(?id, "Closing client");
|
||||||
self.client_streams.remove(&id);
|
self.client_streams.remove(&id);
|
||||||
self.server.clients.remove(&id);
|
self.server.handle_client_disconnected(id);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Broadcast chat lines across all clients
|
if let Poll::Ready(result) = self.listener.poll_accept(cx) {
|
||||||
|
let (stream, _addr) = result.context("listener.poll_accept")?;
|
||||||
let mut new_messages = vec![];
|
cx.waker().wake_by_ref();
|
||||||
for client in &mut self.server.clients.values_mut() {
|
let stream = Framed::new(stream, LengthDelimitedCodec::new());
|
||||||
if let Some(ChatLine { id, line }) = client.poll_inbox() {
|
let id = self.server.handle_new_client();
|
||||||
new_messages.push(ToClient::ChatLine { id, line });
|
self.client_streams.insert(id, stream);
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for client in &mut self.server.clients.values_mut() {
|
|
||||||
for msg in &new_messages {
|
|
||||||
client.handle_outgoing(msg)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
match self.listener.poll_accept(cx) {
|
|
||||||
Poll::Pending => {}
|
|
||||||
Poll::Ready(result) => {
|
|
||||||
let (stream, _addr) = result.context("listener.poll_accept")?;
|
|
||||||
cx.waker().wake_by_ref();
|
|
||||||
let stream = Framed::new(stream, LengthDelimitedCodec::new());
|
|
||||||
let id = self.server.handle_new_client();
|
|
||||||
self.client_streams.insert(id, stream);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -126,12 +97,32 @@ struct Server {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Server {
|
impl Server {
|
||||||
|
fn handle_client_disconnected(&mut self, id: Id) {
|
||||||
|
self.clients.remove(&id);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_client_frame(&mut self, id: Id, frame: Bytes) -> Result<()> {
|
||||||
|
let msg = {
|
||||||
|
let client = self
|
||||||
|
.clients
|
||||||
|
.get_mut(&id)
|
||||||
|
.context("Logic error: Stream has no associated client")?;
|
||||||
|
let Some(msg) = client.handle_frame(frame).context("client.handle_frame")? else {
|
||||||
|
return Ok(());
|
||||||
|
};
|
||||||
|
msg
|
||||||
|
};
|
||||||
|
for client in &mut self.clients.values_mut() {
|
||||||
|
client.handle_outgoing(&msg)?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
fn handle_new_client(&mut self) -> Id {
|
fn handle_new_client(&mut self) -> Id {
|
||||||
let id = self.next_client_id;
|
let id = self.next_client_id;
|
||||||
self.next_client_id += 1;
|
self.next_client_id += 1;
|
||||||
let client = Client {
|
let client = Client {
|
||||||
id,
|
id,
|
||||||
inbox: Default::default(),
|
|
||||||
outbox: Default::default(),
|
outbox: Default::default(),
|
||||||
sequence: 0,
|
sequence: 0,
|
||||||
};
|
};
|
||||||
|
@ -143,22 +134,24 @@ impl Server {
|
||||||
self.clients.insert(id, client);
|
self.clients.insert(id, client);
|
||||||
id
|
id
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
struct ChatLine {
|
fn poll_send(&mut self, id: Id) -> Result<Option<Bytes>> {
|
||||||
id: Id,
|
let client = self
|
||||||
line: String,
|
.clients
|
||||||
|
.get_mut(&id)
|
||||||
|
.context("Logic error: Stream has no associated client")?;
|
||||||
|
Ok(client.poll_send())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct Client {
|
struct Client {
|
||||||
id: u64,
|
id: u64,
|
||||||
inbox: VecDeque<ChatLine>,
|
|
||||||
outbox: VecDeque<Bytes>,
|
outbox: VecDeque<Bytes>,
|
||||||
sequence: u64,
|
sequence: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Client {
|
impl Client {
|
||||||
fn handle_frame(&mut self, frame: Bytes) -> Result<()> {
|
fn handle_frame(&mut self, frame: Bytes) -> Result<Option<ToClient>> {
|
||||||
match rmp_serde::from_slice(&frame)? {
|
match rmp_serde::from_slice(&frame)? {
|
||||||
ToServer::ChatLine { line, sequence } => {
|
ToServer::ChatLine { line, sequence } => {
|
||||||
if sequence != self.sequence {
|
if sequence != self.sequence {
|
||||||
|
@ -170,10 +163,9 @@ impl Client {
|
||||||
bail!("Sequence mismatch");
|
bail!("Sequence mismatch");
|
||||||
}
|
}
|
||||||
self.sequence += 1;
|
self.sequence += 1;
|
||||||
self.inbox.push_back(ChatLine { id: self.id, line });
|
Ok(Some(ToClient::ChatLine { id: self.id, line }))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_outgoing(&mut self, msg: &ToClient) -> Result<()> {
|
fn handle_outgoing(&mut self, msg: &ToClient) -> Result<()> {
|
||||||
|
@ -185,10 +177,6 @@ impl Client {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_inbox(&mut self) -> Option<ChatLine> {
|
|
||||||
self.inbox.pop_front()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll_send(&mut self) -> Option<Bytes> {
|
fn poll_send(&mut self) -> Option<Bytes> {
|
||||||
self.outbox.pop_front()
|
self.outbox.pop_front()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue