Compare commits
No commits in common. "a0b6726af0d6bfd981234d09ff7d8cffb5494f37" and "c4de3ad00dfbc71f8c1d149d053ecf2b33cbceb0" have entirely different histories.
a0b6726af0
...
c4de3ad00d
|
@ -1,7 +1,6 @@
|
||||||
use crate::prelude::*;
|
use crate::prelude::*;
|
||||||
|
|
||||||
pub(crate) struct Args {
|
pub(crate) struct Args {
|
||||||
pub(crate) name: String,
|
|
||||||
pub(crate) period_ms: u32,
|
pub(crate) period_ms: u32,
|
||||||
pub(crate) port: u16,
|
pub(crate) port: u16,
|
||||||
}
|
}
|
||||||
|
@ -9,7 +8,6 @@ pub(crate) struct Args {
|
||||||
impl Default for Args {
|
impl Default for Args {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
name: "anon".to_string(),
|
|
||||||
period_ms: 1_000,
|
period_ms: 1_000,
|
||||||
port: 9000,
|
port: 9000,
|
||||||
}
|
}
|
||||||
|
@ -29,16 +27,12 @@ impl App {
|
||||||
let mut timer = tokio::time::interval(Duration::from_millis(args.period_ms.into()));
|
let mut timer = tokio::time::interval(Duration::from_millis(args.period_ms.into()));
|
||||||
timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
|
timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
client: Client::new(args.name)?,
|
client: Default::default(),
|
||||||
stream,
|
stream,
|
||||||
timer,
|
timer,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn handle_stdin(&mut self, line: String) -> Result<()> {
|
|
||||||
self.client.handle_stdin(line)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn poll_run(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
|
pub(crate) fn poll_run(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
|
||||||
match self.step(cx) {
|
match self.step(cx) {
|
||||||
Ok(()) => Poll::Pending,
|
Ok(()) => Poll::Pending,
|
||||||
|
@ -94,58 +88,28 @@ struct Client {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Client {
|
impl Client {
|
||||||
fn new(name: String) -> Result<Self> {
|
|
||||||
let mut this = Self {
|
|
||||||
outbox: Default::default(),
|
|
||||||
sequence: 0,
|
|
||||||
};
|
|
||||||
|
|
||||||
this.handle_set_name(name)?;
|
|
||||||
Ok(this)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn enqueue(&mut self, msg: &ToServer) -> Result<()> {
|
|
||||||
if !self.outbox.is_empty() {
|
|
||||||
bail!("Dropped message, outbox is full");
|
|
||||||
}
|
|
||||||
let frame = rmp_serde::to_vec(&msg)?;
|
|
||||||
self.outbox.push_back(frame.into());
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn handle_frame(&self, frame: Bytes) -> Result<()> {
|
fn handle_frame(&self, frame: Bytes) -> Result<()> {
|
||||||
let ToClient { id, name, event } = rmp_serde::from_slice(&frame)?;
|
match rmp_serde::from_slice(&frame)? {
|
||||||
match event {
|
ToClient::ChatLine { id, line } => tracing::info!(?id, ?line),
|
||||||
ToClientEvent::ChatLine { line } => tracing::info!(?name, ?line),
|
ToClient::ClientConnected { id } => tracing::info!(?id, "Connected"),
|
||||||
ToClientEvent::Connected => tracing::info!(?id, "Connected"),
|
ToClient::ClientDisconnected { id } => tracing::info!(?id, "Disconnected"),
|
||||||
ToClientEvent::Disconnected => tracing::info!(?id, ?name, "Disconnected"),
|
|
||||||
ToClientEvent::NameChanged { old_name } => {
|
|
||||||
tracing::info!(?id, ?old_name, new_name = name, "Name changed")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_set_name(&mut self, name: String) -> Result<()> {
|
|
||||||
self.enqueue(&ToServer::SetName { name })
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn handle_stdin(&mut self, line: String) -> Result<()> {
|
|
||||||
let msg = ToServer::ChatLine {
|
|
||||||
line,
|
|
||||||
sequence: self.sequence,
|
|
||||||
};
|
|
||||||
self.sequence += 1;
|
|
||||||
self.enqueue(&msg)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn handle_timeout(&mut self) -> Result<()> {
|
fn handle_timeout(&mut self) -> Result<()> {
|
||||||
|
if !self.outbox.is_empty() {
|
||||||
|
bail!("Dropped message, outbox is full");
|
||||||
|
}
|
||||||
|
|
||||||
let msg = ToServer::ChatLine {
|
let msg = ToServer::ChatLine {
|
||||||
line: "There was a time, in the era of great chaos, when the Earth and the Moon were at war with each other. A daredevil from the Moon piloted a bizarre aircraft. It was feared, and because of its shape, called EINHANDER.".to_string(),
|
line: "There was a time, in the era of great chaos, when the Earth and the Moon were at war with each other. A daredevil from the Moon piloted a bizarre aircraft. It was feared, and because of its shape, called EINHANDER.".to_string(),
|
||||||
sequence: self.sequence,
|
sequence: self.sequence,
|
||||||
};
|
};
|
||||||
|
let frame = rmp_serde::to_vec(&msg)?;
|
||||||
|
self.outbox.push_back(frame.into());
|
||||||
self.sequence += 1;
|
self.sequence += 1;
|
||||||
self.enqueue(&msg)
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_send(&mut self) -> Option<Bytes> {
|
fn poll_send(&mut self) -> Option<Bytes> {
|
||||||
|
|
27
src/main.rs
27
src/main.rs
|
@ -24,9 +24,6 @@ impl Args {
|
||||||
loop {
|
loop {
|
||||||
match args.next().as_deref() {
|
match args.next().as_deref() {
|
||||||
None => break,
|
None => break,
|
||||||
Some("--name") => {
|
|
||||||
client_args.name = args.next().context("Missing arg")?.to_string()
|
|
||||||
}
|
|
||||||
Some("--period-ms") => {
|
Some("--period-ms") => {
|
||||||
client_args.period_ms =
|
client_args.period_ms =
|
||||||
u32::from_str(&args.next().context("Missing arg")?)?
|
u32::from_str(&args.next().context("Missing arg")?)?
|
||||||
|
@ -71,12 +68,6 @@ async fn main() -> Result<()> {
|
||||||
|
|
||||||
async fn run_client(args: client::Args) -> Result<()> {
|
async fn run_client(args: client::Args) -> Result<()> {
|
||||||
let mut ctrl_c = signal(SignalKind::interrupt())?;
|
let mut ctrl_c = signal(SignalKind::interrupt())?;
|
||||||
let (stdin_tx, mut stdin_rx) = tokio::sync::mpsc::channel(1);
|
|
||||||
std::thread::spawn(move || {
|
|
||||||
for line in std::io::stdin().lines() {
|
|
||||||
stdin_tx.blocking_send(line).unwrap();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
let mut app = client::App::new(args).await?;
|
let mut app = client::App::new(args).await?;
|
||||||
loop {
|
loop {
|
||||||
let ctl = poll_fn(|cx| {
|
let ctl = poll_fn(|cx| {
|
||||||
|
@ -86,24 +77,6 @@ async fn run_client(args: client::Args) -> Result<()> {
|
||||||
return Poll::Ready(ControlFlow::Break(Ok(())));
|
return Poll::Ready(ControlFlow::Break(Ok(())));
|
||||||
}
|
}
|
||||||
|
|
||||||
match stdin_rx.poll_recv(cx) {
|
|
||||||
Poll::Pending => {}
|
|
||||||
Poll::Ready(None) => {
|
|
||||||
return Poll::Ready(ControlFlow::Break(Err(anyhow!(
|
|
||||||
"Stdin thread somehow died"
|
|
||||||
))));
|
|
||||||
}
|
|
||||||
Poll::Ready(Some(line)) => {
|
|
||||||
let line = line.unwrap();
|
|
||||||
cx.waker().wake_by_ref();
|
|
||||||
if let Err(err) = app.handle_stdin(line) {
|
|
||||||
return Poll::Ready(ControlFlow::Break(
|
|
||||||
Err(err).context("app.handle_stdin"),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
match app.poll_run(cx) {
|
match app.poll_run(cx) {
|
||||||
Poll::Pending => Poll::Pending,
|
Poll::Pending => Poll::Pending,
|
||||||
Poll::Ready(Ok(())) => Poll::Ready(ControlFlow::Continue(())),
|
Poll::Ready(Ok(())) => Poll::Ready(ControlFlow::Continue(())),
|
||||||
|
|
|
@ -2,22 +2,13 @@ use crate::prelude::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
#[derive(Deserialize, Serialize)]
|
#[derive(Deserialize, Serialize)]
|
||||||
pub(crate) struct ToClient {
|
pub(crate) enum ToClient {
|
||||||
pub(crate) id: Id,
|
ChatLine { id: Id, line: String },
|
||||||
pub(crate) name: String,
|
ClientConnected { id: Id },
|
||||||
pub(crate) event: ToClientEvent,
|
ClientDisconnected { id: Id },
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Deserialize, Serialize)]
|
|
||||||
pub(crate) enum ToClientEvent {
|
|
||||||
ChatLine { line: String },
|
|
||||||
Connected,
|
|
||||||
Disconnected,
|
|
||||||
NameChanged { old_name: String },
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Deserialize, Serialize)]
|
#[derive(Deserialize, Serialize)]
|
||||||
pub(crate) enum ToServer {
|
pub(crate) enum ToServer {
|
||||||
ChatLine { line: String, sequence: u64 },
|
ChatLine { line: String, sequence: u64 },
|
||||||
SetName { name: String },
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
pub(crate) use crate::messages::{ToClient, ToClientEvent, ToServer};
|
pub(crate) use crate::messages::{ToClient, ToServer};
|
||||||
pub use anyhow::{Context as _, Result, anyhow, bail};
|
pub use anyhow::{Context as _, Result, bail};
|
||||||
pub use bytes::Bytes;
|
pub use bytes::Bytes;
|
||||||
pub use futures_core::stream::Stream;
|
pub use futures_core::stream::Stream;
|
||||||
pub use futures_sink::Sink;
|
pub use futures_sink::Sink;
|
||||||
|
|
|
@ -104,30 +104,23 @@ impl Server {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_client_disconnected(&mut self, id: Id) -> Result<()> {
|
fn handle_client_disconnected(&mut self, id: Id) -> Result<()> {
|
||||||
let client = self
|
self.clients.remove(&id);
|
||||||
.clients
|
self.broadcast(&ToClient::ClientDisconnected { id })?;
|
||||||
.remove(&id)
|
|
||||||
.context("Can't remove client who isn't there")?;
|
|
||||||
self.broadcast(&ToClient {
|
|
||||||
id,
|
|
||||||
name: client.name,
|
|
||||||
event: ToClientEvent::Disconnected,
|
|
||||||
})?;
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_client_frame(&mut self, id: Id, frame: Bytes) -> Result<()> {
|
fn handle_client_frame(&mut self, id: Id, frame: Bytes) -> Result<()> {
|
||||||
let (name, event) = {
|
let msg = {
|
||||||
let client = self
|
let client = self
|
||||||
.clients
|
.clients
|
||||||
.get_mut(&id)
|
.get_mut(&id)
|
||||||
.context("Logic error: Stream has no associated client")?;
|
.context("Logic error: Stream has no associated client")?;
|
||||||
let Some(event) = client.handle_frame(frame).context("client.handle_frame")? else {
|
let Some(msg) = client.handle_frame(frame).context("client.handle_frame")? else {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
};
|
};
|
||||||
(client.name.clone(), event)
|
msg
|
||||||
};
|
};
|
||||||
self.broadcast(&ToClient { id, name, event })?;
|
self.broadcast(&msg)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -136,7 +129,6 @@ impl Server {
|
||||||
self.next_client_id += 1;
|
self.next_client_id += 1;
|
||||||
let client = Client {
|
let client = Client {
|
||||||
id,
|
id,
|
||||||
name: "anon".to_string(),
|
|
||||||
outbox: Default::default(),
|
outbox: Default::default(),
|
||||||
sequence: 0,
|
sequence: 0,
|
||||||
};
|
};
|
||||||
|
@ -145,12 +137,8 @@ impl Server {
|
||||||
total = self.clients.len(),
|
total = self.clients.len(),
|
||||||
"Accepted client"
|
"Accepted client"
|
||||||
);
|
);
|
||||||
self.broadcast(&ToClient {
|
|
||||||
id,
|
|
||||||
name: client.name.clone(),
|
|
||||||
event: ToClientEvent::Connected,
|
|
||||||
})?;
|
|
||||||
self.clients.insert(id, client);
|
self.clients.insert(id, client);
|
||||||
|
self.broadcast(&ToClient::ClientConnected { id })?;
|
||||||
Ok(id)
|
Ok(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -165,13 +153,12 @@ impl Server {
|
||||||
|
|
||||||
struct Client {
|
struct Client {
|
||||||
id: u64,
|
id: u64,
|
||||||
name: String,
|
|
||||||
outbox: VecDeque<Bytes>,
|
outbox: VecDeque<Bytes>,
|
||||||
sequence: u64,
|
sequence: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Client {
|
impl Client {
|
||||||
fn handle_frame(&mut self, frame: Bytes) -> Result<Option<ToClientEvent>> {
|
fn handle_frame(&mut self, frame: Bytes) -> Result<Option<ToClient>> {
|
||||||
match rmp_serde::from_slice(&frame)? {
|
match rmp_serde::from_slice(&frame)? {
|
||||||
ToServer::ChatLine { line, sequence } => {
|
ToServer::ChatLine { line, sequence } => {
|
||||||
if sequence != self.sequence {
|
if sequence != self.sequence {
|
||||||
|
@ -183,11 +170,7 @@ impl Client {
|
||||||
bail!("Sequence mismatch");
|
bail!("Sequence mismatch");
|
||||||
}
|
}
|
||||||
self.sequence += 1;
|
self.sequence += 1;
|
||||||
Ok(Some(ToClientEvent::ChatLine { line }))
|
Ok(Some(ToClient::ChatLine { id: self.id, line }))
|
||||||
}
|
|
||||||
ToServer::SetName { name } => {
|
|
||||||
let old_name = std::mem::replace(&mut self.name, name);
|
|
||||||
Ok(Some(ToClientEvent::NameChanged { old_name }))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue