Compare commits
	
		
			8 Commits 
		
	
	
		
			4548f9daee
			...
			c4de3ad00d
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  _ | c4de3ad00d | |
|  _ | e3c23a0931 | |
|  _ | 2606946240 | |
|  _ | 7119603145 | |
|  _ | a2b286b01d | |
|  _ | 425503d66d | |
|  _ | 4d018413f8 | |
|  _ | fd93df42e1 | 
							
								
								
									
										116
									
								
								src/client.rs
								
								
								
								
							
							
						
						
									
										116
									
								
								src/client.rs
								
								
								
								
							|  | @ -15,8 +15,7 @@ impl Default for Args { | |||
| } | ||||
| 
 | ||||
| pub(crate) struct App { | ||||
|     outbox: VecDeque<ToServer>, | ||||
|     sequence: u64, | ||||
|     client: Client, | ||||
|     stream: Framed<TcpStream, LengthDelimitedCodec>, | ||||
|     timer: Interval, | ||||
| } | ||||
|  | @ -28,69 +27,92 @@ impl App { | |||
|         let mut timer = tokio::time::interval(Duration::from_millis(args.period_ms.into())); | ||||
|         timer.set_missed_tick_behavior(MissedTickBehavior::Skip); | ||||
|         Ok(Self { | ||||
|             outbox: Default::default(), | ||||
|             sequence: 0, | ||||
|             client: Default::default(), | ||||
|             stream, | ||||
|             timer, | ||||
|         }) | ||||
|     } | ||||
| 
 | ||||
