Compare commits

...

2 Commits

Author SHA1 Message Date
_ a0b6726af0 add interactive chatting with stdin 2025-02-24 05:02:33 +00:00
_ 54560e118f add display names which currently do nothing 2025-02-24 04:45:39 +00:00
5 changed files with 116 additions and 27 deletions

View File

@ -1,6 +1,7 @@
use crate::prelude::*;
pub(crate) struct Args {
pub(crate) name: String,
pub(crate) period_ms: u32,
pub(crate) port: u16,
}
@ -8,6 +9,7 @@ pub(crate) struct Args {
impl Default for Args {
fn default() -> Self {
Self {
name: "anon".to_string(),
period_ms: 1_000,
port: 9000,
}
@ -27,12 +29,16 @@ impl App {
let mut timer = tokio::time::interval(Duration::from_millis(args.period_ms.into()));
timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
Ok(Self {
client: Default::default(),
client: Client::new(args.name)?,
stream,
timer,
})
}
pub(crate) fn handle_stdin(&mut self, line: String) -> Result<()> {
self.client.handle_stdin(line)
}
pub(crate) fn poll_run(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
match self.step(cx) {
Ok(()) => Poll::Pending,
@ -88,28 +94,58 @@ struct Client {
}
impl Client {
fn new(name: String) -> Result<Self> {
let mut this = Self {
outbox: Default::default(),
sequence: 0,
};
this.handle_set_name(name)?;
Ok(this)
}
fn enqueue(&mut self, msg: &ToServer) -> Result<()> {
if !self.outbox.is_empty() {
bail!("Dropped message, outbox is full");
}
let frame = rmp_serde::to_vec(&msg)?;
self.outbox.push_back(frame.into());
Ok(())
}
fn handle_frame(&self, frame: Bytes) -> Result<()> {
match rmp_serde::from_slice(&frame)? {
ToClient::ChatLine { id, line } => tracing::info!(?id, ?line),
ToClient::ClientConnected { id } => tracing::info!(?id, "Connected"),
ToClient::ClientDisconnected { id } => tracing::info!(?id, "Disconnected"),
let ToClient { id, name, event } = rmp_serde::from_slice(&frame)?;
match event {
ToClientEvent::ChatLine { line } => tracing::info!(?name, ?line),
ToClientEvent::Connected => tracing::info!(?id, "Connected"),
ToClientEvent::Disconnected => tracing::info!(?id, ?name, "Disconnected"),
ToClientEvent::NameChanged { old_name } => {
tracing::info!(?id, ?old_name, new_name = name, "Name changed")
}
}
Ok(())
}
fn handle_timeout(&mut self) -> Result<()> {
if !self.outbox.is_empty() {
bail!("Dropped message, outbox is full");
}
fn handle_set_name(&mut self, name: String) -> Result<()> {
self.enqueue(&ToServer::SetName { name })
}
pub(crate) fn handle_stdin(&mut self, line: String) -> Result<()> {
let msg = ToServer::ChatLine {
line,
sequence: self.sequence,
};
self.sequence += 1;
self.enqueue(&msg)
}
fn handle_timeout(&mut self) -> Result<()> {
let msg = ToServer::ChatLine {
line: "There was a time, in the era of great chaos, when the Earth and the Moon were at war with each other. A daredevil from the Moon piloted a bizarre aircraft. It was feared, and because of its shape, called EINHANDER.".to_string(),
sequence: self.sequence,
};
let frame = rmp_serde::to_vec(&msg)?;
self.outbox.push_back(frame.into());
self.sequence += 1;
Ok(())
self.enqueue(&msg)
}
fn poll_send(&mut self) -> Option<Bytes> {

View File

@ -24,6 +24,9 @@ impl Args {
loop {
match args.next().as_deref() {
None => break,
Some("--name") => {
client_args.name = args.next().context("Missing arg")?.to_string()
}
Some("--period-ms") => {
client_args.period_ms =
u32::from_str(&args.next().context("Missing arg")?)?
@ -68,6 +71,12 @@ async fn main() -> Result<()> {
async fn run_client(args: client::Args) -> Result<()> {
let mut ctrl_c = signal(SignalKind::interrupt())?;
let (stdin_tx, mut stdin_rx) = tokio::sync::mpsc::channel(1);
std::thread::spawn(move || {
for line in std::io::stdin().lines() {
stdin_tx.blocking_send(line).unwrap();
}
});
let mut app = client::App::new(args).await?;
loop {
let ctl = poll_fn(|cx| {
@ -77,6 +86,24 @@ async fn run_client(args: client::Args) -> Result<()> {
return Poll::Ready(ControlFlow::Break(Ok(())));
}
match stdin_rx.poll_recv(cx) {
Poll::Pending => {}
Poll::Ready(None) => {
return Poll::Ready(ControlFlow::Break(Err(anyhow!(
"Stdin thread somehow died"
))));
}
Poll::Ready(Some(line)) => {
let line = line.unwrap();
cx.waker().wake_by_ref();
if let Err(err) = app.handle_stdin(line) {
return Poll::Ready(ControlFlow::Break(
Err(err).context("app.handle_stdin"),
));
}
}
}
match app.poll_run(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(())) => Poll::Ready(ControlFlow::Continue(())),

View File

@ -2,13 +2,22 @@ use crate::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize)]
pub(crate) enum ToClient {
ChatLine { id: Id, line: String },
ClientConnected { id: Id },
ClientDisconnected { id: Id },
pub(crate) struct ToClient {
pub(crate) id: Id,
pub(crate) name: String,
pub(crate) event: ToClientEvent,
}
#[derive(Deserialize, Serialize)]
pub(crate) enum ToClientEvent {
ChatLine { line: String },
Connected,
Disconnected,
NameChanged { old_name: String },
}
#[derive(Deserialize, Serialize)]
pub(crate) enum ToServer {
ChatLine { line: String, sequence: u64 },
SetName { name: String },
}

View File

@ -1,5 +1,5 @@
pub(crate) use crate::messages::{ToClient, ToServer};
pub use anyhow::{Context as _, Result, bail};
pub(crate) use crate::messages::{ToClient, ToClientEvent, ToServer};
pub use anyhow::{Context as _, Result, anyhow, bail};
pub use bytes::Bytes;
pub use futures_core::stream::Stream;
pub use futures_sink::Sink;

View File

@ -104,23 +104,30 @@ impl Server {
}
fn handle_client_disconnected(&mut self, id: Id) -> Result<()> {
self.clients.remove(&id);
self.broadcast(&ToClient::ClientDisconnected { id })?;
let client = self
.clients
.remove(&id)
.context("Can't remove client who isn't there")?;
self.broadcast(&ToClient {
id,
name: client.name,
event: ToClientEvent::Disconnected,
})?;
Ok(())
}
fn handle_client_frame(&mut self, id: Id, frame: Bytes) -> Result<()> {
let msg = {
let (name, event) = {
let client = self
.clients
.get_mut(&id)
.context("Logic error: Stream has no associated client")?;
let Some(msg) = client.handle_frame(frame).context("client.handle_frame")? else {
let Some(event) = client.handle_frame(frame).context("client.handle_frame")? else {
return Ok(());
};
msg
(client.name.clone(), event)
};
self.broadcast(&msg)?;
self.broadcast(&ToClient { id, name, event })?;
Ok(())
}
@ -129,6 +136,7 @@ impl Server {
self.next_client_id += 1;
let client = Client {
id,
name: "anon".to_string(),
outbox: Default::default(),
sequence: 0,
};
@ -137,8 +145,12 @@ impl Server {
total = self.clients.len(),
"Accepted client"
);
self.broadcast(&ToClient {
id,
name: client.name.clone(),
event: ToClientEvent::Connected,
})?;
self.clients.insert(id, client);
self.broadcast(&ToClient::ClientConnected { id })?;
Ok(id)
}
@ -153,12 +165,13 @@ impl Server {
struct Client {
id: u64,
name: String,
outbox: VecDeque<Bytes>,
sequence: u64,
}
impl Client {
fn handle_frame(&mut self, frame: Bytes) -> Result<Option<ToClient>> {
fn handle_frame(&mut self, frame: Bytes) -> Result<Option<ToClientEvent>> {
match rmp_serde::from_slice(&frame)? {
ToServer::ChatLine { line, sequence } => {
if sequence != self.sequence {
@ -170,7 +183,11 @@ impl Client {
bail!("Sequence mismatch");
}
self.sequence += 1;
Ok(Some(ToClient::ChatLine { id: self.id, line }))
Ok(Some(ToClientEvent::ChatLine { line }))
}
ToServer::SetName { name } => {
let old_name = std::mem::replace(&mut self.name, name);
Ok(Some(ToClientEvent::NameChanged { old_name }))
}
}
}