diff --git a/src/client.rs b/src/client.rs index 78c6a4b..ffe1938 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,19 +1,35 @@ use crate::prelude::*; +pub(crate) struct Args { + pub(crate) period_ms: u32, + pub(crate) port: u16, +} + +impl Default for Args { + fn default() -> Self { + Self { + period_ms: 1_000, + port: 9000, + } + } +} + pub(crate) struct App { outbox: VecDeque, - stream: Framed, + sequence: u64, + stream: Framed, timer: Interval, } impl App { - pub(crate) async fn new() -> Result { - let stream = TcpStream::connect("127.0.0.1:9000").await?; - let stream = Framed::new(stream, BytesCodec::new()); - let mut timer = tokio::time::interval(Duration::from_millis(1000)); + pub(crate) async fn new(args: Args) -> Result { + let stream = TcpStream::connect(("127.0.0.1", args.port)).await?; + let stream = Framed::new(stream, LengthDelimitedCodec::new()); + let mut timer = tokio::time::interval(Duration::from_millis(args.period_ms.into())); timer.set_missed_tick_behavior(MissedTickBehavior::Skip); Ok(Self { outbox: Default::default(), + sequence: 0, stream, timer, }) @@ -29,7 +45,7 @@ impl App { } } - if ! self.outbox.is_empty() { + if !self.outbox.is_empty() { let mut stream = pin!(&mut self.stream); match <_ as futures_sink::Sink>::poll_ready(stream.as_mut(), cx) { Poll::Pending => {} @@ -68,8 +84,10 @@ impl App { cx.waker().wake_by_ref(); if self.outbox.is_empty() { self.outbox.push_back(ToServer::ChatLine { - line: "tick".to_string(), + line: "There was a time, in the era of great chaos, when the Earth and the Moon were at war with each other. A daredevil from the Moon piloted a bizarre aircraft. It was feared, and because of its shape, called EINHANDER.".to_string(), + sequence: self.sequence, }); + self.sequence += 1; } } diff --git a/src/main.rs b/src/main.rs index ff94bdb..528cbbe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,8 +10,8 @@ struct Args { } enum Subcommand { - Client, - Server, + Client(client::Args), + Server(server::Args), } impl Args { @@ -19,8 +19,36 @@ impl Args { let mut args = std::env::args().skip(1); let cmd = match args.next().as_deref() { None => bail!("Need a subcommand, e.g. `client` or `server`"), - Some("client") => Subcommand::Client, - Some("server") => Subcommand::Server, + Some("client") => { + let mut client_args = client::Args::default(); + loop { + match args.next().as_deref() { + None => break, + Some("--period-ms") => { + client_args.period_ms = + u32::from_str(&args.next().context("Missing arg")?)? + } + Some("--port") => { + client_args.port = u16::from_str(&args.next().context("Missing arg")?)? + } + Some(_) => bail!("Invalid arg"), + } + } + Subcommand::Client(client_args) + } + Some("server") => { + let mut server_args = server::Args::default(); + loop { + match args.next().as_deref() { + None => break, + Some("--port") => { + server_args.port = u16::from_str(&args.next().context("Missing arg")?)? + } + Some(_) => bail!("Invalid arg"), + } + } + Subcommand::Server(server_args) + } Some(_) => bail!("Invalid subcommand"), }; Ok(Self { cmd }) @@ -33,14 +61,14 @@ async fn main() -> Result<()> { tracing_subscriber::fmt::init(); match args.cmd { - Subcommand::Client => run_client().await, - Subcommand::Server => run_server().await, + Subcommand::Client(args) => run_client(args).await, + Subcommand::Server(args) => run_server(args).await, } } -async fn run_client() -> Result<()> { +async fn run_client(args: client::Args) -> Result<()> { let mut ctrl_c = signal(SignalKind::interrupt())?; - let mut app = client::App::new().await?; + let mut app = client::App::new(args).await?; loop { let ctl = poll_fn(|cx| { tracing::debug!("Loop"); @@ -62,9 +90,9 @@ async fn run_client() -> Result<()> { } } -async fn run_server() -> Result<()> { +async fn run_server(args: server::Args) -> Result<()> { let mut ctrl_c = signal(SignalKind::interrupt())?; - let mut app = server::App::new().await?; + let mut app = server::App::new(args).await?; poll_fn(|cx| { tracing::debug!("Loop"); if ctrl_c.poll_recv(cx).is_ready() { diff --git a/src/messages.rs b/src/messages.rs index 63311c9..4370de4 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -7,5 +7,5 @@ pub(crate) enum ToClient { #[derive(Deserialize, Serialize)] pub(crate) enum ToServer { - ChatLine { line: String }, + ChatLine { line: String, sequence: u64 }, } diff --git a/src/prelude.rs b/src/prelude.rs index d33b33a..88d0ab1 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -9,6 +9,7 @@ pub use std::{ ops::ControlFlow, pin::pin, rc::Rc, + str::FromStr, task::{Context, Poll}, time::Duration, }; @@ -17,4 +18,5 @@ pub use tokio::{ signal::unix::{SignalKind, signal}, time::{Interval, MissedTickBehavior}, }; -pub use tokio_util::codec::{BytesCodec, Framed}; +// Don't use BytesCodec, it is _nonsense_ +pub use tokio_util::codec::{Framed, LengthDelimitedCodec}; diff --git a/src/server.rs b/src/server.rs index d527108..0f5be01 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,21 +1,24 @@ use crate::prelude::*; +pub(crate) struct Args { + pub(crate) port: u16, +} + +impl Default for Args { + fn default() -> Self { + Self { port: 9000 } + } +} + pub(crate) struct App { clients: Vec, listener: TcpListener, next_client_id: u64, } -struct Client { - closed: bool, - id: u64, - outbox: VecDeque>, - stream: Framed, -} - impl App { - pub(crate) async fn new() -> Result { - let listener = TcpListener::bind("0.0.0.0:9000").await?; + pub(crate) async fn new(args: Args) -> Result { + let listener = TcpListener::bind(("0.0.0.0", args.port)).await?; Ok(Self { clients: Default::default(), @@ -38,17 +41,21 @@ impl App { } } - if ! client.outbox.is_empty() { + if !client.outbox.is_empty() { let mut stream = pin!(&mut client.stream); match <_ as futures_sink::Sink>::poll_ready(stream.as_mut(), cx) { Poll::Pending => {} Poll::Ready(Err(err)) => { - return Poll::Ready(Err(err).context("Can't check network write half for readiness")); + return Poll::Ready( + Err(err).context("Can't check network write half for readiness"), + ); } Poll::Ready(Ok(())) => { cx.waker().wake_by_ref(); let Some(msg) = client.outbox.pop_front() else { - return Poll::Ready(Err(anyhow!("Can't pop from outbox even though we just checked it was not empty"))); + return Poll::Ready(Err(anyhow!( + "Can't pop from outbox even though we just checked it was not empty" + ))); }; let frame = rmp_serde::to_vec(msg.as_ref())?; if let Err(err) = stream.start_send(Bytes::from(frame)) { @@ -59,27 +66,27 @@ impl App { } } - let stream = pin!(&mut client.stream); - match stream.poll_next(cx) { + match client.poll_next(cx) { Poll::Pending => {} - Poll::Ready(None) => { - cx.waker().wake_by_ref(); - client.closed = true; + Poll::Ready(Err(err)) => { + tracing::error!(?err, "client.poll_next error"); } - Poll::Ready(Some(frame)) => { - tracing::info!("Got network data"); - cx.waker().wake_by_ref(); - let frame = frame.context("network framing decode")?; - let msg: ToServer = rmp_serde::from_slice(&frame)?; - match msg { - ToServer::ChatLine { line } => { - tracing::info!(id = client.id, ?line); - new_messages.push(Rc::new(ToClient::ChatLine { - id: client.id, - line, - })); - } + Poll::Ready(Ok(None)) => client.closed = true, + Poll::Ready(Ok(Some(ToServer::ChatLine { line, sequence }))) => { + tracing::info!(id = client.id, ?line); + if sequence != client.sequence { + tracing::error!( + expected = client.sequence, + actual = sequence, + "Sequence mismatch" + ); + return Poll::Ready(Err(anyhow!("Sequence mismatch"))); } + client.sequence += 1; + new_messages.push(Rc::new(ToClient::ChatLine { + id: client.id, + line, + })); } } } @@ -102,11 +109,12 @@ impl App { Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), Poll::Ready(Ok((stream, _addr))) => { cx.waker().wake_by_ref(); - let stream = Framed::new(stream, BytesCodec::new()); + let stream = Framed::new(stream, LengthDelimitedCodec::new()); let client = Client { closed: false, id: self.next_client_id, outbox: Default::default(), + sequence: 0, stream, }; self.next_client_id += 1; @@ -122,3 +130,37 @@ impl App { Poll::Pending } } + +struct Client { + closed: bool, + id: u64, + outbox: VecDeque>, + sequence: u64, + stream: Framed, +} + +impl Client { + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + let stream = pin!(&mut self.stream); + let Poll::Ready(frame_opt) = stream.poll_next(cx) else { + return Poll::Pending; + }; + + let Some(frame) = frame_opt else { + return Poll::Ready(Ok(None)); + }; + + cx.waker().wake_by_ref(); + tracing::info!("Got network data"); + + let frame = match frame { + Err(err) => return Poll::Ready(Err(err).context("Network framing decode error")), + Ok(x) => x, + }; + let msg = match rmp_serde::from_slice(&frame) { + Err(err) => return Poll::Ready(Err(err).context("MsgPack decode error")), + Ok(x) => x, + }; + Poll::Ready(Ok(Some(msg))) + } +}