|     pub(crate) fn poll_run(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { | ||||
|         match self.step(cx) { | ||||
|             Ok(()) => Poll::Pending, | ||||
|             Err(err) => Poll::Ready(Err(err)), | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     fn step(&mut self, cx: &mut Context<'_>) -> Result<()> { | ||||
|         let mut stream = pin!(&mut self.stream); | ||||
|         if let Poll::Ready(result) = | ||||
|             <_ as futures_sink::Sink<Bytes>>::poll_ready(stream.as_mut(), cx) | ||||
|         { | ||||
|             let mut stream = pin!(&mut self.stream); | ||||
|             result?; | ||||
|             if let Some(frame) = self.client.poll_send() { | ||||
|                 stream | ||||
|                     .as_mut() | ||||
|                     .start_send(frame) | ||||
|                     .context("stream.start_send")?; | ||||
|                 tracing::debug!("Started send"); | ||||
|             } | ||||
|             match <_ as futures_sink::Sink<Bytes>>::poll_flush(stream.as_mut(), cx) { | ||||
|                 Poll::Pending => {} | ||||
|                 Poll::Ready(Err(err)) => return Poll::Ready(Err(err).context("poll_flush")), | ||||
|                 Poll::Ready(Ok(())) => {} | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         if !self.outbox.is_empty() { | ||||
|             let mut stream = pin!(&mut self.stream); | ||||
|             match <_ as futures_sink::Sink<Bytes>>::poll_ready(stream.as_mut(), cx) { | ||||
|                 Poll::Pending => {} | ||||
|                 Poll::Ready(Err(err)) => { | ||||
|                     return Poll::Ready( | ||||
|                         Err(err).context("Can't check network write half for readiness"), | ||||
|                     ); | ||||
|                 } | ||||
|                 Poll::Ready(Ok(())) => { | ||||
|                     cx.waker().wake_by_ref(); | ||||
|                     let frame = rmp_serde::to_vec(&self.outbox.pop_front())?; | ||||
|                     if let Err(err) = stream.start_send(Bytes::from(frame)) { | ||||
|                         return Poll::Ready(Err(err).context("start_send")); | ||||
|                     } | ||||
|                     tracing::debug!("Started send"); | ||||
|                 Poll::Ready(result) => { | ||||
|                     result.context("poll_flush")?; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         let stream = pin!(&mut self.stream); | ||||
|         match stream.poll_next(cx) { | ||||
|             Poll::Pending => {} | ||||
|             Poll::Ready(None) => return Poll::Ready(Err(anyhow!("Server closed cxn"))), | ||||
|             Poll::Ready(Some(frame)) => { | ||||
|                 cx.waker().wake_by_ref(); | ||||
|                 let frame = frame.context("network framing decode")?; | ||||
|                 match rmp_serde::from_slice(&frame)? { | ||||
|                     ToClient::ChatLine { id, line } => { | ||||
|                         tracing::info!(?id, ?line); | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|         if let Poll::Ready(frame_opt) = stream.as_mut().poll_next(cx) { | ||||
|             let frame = frame_opt.context("Server closed cxn")?; | ||||
|             cx.waker().wake_by_ref(); | ||||
|             let frame = frame.context("network framing decode")?; | ||||
|             self.client | ||||
|                 .handle_frame(frame.into()) | ||||
|                 .context("client.handle_frame")?; | ||||
|         } | ||||
| 
 | ||||
|         if self.timer.poll_tick(cx).is_ready() { | ||||
|             cx.waker().wake_by_ref(); | ||||
|             if self.outbox.is_empty() { | ||||
|                 self.outbox.push_back(ToServer::ChatLine { | ||||
|                     line: "There was a time, in the era of great chaos, when the Earth and the Moon were at war with each other. A daredevil from the Moon piloted a bizarre aircraft. It was feared, and because of its shape, called EINHANDER.".to_string(), | ||||
|                     sequence: self.sequence, | ||||
|                 }); | ||||
|                 self.sequence += 1; | ||||
|             } | ||||
|             self.client | ||||
|                 .handle_timeout() | ||||
|                 .context("client.handle_timeout")?; | ||||
|         } | ||||
| 
 | ||||
|         Poll::Pending | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[derive(Default)] | ||||
| struct Client { | ||||
|     outbox: VecDeque<Bytes>, | ||||
|     sequence: u64, | ||||
| } | ||||
| 
 | ||||
| impl Client { | ||||
|     fn handle_frame(&self, frame: Bytes) -> Result<()> { | ||||
|         match rmp_serde::from_slice(&frame)? { | ||||
|             ToClient::ChatLine { id, line } => tracing::info!(?id, ?line), | ||||
|             ToClient::ClientConnected { id } => tracing::info!(?id, "Connected"), | ||||
|             ToClient::ClientDisconnected { id } => tracing::info!(?id, "Disconnected"), | ||||
|         } | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     fn handle_timeout(&mut self) -> Result<()> { | ||||
|         if !self.outbox.is_empty() { | ||||
|             bail!("Dropped message, outbox is full"); | ||||
|         } | ||||
| 
 | ||||
|         let msg = ToServer::ChatLine { | ||||
|             line: "There was a time, in the era of great chaos, when the Earth and the Moon were at war with each other. A daredevil from the Moon piloted a bizarre aircraft. It was feared, and because of its shape, called EINHANDER.".to_string(), | ||||
|             sequence: self.sequence, | ||||
|         }; | ||||
|         let frame = rmp_serde::to_vec(&msg)?; | ||||
|         self.outbox.push_back(frame.into()); | ||||
|         self.sequence += 1; | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     fn poll_send(&mut self) -> Option<Bytes> { | ||||
|         self.outbox.pop_front() | ||||
|     } | ||||
| } | ||||
|  |  | |||
|  | @ -1,8 +1,11 @@ | |||
| use crate::prelude::*; | ||||
| use serde::{Deserialize, Serialize}; | ||||
| 
 | ||||
| #[derive(Deserialize, Serialize)] | ||||
| pub(crate) enum ToClient { | ||||
|     ChatLine { id: u64, line: String }, | ||||
|     ChatLine { id: Id, line: String }, | ||||
|     ClientConnected { id: Id }, | ||||
|     ClientDisconnected { id: Id }, | ||||
| } | ||||
| 
 | ||||
| #[derive(Deserialize, Serialize)] | ||||
|  |  | |||
|  | @ -1,14 +1,13 @@ | |||
| pub(crate) use crate::messages::{ToClient, ToServer}; | ||||
| pub use anyhow::{Context as _, Result, anyhow, bail}; | ||||
| pub use anyhow::{Context as _, Result, bail}; | ||||
| pub use bytes::Bytes; | ||||
| pub use futures_core::stream::Stream; | ||||
| pub use futures_sink::Sink; | ||||
| pub use std::{ | ||||
|     collections::VecDeque, | ||||
|     collections::{BTreeMap, VecDeque}, | ||||
|     future::poll_fn, | ||||
|     ops::ControlFlow, | ||||
|     pin::pin, | ||||
|     rc::Rc, | ||||
|     str::FromStr, | ||||
|     task::{Context, Poll}, | ||||
|     time::Duration, | ||||
|  | @ -20,3 +19,5 @@ pub use tokio::{ | |||
| }; | ||||
| // Don't use BytesCodec, it is _nonsense_
 | ||||
| pub use tokio_util::codec::{Framed, LengthDelimitedCodec}; | ||||
| 
 | ||||
| pub type Id = u64; | ||||
|  |  | |||
							
								
								
									
										249
									
								
								src/server.rs
								
								
								
								
							
							
						
						
									
										249
									
								
								src/server.rs
								
								
								
								
							|  | @ -11,9 +11,9 @@ impl Default for Args { | |||
| } | ||||
| 
 | ||||
| pub(crate) struct App { | ||||
|     clients: Vec<Client>, | ||||
|     client_streams: BTreeMap<Id, Framed<TcpStream, LengthDelimitedCodec>>, | ||||
|     listener: TcpListener, | ||||
|     next_client_id: u64, | ||||
|     server: Server, | ||||
| } | ||||
| 
 | ||||
| impl App { | ||||
|  | @ -21,147 +21,170 @@ impl App { | |||
|         let listener = TcpListener::bind(("0.0.0.0", args.port)).await?; | ||||
| 
 | ||||
|         Ok(Self { | ||||
|             clients: Default::default(), | ||||
|             client_streams: Default::default(), | ||||
|             listener, | ||||
|             next_client_id: 1, | ||||
|             server: Default::default(), | ||||
|         }) | ||||
|     } | ||||
| 
 | ||||
|     pub(crate) fn poll_run(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { | ||||
|         let previous_client_count = self.clients.len(); | ||||
|         match self.step(cx) { | ||||
|             Ok(()) => Poll::Pending, | ||||
|             Err(err) => Poll::Ready(Err(err)), | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|         let mut new_messages = vec![]; | ||||
|         for client in &mut self.clients { | ||||
|     fn step(&mut self, cx: &mut Context<'_>) -> Result<()> { | ||||
|         let mut clients_to_remove = vec![]; | ||||
| 
 | ||||
|         // Pump the network streams
 | ||||
|         for (id, stream) in &mut self.client_streams { | ||||
|             let mut stream = pin!(stream); | ||||
| 
 | ||||
|             // Try to send from the client's outbox
 | ||||
|             if let Poll::Ready(result) = | ||||
|                 <_ as futures_sink::Sink<Bytes>>::poll_ready(stream.as_mut(), cx) | ||||
|             { | ||||
|                 let mut stream = pin!(&mut client.stream); | ||||
|                 match <_ as futures_sink::Sink<Bytes>>::poll_flush(stream.as_mut(), cx) { | ||||
|                     Poll::Pending => {} | ||||
|                     Poll::Ready(Err(err)) => return Poll::Ready(Err(err).context("poll_flush")), | ||||
|                     Poll::Ready(Ok(())) => {} | ||||
|                 result.context("Can't check network write half for readiness")?; | ||||
|                 if let Some(frame) = self.server.poll_send(*id)? { | ||||
|                     stream.as_mut().start_send(frame).context("start_send")?; | ||||
|                     if let Poll::Ready(result) = | ||||
|                         <_ as futures_sink::Sink<Bytes>>::poll_flush(stream.as_mut(), cx) | ||||
|                     { | ||||
|                         result.context("poll_flush")?; | ||||
|                     } | ||||
|                     tracing::debug!("Started send"); | ||||
|                 } | ||||
|             } | ||||
| 
 | ||||
|             if !client.outbox.is_empty() { | ||||
|                 let mut stream = pin!(&mut client.stream); | ||||
|                 match <_ as futures_sink::Sink<Bytes>>::poll_ready(stream.as_mut(), cx) { | ||||
|                     Poll::Pending => {} | ||||
|                     Poll::Ready(Err(err)) => { | ||||
|                         return Poll::Ready( | ||||
|                             Err(err).context("Can't check network write half for readiness"), | ||||
|                         ); | ||||
|                     } | ||||
|                     Poll::Ready(Ok(())) => { | ||||
|                         cx.waker().wake_by_ref(); | ||||
|                         let Some(msg) = client.outbox.pop_front() else { | ||||
|                             return Poll::Ready(Err(anyhow!( | ||||
|                                 "Can't pop from outbox even though we just checked it was not empty" | ||||
|                             ))); | ||||
|                         }; | ||||
|                         let frame = rmp_serde::to_vec(msg.as_ref())?; | ||||
|                         if let Err(err) = stream.start_send(Bytes::from(frame)) { | ||||
|                             return Poll::Ready(Err(err).context("start_send")); | ||||
|                         } | ||||
|                         tracing::debug!("Started send"); | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
| 
 | ||||
|             match client.poll_next(cx) { | ||||
|             // Try to read data in
 | ||||
|             match stream.as_mut().poll_next(cx) { | ||||
|                 Poll::Pending => {} | ||||
|                 Poll::Ready(Err(err)) => { | ||||
|                     tracing::error!(?err, "client.poll_next error"); | ||||
|                 } | ||||
|                 Poll::Ready(Ok(None)) => client.closed = true, | ||||
|                 Poll::Ready(Ok(Some(ToServer::ChatLine { line, sequence }))) => { | ||||
|                     tracing::debug!(id = client.id, ?line); | ||||
|                     if sequence != client.sequence { | ||||
|                         tracing::error!( | ||||
|                             expected = client.sequence, | ||||
|                             actual = sequence, | ||||
|                             "Sequence mismatch" | ||||
|                         ); | ||||
|                         return Poll::Ready(Err(anyhow!("Sequence mismatch"))); | ||||
|                     } | ||||
|                     tracing::info!(?sequence, id = client.id); | ||||
|                     client.sequence += 1; | ||||
|                     new_messages.push(Rc::new(ToClient::ChatLine { | ||||
|                         id: client.id, | ||||
|                         line, | ||||
|                     })); | ||||
|                 Poll::Ready(None) => clients_to_remove.push(*id), | ||||
|                 Poll::Ready(Some(result)) => { | ||||
|                     let frame = result.context("stream.poll_next")?; | ||||
|                     cx.waker().wake_by_ref(); | ||||
|                     self.server.handle_client_frame(*id, frame.into())?; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         self.clients.retain(|client| { | ||||
|             if client.closed { | ||||
|                 tracing::info!(id = client.id, "Closing client"); | ||||
|             } | ||||
|             !client.closed | ||||
|         }); | ||||
| 
 | ||||
|         for client in &mut self.clients { | ||||
|             for msg in &new_messages { | ||||
|                 client.outbox.push_back(Rc::clone(msg)); | ||||
|             } | ||||
|         // Close out disconnected clients
 | ||||
|         for id in clients_to_remove { | ||||
|             cx.waker().wake_by_ref(); | ||||
|             tracing::info!(?id, "Closing client"); | ||||
|             self.client_streams.remove(&id); | ||||
|             self.server.handle_client_disconnected(id)?; | ||||
|         } | ||||
| 
 | ||||
|         match self.listener.poll_accept(cx) { | ||||
|             Poll::Pending => {} | ||||
|             Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), | ||||
|             Poll::Ready(Ok((stream, _addr))) => { | ||||
|                 cx.waker().wake_by_ref(); | ||||
|                 let stream = Framed::new(stream, LengthDelimitedCodec::new()); | ||||
|                 let client = Client { | ||||
|                     closed: false, | ||||
|                     id: self.next_client_id, | ||||
|                     outbox: Default::default(), | ||||
|                     sequence: 0, | ||||
|                     stream, | ||||
|                 }; | ||||
|                 self.next_client_id += 1; | ||||
|                 tracing::info!(id = client.id, "Accepted client"); | ||||
|                 self.clients.push(client); | ||||
|             } | ||||
|         if let Poll::Ready(result) = self.listener.poll_accept(cx) { | ||||
|             let (stream, _addr) = result.context("listener.poll_accept")?; | ||||
|             cx.waker().wake_by_ref(); | ||||
|             let stream = Framed::new(stream, LengthDelimitedCodec::new()); | ||||
|             let id = self.server.handle_new_client()?; | ||||
|             self.client_streams.insert(id, stream); | ||||
|         } | ||||
| 
 | ||||
|         if self.clients.len() != previous_client_count { | ||||
|             tracing::info!(client_count = self.clients.len()); | ||||
|         } | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
|         Poll::Pending | ||||
| #[derive(Default)] | ||||
| struct Server { | ||||
|     clients: BTreeMap<Id, Client>, | ||||
|     next_client_id: Id, | ||||
| } | ||||
| 
 | ||||
| impl Server { | ||||
|     fn broadcast(&mut self, msg: &ToClient) -> Result<()> { | ||||
|         for client in &mut self.clients.values_mut() { | ||||
|             client.handle_outgoing(msg)?; | ||||
|         } | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     fn handle_client_disconnected(&mut self, id: Id) -> Result<()> { | ||||
|         self.clients.remove(&id); | ||||
|         self.broadcast(&ToClient::ClientDisconnected { id })?; | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     fn handle_client_frame(&mut self, id: Id, frame: Bytes) -> Result<()> { | ||||
|         let msg = { | ||||
|             let client = self | ||||
|                 .clients | ||||
|                 .get_mut(&id) | ||||
|                 .context("Logic error: Stream has no associated client")?; | ||||
|             let Some(msg) = client.handle_frame(frame).context("client.handle_frame")? else { | ||||
|                 return Ok(()); | ||||
|             }; | ||||
|             msg | ||||
|         }; | ||||
|         self.broadcast(&msg)?; | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     fn handle_new_client(&mut self) -> Result<Id> { | ||||
|         let id = self.next_client_id; | ||||
|         self.next_client_id += 1; | ||||
|         let client = Client { | ||||
|             id, | ||||
|             outbox: Default::default(), | ||||
|             sequence: 0, | ||||
|         }; | ||||
|         tracing::info!( | ||||
|             id = client.id, | ||||
|             total = self.clients.len(), | ||||
|             "Accepted client" | ||||
|         ); | ||||
|         self.clients.insert(id, client); | ||||
|         self.broadcast(&ToClient::ClientConnected { id })?; | ||||
|         Ok(id) | ||||
|     } | ||||
| 
 | ||||
|     fn poll_send(&mut self, id: Id) -> Result<Option<Bytes>> { | ||||
|         let client = self | ||||
|             .clients | ||||
|             .get_mut(&id) | ||||
|             .context("Logic error: Stream has no associated client")?; | ||||
|         Ok(client.poll_send()) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| struct Client { | ||||
|     closed: bool, | ||||
|     id: u64, | ||||
|     outbox: VecDeque<Rc<ToClient>>, | ||||
|     outbox: VecDeque<Bytes>, | ||||
|     sequence: u64, | ||||
|     stream: Framed<TcpStream, LengthDelimitedCodec>, | ||||
| } | ||||
| 
 | ||||
| impl Client { | ||||
|     fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<ToServer>>> { | ||||
|         let stream = pin!(&mut self.stream); | ||||
|         let Poll::Ready(frame_opt) = stream.poll_next(cx) else { | ||||
|             return Poll::Pending; | ||||
|         }; | ||||
|     fn handle_frame(&mut self, frame: Bytes) -> Result<Option<ToClient>> { | ||||
|         match rmp_serde::from_slice(&frame)? { | ||||
|             ToServer::ChatLine { line, sequence } => { | ||||
|                 if sequence != self.sequence { | ||||
|                     tracing::error!( | ||||
|                         expected = self.sequence, | ||||
|                         actual = sequence, | ||||
|                         "Sequence mismatch" | ||||
|                     ); | ||||
|                     bail!("Sequence mismatch"); | ||||
|                 } | ||||
|                 self.sequence += 1; | ||||
|                 Ok(Some(ToClient::ChatLine { id: self.id, line })) | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|         let Some(frame) = frame_opt else { | ||||
|             return Poll::Ready(Ok(None)); | ||||
|         }; | ||||
|     fn handle_outgoing(&mut self, msg: &ToClient) -> Result<()> { | ||||
|         if !self.outbox.is_empty() { | ||||
|             bail!("Outbox full"); | ||||
|         } | ||||
|         let bytes = rmp_serde::to_vec(msg)?.into(); | ||||
|         self.outbox.push_back(bytes); | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|         cx.waker().wake_by_ref(); | ||||
|         tracing::debug!("Got network data"); | ||||
| 
 | ||||
|         let frame = match frame { | ||||
|             Err(err) => return Poll::Ready(Err(err).context("Network framing decode error")), | ||||
|             Ok(x) => x, | ||||
|         }; | ||||
|         let msg = match rmp_serde::from_slice(&frame) { | ||||
|             Err(err) => return Poll::Ready(Err(err).context("MsgPack decode error")), | ||||
|             Ok(x) => x, | ||||
|         }; | ||||
|         Poll::Ready(Ok(Some(msg))) | ||||
|     fn poll_send(&mut self) -> Option<Bytes> { | ||||
|         self.outbox.pop_front() | ||||
|     } | ||||
| } | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue