commit 8a26584dd5c20adafec95306804829b1441f94df Author: _ <_@_> Date: Sun Feb 23 17:07:09 2025 +0000 trying out poll_* style diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..becdbae --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,524 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "addr2line" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" + +[[package]] +name = "anyhow" +version = "1.0.96" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b964d184e89d9b6b67dd2715bc8e74cf3107fb2b529990c90cf517326150bf4" + +[[package]] +name = "async_tlv" +version = "0.1.0" +dependencies = [ + "anyhow", + "bytes", + "futures-core", + "futures-sink", + "rmp-serde", + "serde", + "tokio", + "tokio-util", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "autocfg" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" + +[[package]] +name = "backtrace" +version = "0.3.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a" +dependencies = [ + "addr2line", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", + "windows-targets", +] + +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + +[[package]] +name = "bytes" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f61dac84819c6588b558454b194026eb1f09c293b9036ae9b159e74e73ab6cf9" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] +name = "gimli" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" + +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + +[[package]] +name = "libc" +version = "0.2.169" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a" + +[[package]] +name = "log" +version = "0.4.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30bde2b3dc3671ae49d8e2e9f044c7c005836e7a023ee57cffa25ab82764bb9e" + +[[package]] +name = "memchr" +version = "2.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" + +[[package]] +name = "miniz_oxide" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e3e04debbb59698c15bacbb6d93584a8c0ca9cc3213cb423d31f760d8843ce5" +dependencies = [ + "adler2", +] + +[[package]] +name = "mio" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" +dependencies = [ + "libc", + "wasi", + "windows-sys", +] + +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + +[[package]] +name = "object" +version = "0.36.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87" +dependencies = [ + "memchr", +] + +[[package]] +name = "once_cell" +version = "1.20.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "945462a4b81e43c4e3ba96bd7b49d834c6f61198356aa858733bc4acf3cbe62e" + +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + +[[package]] +name = "paste" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" + +[[package]] +name = "pin-project-lite" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" + +[[package]] +name = "proc-macro2" +version = "1.0.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60946a68e5f9d28b0dc1c21bb8a97ee7d018a8b322fa57838ba31cc878e22d99" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e4dccaaaf89514f546c693ddc140f729f958c247918a13380cccc6078391acc" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rmp" +version = "0.8.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "228ed7c16fa39782c3b3468e974aec2795e9089153cd08ee2e9aefb3613334c4" +dependencies = [ + "byteorder", + "num-traits", + "paste", +] + +[[package]] +name = "rmp-serde" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52e599a477cf9840e92f2cde9a7189e67b42c57532749bf90aea6ec10facd4db" +dependencies = [ + "byteorder", + "rmp", + "serde", +] + +[[package]] +name = "rustc-demangle" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" + +[[package]] +name = "serde" +version = "1.0.218" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8dfc9d19bdbf6d17e22319da49161d5d0108e4188e8b680aef6299eed22df60" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.218" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f09503e191f4e797cb8aac08e9a4a4695c5edf6a2e70e376d961ddd5c969f82b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "signal-hook-registry" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +dependencies = [ + "libc", +] + +[[package]] +name = "smallvec" +version = "1.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fcf8323ef1faaee30a44a340193b1ac6814fd9b7b4e88e9d4519a3e4abe1cfd" + +[[package]] +name = "socket2" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8" +dependencies = [ + "libc", + "windows-sys", +] + +[[package]] +name = "syn" +version = "2.0.98" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36147f1a48ae0ec2b5b3bc5b537d267457555a10dc06f3dbc8cb11ba3006d3b1" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "thread_local" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" +dependencies = [ + "cfg-if", + "once_cell", +] + +[[package]] +name = "tokio" +version = "1.43.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e" +dependencies = [ + "backtrace", + "libc", + "mio", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "windows-sys", +] + +[[package]] +name = "tokio-macros" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio-util" +version = "0.7.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7fcaa8d55a2bdd6b83ace262b016eca0d79ee02818c5c1bcdf0305114081078" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tracing" +version = "0.1.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" +dependencies = [ + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" +dependencies = [ + "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" +dependencies = [ + "nu-ansi-term", + "sharded-slab", + "smallvec", + "thread_local", + "tracing-core", + "tracing-log", +] + +[[package]] +name = "unicode-ident" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00e2473a93778eb0bad35909dff6a10d28e63f792f16ed15e404fca9d5eeedbe" + +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..987ae3d --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "async_tlv" +version = "0.1.0" +edition = "2024" + +[dependencies] +anyhow = "1.0.96" +bytes = "1.10.0" +futures-core = "0.3.31" +futures-sink = "0.3.31" +rmp-serde = "1.3.0" +serde = { version = "1.0.218", features = ["derive"] } +tokio = { version = "1.43.0", features = ["macros", "net", "rt-multi-thread", "signal", "time"] } +tokio-util = { version = "0.7.13", features = ["codec"] } +tracing = "0.1.41" +tracing-subscriber = "0.3.19" diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..78c6a4b --- /dev/null +++ b/src/client.rs @@ -0,0 +1,78 @@ +use crate::prelude::*; + +pub(crate) struct App { + outbox: VecDeque, + stream: Framed, + timer: Interval, +} + +impl App { + pub(crate) async fn new() -> Result { + let stream = TcpStream::connect("127.0.0.1:9000").await?; + let stream = Framed::new(stream, BytesCodec::new()); + let mut timer = tokio::time::interval(Duration::from_millis(1000)); + timer.set_missed_tick_behavior(MissedTickBehavior::Skip); + Ok(Self { + outbox: Default::default(), + stream, + timer, + }) + } + + pub(crate) fn poll_run(&mut self, cx: &mut Context<'_>) -> Poll> { + { + let mut stream = pin!(&mut self.stream); + match <_ as futures_sink::Sink>::poll_flush(stream.as_mut(), cx) { + Poll::Pending => {} + Poll::Ready(Err(err)) => return Poll::Ready(Err(err).context("poll_flush")), + Poll::Ready(Ok(())) => {} + } + } + + if ! self.outbox.is_empty() { + let mut stream = pin!(&mut self.stream); + match <_ as futures_sink::Sink>::poll_ready(stream.as_mut(), cx) { + Poll::Pending => {} + Poll::Ready(Err(err)) => { + return Poll::Ready( + Err(err).context("Can't check network write half for readiness"), + ); + } + Poll::Ready(Ok(())) => { + cx.waker().wake_by_ref(); + let frame = rmp_serde::to_vec(&self.outbox.pop_front())?; + if let Err(err) = stream.start_send(Bytes::from(frame)) { + return Poll::Ready(Err(err).context("start_send")); + } + tracing::debug!("Started send"); + } + } + } + + let stream = pin!(&mut self.stream); + match stream.poll_next(cx) { + Poll::Pending => {} + Poll::Ready(None) => return Poll::Ready(Err(anyhow!("Server closed cxn"))), + Poll::Ready(Some(frame)) => { + cx.waker().wake_by_ref(); + let frame = frame.context("network framing decode")?; + match rmp_serde::from_slice(&frame)? { + ToClient::ChatLine { id, line } => { + tracing::info!(?id, ?line); + } + } + } + } + + if self.timer.poll_tick(cx).is_ready() { + cx.waker().wake_by_ref(); + if self.outbox.is_empty() { + self.outbox.push_back(ToServer::ChatLine { + line: "tick".to_string(), + }); + } + } + + Poll::Pending + } +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..ff94bdb --- /dev/null +++ b/src/main.rs @@ -0,0 +1,78 @@ +mod client; +mod messages; +mod prelude; +mod server; + +use prelude::*; + +struct Args { + cmd: Subcommand, +} + +enum Subcommand { + Client, + Server, +} + +impl Args { + fn new() -> Result { + let mut args = std::env::args().skip(1); + let cmd = match args.next().as_deref() { + None => bail!("Need a subcommand, e.g. `client` or `server`"), + Some("client") => Subcommand::Client, + Some("server") => Subcommand::Server, + Some(_) => bail!("Invalid subcommand"), + }; + Ok(Self { cmd }) + } +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::new()?; + tracing_subscriber::fmt::init(); + + match args.cmd { + Subcommand::Client => run_client().await, + Subcommand::Server => run_server().await, + } +} + +async fn run_client() -> Result<()> { + let mut ctrl_c = signal(SignalKind::interrupt())?; + let mut app = client::App::new().await?; + loop { + let ctl = poll_fn(|cx| { + tracing::debug!("Loop"); + if ctrl_c.poll_recv(cx).is_ready() { + tracing::debug!("Caught Ctrl-C"); + return Poll::Ready(ControlFlow::Break(Ok(()))); + } + + match app.poll_run(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Ok(())) => Poll::Ready(ControlFlow::Continue(())), + Poll::Ready(Err(err)) => Poll::Ready(ControlFlow::Break(Err(err))), + } + }) + .await; + if let ControlFlow::Break(result) = ctl { + return result; + } + } +} + +async fn run_server() -> Result<()> { + let mut ctrl_c = signal(SignalKind::interrupt())?; + let mut app = server::App::new().await?; + poll_fn(|cx| { + tracing::debug!("Loop"); + if ctrl_c.poll_recv(cx).is_ready() { + tracing::debug!("Caught Ctrl-C"); + return Poll::Ready(Ok(())); + } + + app.poll_run(cx) + }) + .await +} diff --git a/src/messages.rs b/src/messages.rs new file mode 100644 index 0000000..63311c9 --- /dev/null +++ b/src/messages.rs @@ -0,0 +1,11 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Deserialize, Serialize)] +pub(crate) enum ToClient { + ChatLine { id: u64, line: String }, +} + +#[derive(Deserialize, Serialize)] +pub(crate) enum ToServer { + ChatLine { line: String }, +} diff --git a/src/prelude.rs b/src/prelude.rs new file mode 100644 index 0000000..d33b33a --- /dev/null +++ b/src/prelude.rs @@ -0,0 +1,20 @@ +pub(crate) use crate::messages::{ToClient, ToServer}; +pub use anyhow::{Context as _, Result, anyhow, bail}; +pub use bytes::Bytes; +pub use futures_core::stream::Stream; +pub use futures_sink::Sink; +pub use std::{ + collections::VecDeque, + future::poll_fn, + ops::ControlFlow, + pin::pin, + rc::Rc, + task::{Context, Poll}, + time::Duration, +}; +pub use tokio::{ + net::{TcpListener, TcpStream}, + signal::unix::{SignalKind, signal}, + time::{Interval, MissedTickBehavior}, +}; +pub use tokio_util::codec::{BytesCodec, Framed}; diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..d527108 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,124 @@ +use crate::prelude::*; + +pub(crate) struct App { + clients: Vec, + listener: TcpListener, + next_client_id: u64, +} + +struct Client { + closed: bool, + id: u64, + outbox: VecDeque>, + stream: Framed, +} + +impl App { + pub(crate) async fn new() -> Result { + let listener = TcpListener::bind("0.0.0.0:9000").await?; + + Ok(Self { + clients: Default::default(), + listener, + next_client_id: 1, + }) + } + + pub(crate) fn poll_run(&mut self, cx: &mut Context<'_>) -> Poll> { + let previous_client_count = self.clients.len(); + + let mut new_messages = vec![]; + for client in &mut self.clients { + { + let mut stream = pin!(&mut client.stream); + match <_ as futures_sink::Sink>::poll_flush(stream.as_mut(), cx) { + Poll::Pending => {} + Poll::Ready(Err(err)) => return Poll::Ready(Err(err).context("poll_flush")), + Poll::Ready(Ok(())) => {} + } + } + + if ! client.outbox.is_empty() { + let mut stream = pin!(&mut client.stream); + match <_ as futures_sink::Sink>::poll_ready(stream.as_mut(), cx) { + Poll::Pending => {} + Poll::Ready(Err(err)) => { + return Poll::Ready(Err(err).context("Can't check network write half for readiness")); + } + Poll::Ready(Ok(())) => { + cx.waker().wake_by_ref(); + let Some(msg) = client.outbox.pop_front() else { + return Poll::Ready(Err(anyhow!("Can't pop from outbox even though we just checked it was not empty"))); + }; + let frame = rmp_serde::to_vec(msg.as_ref())?; + if let Err(err) = stream.start_send(Bytes::from(frame)) { + return Poll::Ready(Err(err).context("start_send")); + } + tracing::debug!("Started send"); + } + } + } + + let stream = pin!(&mut client.stream); + match stream.poll_next(cx) { + Poll::Pending => {} + Poll::Ready(None) => { + cx.waker().wake_by_ref(); + client.closed = true; + } + Poll::Ready(Some(frame)) => { + tracing::info!("Got network data"); + cx.waker().wake_by_ref(); + let frame = frame.context("network framing decode")?; + let msg: ToServer = rmp_serde::from_slice(&frame)?; + match msg { + ToServer::ChatLine { line } => { + tracing::info!(id = client.id, ?line); + new_messages.push(Rc::new(ToClient::ChatLine { + id: client.id, + line, + })); + } + } + } + } + } + + self.clients.retain(|client| { + if client.closed { + tracing::info!(id = client.id, "Closing client"); + } + !client.closed + }); + + for client in &mut self.clients { + for msg in &new_messages { + client.outbox.push_back(Rc::clone(msg)); + } + } + + match self.listener.poll_accept(cx) { + Poll::Pending => {} + Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), + Poll::Ready(Ok((stream, _addr))) => { + cx.waker().wake_by_ref(); + let stream = Framed::new(stream, BytesCodec::new()); + let client = Client { + closed: false, + id: self.next_client_id, + outbox: Default::default(), + stream, + }; + self.next_client_id += 1; + tracing::info!(id = client.id, "Accepted client"); + self.clients.push(client); + } + } + + if self.clients.len() != previous_client_count { + tracing::info!(client_count = self.clients.len()); + } + + Poll::Pending + } +}