fix a bug and add some args

main
_ 2025-02-23 18:06:07 +00:00
parent 8a26584dd5
commit 40e4a30166
5 changed files with 140 additions and 50 deletions

View File

@ -1,19 +1,35 @@
use crate::prelude::*;
pub(crate) struct Args {
pub(crate) period_ms: u32,
pub(crate) port: u16,
}
impl Default for Args {
fn default() -> Self {
Self {
period_ms: 1_000,
port: 9000,
}
}
}
pub(crate) struct App {
outbox: VecDeque<ToServer>,
stream: Framed<TcpStream, BytesCodec>,
sequence: u64,
stream: Framed<TcpStream, LengthDelimitedCodec>,
timer: Interval,
}
impl App {
pub(crate) async fn new() -> Result<Self> {
let stream = TcpStream::connect("127.0.0.1:9000").await?;
let stream = Framed::new(stream, BytesCodec::new());
let mut timer = tokio::time::interval(Duration::from_millis(1000));
pub(crate) async fn new(args: Args) -> Result<Self> {
let stream = TcpStream::connect(("127.0.0.1", args.port)).await?;
let stream = Framed::new(stream, LengthDelimitedCodec::new());
let mut timer = tokio::time::interval(Duration::from_millis(args.period_ms.into()));
timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
Ok(Self {
outbox: Default::default(),
sequence: 0,
stream,
timer,
})
@ -29,7 +45,7 @@ impl App {
}
}
if ! self.outbox.is_empty() {
if !self.outbox.is_empty() {
let mut stream = pin!(&mut self.stream);
match <_ as futures_sink::Sink<Bytes>>::poll_ready(stream.as_mut(), cx) {
Poll::Pending => {}
@ -68,8 +84,10 @@ impl App {
cx.waker().wake_by_ref();
if self.outbox.is_empty() {
self.outbox.push_back(ToServer::ChatLine {
line: "tick".to_string(),
line: "There was a time, in the era of great chaos, when the Earth and the Moon were at war with each other. A daredevil from the Moon piloted a bizarre aircraft. It was feared, and because of its shape, called EINHANDER.".to_string(),
sequence: self.sequence,
});
self.sequence += 1;
}
}

View File

@ -10,8 +10,8 @@ struct Args {
}
enum Subcommand {
Client,
Server,
Client(client::Args),
Server(server::Args),
}
impl Args {
@ -19,8 +19,36 @@ impl Args {
let mut args = std::env::args().skip(1);
let cmd = match args.next().as_deref() {
None => bail!("Need a subcommand, e.g. `client` or `server`"),
Some("client") => Subcommand::Client,
Some("server") => Subcommand::Server,
Some("client") => {
let mut client_args = client::Args::default();
loop {
match args.next().as_deref() {
None => break,
Some("--period-ms") => {
client_args.period_ms =
u32::from_str(&args.next().context("Missing arg")?)?
}
Some("--port") => {
client_args.port = u16::from_str(&args.next().context("Missing arg")?)?
}
Some(_) => bail!("Invalid arg"),
}
}
Subcommand::Client(client_args)
}
Some("server") => {
let mut server_args = server::Args::default();
loop {
match args.next().as_deref() {
None => break,
Some("--port") => {
server_args.port = u16::from_str(&args.next().context("Missing arg")?)?
}
Some(_) => bail!("Invalid arg"),
}
}
Subcommand::Server(server_args)
}
Some(_) => bail!("Invalid subcommand"),
};
Ok(Self { cmd })
@ -33,14 +61,14 @@ async fn main() -> Result<()> {
tracing_subscriber::fmt::init();
match args.cmd {
Subcommand::Client => run_client().await,
Subcommand::Server => run_server().await,
Subcommand::Client(args) => run_client(args).await,
Subcommand::Server(args) => run_server(args).await,
}
}
async fn run_client() -> Result<()> {
async fn run_client(args: client::Args) -> Result<()> {
let mut ctrl_c = signal(SignalKind::interrupt())?;
let mut app = client::App::new().await?;
let mut app = client::App::new(args).await?;
loop {
let ctl = poll_fn(|cx| {
tracing::debug!("Loop");
@ -62,9 +90,9 @@ async fn run_client() -> Result<()> {
}
}
async fn run_server() -> Result<()> {
async fn run_server(args: server::Args) -> Result<()> {
let mut ctrl_c = signal(SignalKind::interrupt())?;
let mut app = server::App::new().await?;
let mut app = server::App::new(args).await?;
poll_fn(|cx| {
tracing::debug!("Loop");
if ctrl_c.poll_recv(cx).is_ready() {

View File

@ -7,5 +7,5 @@ pub(crate) enum ToClient {
#[derive(Deserialize, Serialize)]
pub(crate) enum ToServer {
ChatLine { line: String },
ChatLine { line: String, sequence: u64 },
}

View File

@ -9,6 +9,7 @@ pub use std::{
ops::ControlFlow,
pin::pin,
rc::Rc,
str::FromStr,
task::{Context, Poll},
time::Duration,
};
@ -17,4 +18,5 @@ pub use tokio::{
signal::unix::{SignalKind, signal},
time::{Interval, MissedTickBehavior},
};
pub use tokio_util::codec::{BytesCodec, Framed};
// Don't use BytesCodec, it is _nonsense_
pub use tokio_util::codec::{Framed, LengthDelimitedCodec};

View File

@ -1,21 +1,24 @@
use crate::prelude::*;
pub(crate) struct Args {
pub(crate) port: u16,
}
impl Default for Args {
fn default() -> Self {
Self { port: 9000 }
}
}
pub(crate) struct App {
clients: Vec<Client>,
listener: TcpListener,
next_client_id: u64,
}
struct Client {
closed: bool,
id: u64,
outbox: VecDeque<Rc<ToClient>>,
stream: Framed<TcpStream, BytesCodec>,
}
impl App {
pub(crate) async fn new() -> Result<Self> {
let listener = TcpListener::bind("0.0.0.0:9000").await?;
pub(crate) async fn new(args: Args) -> Result<Self> {
let listener = TcpListener::bind(("0.0.0.0", args.port)).await?;
Ok(Self {
clients: Default::default(),
@ -38,17 +41,21 @@ impl App {
}
}
if ! client.outbox.is_empty() {
if !client.outbox.is_empty() {
let mut stream = pin!(&mut client.stream);
match <_ as futures_sink::Sink<Bytes>>::poll_ready(stream.as_mut(), cx) {
Poll::Pending => {}
Poll::Ready(Err(err)) => {
return Poll::Ready(Err(err).context("Can't check network write half for readiness"));
return Poll::Ready(
Err(err).context("Can't check network write half for readiness"),
);
}
Poll::Ready(Ok(())) => {
cx.waker().wake_by_ref();
let Some(msg) = client.outbox.pop_front() else {
return Poll::Ready(Err(anyhow!("Can't pop from outbox even though we just checked it was not empty")));
return Poll::Ready(Err(anyhow!(
"Can't pop from outbox even though we just checked it was not empty"
)));
};
let frame = rmp_serde::to_vec(msg.as_ref())?;
if let Err(err) = stream.start_send(Bytes::from(frame)) {
@ -59,27 +66,27 @@ impl App {
}
}
let stream = pin!(&mut client.stream);
match stream.poll_next(cx) {
match client.poll_next(cx) {
Poll::Pending => {}
Poll::Ready(None) => {
cx.waker().wake_by_ref();
client.closed = true;
Poll::Ready(Err(err)) => {
tracing::error!(?err, "client.poll_next error");
}
Poll::Ready(Some(frame)) => {
tracing::info!("Got network data");
cx.waker().wake_by_ref();
let frame = frame.context("network framing decode")?;
let msg: ToServer = rmp_serde::from_slice(&frame)?;
match msg {
ToServer::ChatLine { line } => {
tracing::info!(id = client.id, ?line);
new_messages.push(Rc::new(ToClient::ChatLine {
id: client.id,
line,
}));
}
Poll::Ready(Ok(None)) => client.closed = true,
Poll::Ready(Ok(Some(ToServer::ChatLine { line, sequence }))) => {
tracing::info!(id = client.id, ?line);
if sequence != client.sequence {
tracing::error!(
expected = client.sequence,
actual = sequence,
"Sequence mismatch"
);
return Poll::Ready(Err(anyhow!("Sequence mismatch")));
}
client.sequence += 1;
new_messages.push(Rc::new(ToClient::ChatLine {
id: client.id,
line,
}));
}
}
}
@ -102,11 +109,12 @@ impl App {
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
Poll::Ready(Ok((stream, _addr))) => {
cx.waker().wake_by_ref();
let stream = Framed::new(stream, BytesCodec::new());
let stream = Framed::new(stream, LengthDelimitedCodec::new());
let client = Client {
closed: false,
id: self.next_client_id,
outbox: Default::default(),
sequence: 0,
stream,
};
self.next_client_id += 1;
@ -122,3 +130,37 @@ impl App {
Poll::Pending
}
}
struct Client {
closed: bool,
id: u64,
outbox: VecDeque<Rc<ToClient>>,
sequence: u64,
stream: Framed<TcpStream, LengthDelimitedCodec>,
}
impl Client {
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<ToServer>>> {
let stream = pin!(&mut self.stream);
let Poll::Ready(frame_opt) = stream.poll_next(cx) else {
return Poll::Pending;
};
let Some(frame) = frame_opt else {
return Poll::Ready(Ok(None));
};
cx.waker().wake_by_ref();
tracing::info!("Got network data");
let frame = match frame {
Err(err) => return Poll::Ready(Err(err).context("Network framing decode error")),
Ok(x) => x,
};
let msg = match rmp_serde::from_slice(&frame) {
Err(err) => return Poll::Ready(Err(err).context("MsgPack decode error")),
Ok(x) => x,
};
Poll::Ready(Ok(Some(msg)))
}
}