Compare commits
	
		
			8 Commits 
		
	
	
		
			4548f9daee
			...
			c4de3ad00d
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  _ | c4de3ad00d | |
|  _ | e3c23a0931 | |
|  _ | 2606946240 | |
|  _ | 7119603145 | |
|  _ | a2b286b01d | |
|  _ | 425503d66d | |
|  _ | 4d018413f8 | |
|  _ | fd93df42e1 | 
							
								
								
									
										116
									
								
								src/client.rs
								
								
								
								
							
							
						
						
									
										116
									
								
								src/client.rs
								
								
								
								
							|  | @ -15,8 +15,7 @@ impl Default for Args { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| pub(crate) struct App { | pub(crate) struct App { | ||||||
|     outbox: VecDeque<ToServer>, |     client: Client, | ||||||
|     sequence: u64, |  | ||||||
|     stream: Framed<TcpStream, LengthDelimitedCodec>, |     stream: Framed<TcpStream, LengthDelimitedCodec>, | ||||||
|     timer: Interval, |     timer: Interval, | ||||||
| } | } | ||||||
|  | @ -28,69 +27,92 @@ impl App { | ||||||
|         let mut timer = tokio::time::interval(Duration::from_millis(args.period_ms.into())); |         let mut timer = tokio::time::interval(Duration::from_millis(args.period_ms.into())); | ||||||
|         timer.set_missed_tick_behavior(MissedTickBehavior::Skip); |         timer.set_missed_tick_behavior(MissedTickBehavior::Skip); | ||||||
|         Ok(Self { |         Ok(Self { | ||||||
|             outbox: Default::default(), |             client: Default::default(), | ||||||
|             sequence: 0, |  | ||||||
|             stream, |             stream, | ||||||
|             timer, |             timer, | ||||||
|         }) |         }) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub(crate) fn poll_run(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { |     pub(crate) fn poll_run(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { | ||||||
|  |         match self.step(cx) { | ||||||
|  |             Ok(()) => Poll::Pending, | ||||||
|  |             Err(err) => Poll::Ready(Err(err)), | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     fn step(&mut self, cx: &mut Context<'_>) -> Result<()> { | ||||||
|  |         let mut stream = pin!(&mut self.stream); | ||||||
|  |         if let Poll::Ready(result) = | ||||||
|  |             <_ as futures_sink::Sink<Bytes>>::poll_ready(stream.as_mut(), cx) | ||||||
|         { |         { | ||||||
|             let mut stream = pin!(&mut self.stream); |             result?; | ||||||
|  |             if let Some(frame) = self.client.poll_send() { | ||||||
|  |                 stream | ||||||
|  |                     .as_mut() | ||||||
|  |                     .start_send(frame) | ||||||
|  |                     .context("stream.start_send")?; | ||||||
|  |                 tracing::debug!("Started send"); | ||||||
|  |             } | ||||||
|             match <_ as futures_sink::Sink<Bytes>>::poll_flush(stream.as_mut(), cx) { |             match <_ as futures_sink::Sink<Bytes>>::poll_flush(stream.as_mut(), cx) { | ||||||
|                 Poll::Pending => {} |                 Poll::Pending => {} | ||||||
|                 Poll::Ready(Err(err)) => return Poll::Ready(Err(err).context("poll_flush")), |                 Poll::Ready(result) => { | ||||||
|                 Poll::Ready(Ok(())) => {} |                     result.context("poll_flush")?; | ||||||
|             } |  | ||||||
|         } |  | ||||||
| 
 |  | ||||||
|         if !self.outbox.is_empty() { |  | ||||||
|             let mut stream = pin!(&mut self.stream); |  | ||||||
|             match <_ as futures_sink::Sink<Bytes>>::poll_ready(stream.as_mut(), cx) { |  | ||||||
|                 Poll::Pending => {} |  | ||||||
|                 Poll::Ready(Err(err)) => { |  | ||||||
|                     return Poll::Ready( |  | ||||||
|                         Err(err).context("Can't check network write half for readiness"), |  | ||||||
|                     ); |  | ||||||
|                 } |  | ||||||
|                 Poll::Ready(Ok(())) => { |  | ||||||
|                     cx.waker().wake_by_ref(); |  | ||||||
|                     let frame = rmp_serde::to_vec(&self.outbox.pop_front())?; |  | ||||||
|                     if let Err(err) = stream.start_send(Bytes::from(frame)) { |  | ||||||
|                         return Poll::Ready(Err(err).context("start_send")); |  | ||||||
|                     } |  | ||||||
|                     tracing::debug!("Started send"); |  | ||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         let stream = pin!(&mut self.stream); |         if let Poll::Ready(frame_opt) = stream.as_mut().poll_next(cx) { | ||||||
|         match stream.poll_next(cx) { |             let frame = frame_opt.context("Server closed cxn")?; | ||||||
|             Poll::Pending => {} |             cx.waker().wake_by_ref(); | ||||||
|             Poll::Ready(None) => return Poll::Ready(Err(anyhow!("Server closed cxn"))), |             let frame = frame.context("network framing decode")?; | ||||||
|             Poll::Ready(Some(frame)) => { |             self.client | ||||||
|                 cx.waker().wake_by_ref(); |                 .handle_frame(frame.into()) | ||||||
|                 let frame = frame.context("network framing decode")?; |                 .context("client.handle_frame")?; | ||||||
|                 match rmp_serde::from_slice(&frame)? { |  | ||||||
|                     ToClient::ChatLine { id, line } => { |  | ||||||
|                         tracing::info!(?id, ?line); |  | ||||||
|                     } |  | ||||||
|                 } |  | ||||||
|             } |  | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         if self.timer.poll_tick(cx).is_ready() { |         if self.timer.poll_tick(cx).is_ready() { | ||||||
|             cx.waker().wake_by_ref(); |             cx.waker().wake_by_ref(); | ||||||
|             if self.outbox.is_empty() { |             self.client | ||||||
|                 self.outbox.push_back(ToServer::ChatLine { |                 .handle_timeout() | ||||||
|                     line: "There was a time, in the era of great chaos, when the Earth and the Moon were at war with each other. A daredevil from the Moon piloted a bizarre aircraft. It was feared, and because of its shape, called EINHANDER.".to_string(), |                 .context("client.handle_timeout")?; | ||||||
|                     sequence: self.sequence, |  | ||||||
|                 }); |  | ||||||
|                 self.sequence += 1; |  | ||||||
|             } |  | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         Poll::Pending |         Ok(()) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | #[derive(Default)] | ||||||
|  | struct Client { | ||||||
|  |     outbox: VecDeque<Bytes>, | ||||||
|  |     sequence: u64, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | impl Client { | ||||||
|  |     fn handle_frame(&self, frame: Bytes) -> Result<()> { | ||||||
|  |         match rmp_serde::from_slice(&frame)? { | ||||||
|  |             ToClient::ChatLine { id, line } => tracing::info!(?id, ?line), | ||||||
|  |             ToClient::ClientConnected { id } => tracing::info!(?id, "Connected"), | ||||||
|  |             ToClient::ClientDisconnected { id } => tracing::info!(?id, "Disconnected"), | ||||||
|  |         } | ||||||
|  |         Ok(()) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     fn handle_timeout(&mut self) -> Result<()> { | ||||||
|  |         if !self.outbox.is_empty() { | ||||||
|  |             bail!("Dropped message, outbox is full"); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         let msg = ToServer::ChatLine { | ||||||
|  |             line: "There was a time, in the era of great chaos, when the Earth and the Moon were at war with each other. A daredevil from the Moon piloted a bizarre aircraft. It was feared, and because of its shape, called EINHANDER.".to_string(), | ||||||
|  |             sequence: self.sequence, | ||||||
|  |         }; | ||||||
|  |         let frame = rmp_serde::to_vec(&msg)?; | ||||||
|  |         self.outbox.push_back(frame.into()); | ||||||
|  |         self.sequence += 1; | ||||||
|  |         Ok(()) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     fn poll_send(&mut self) -> Option<Bytes> { | ||||||
|  |         self.outbox.pop_front() | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -1,8 +1,11 @@ | ||||||
|  | use crate::prelude::*; | ||||||
| use serde::{Deserialize, Serialize}; | use serde::{Deserialize, Serialize}; | ||||||
| 
 | 
 | ||||||
| #[derive(Deserialize, Serialize)] | #[derive(Deserialize, Serialize)] | ||||||
| pub(crate) enum ToClient { | pub(crate) enum ToClient { | ||||||
|     ChatLine { id: u64, line: String }, |     ChatLine { id: Id, line: String }, | ||||||
|  |     ClientConnected { id: Id }, | ||||||
|  |     ClientDisconnected { id: Id }, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| #[derive(Deserialize, Serialize)] | #[derive(Deserialize, Serialize)] | ||||||
|  |  | ||||||
|  | @ -1,14 +1,13 @@ | ||||||
| pub(crate) use crate::messages::{ToClient, ToServer}; | pub(crate) use crate::messages::{ToClient, ToServer}; | ||||||
| pub use anyhow::{Context as _, Result, anyhow, bail}; | pub use anyhow::{Context as _, Result, bail}; | ||||||
| pub use bytes::Bytes; | pub use bytes::Bytes; | ||||||
| pub use futures_core::stream::Stream; | pub use futures_core::stream::Stream; | ||||||
| pub use futures_sink::Sink; | pub use futures_sink::Sink; | ||||||
| pub use std::{ | pub use std::{ | ||||||
|     collections::VecDeque, |     collections::{BTreeMap, VecDeque}, | ||||||
|     future::poll_fn, |     future::poll_fn, | ||||||
|     ops::ControlFlow, |     ops::ControlFlow, | ||||||
|     pin::pin, |     pin::pin, | ||||||
|     rc::Rc, |  | ||||||
|     str::FromStr, |     str::FromStr, | ||||||
|     task::{Context, Poll}, |     task::{Context, Poll}, | ||||||
|     time::Duration, |     time::Duration, | ||||||
|  | @ -20,3 +19,5 @@ pub use tokio::{ | ||||||
| }; | }; | ||||||
| // Don't use BytesCodec, it is _nonsense_
 | // Don't use BytesCodec, it is _nonsense_
 | ||||||
| pub use tokio_util::codec::{Framed, LengthDelimitedCodec}; | pub use tokio_util::codec::{Framed, LengthDelimitedCodec}; | ||||||
|  | 
 | ||||||
|  | pub type Id = u64; | ||||||
|  |  | ||||||
							
								
								
									
										249
									
								
								src/server.rs
								
								
								
								
							
							
						
						
									
										249
									
								
								src/server.rs
								
								
								
								
							|  | @ -11,9 +11,9 @@ impl Default for Args { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| pub(crate) struct App { | pub(crate) struct App { | ||||||
|     clients: Vec<Client>, |     client_streams: BTreeMap<Id, Framed<TcpStream, LengthDelimitedCodec>>, | ||||||
|     listener: TcpListener, |     listener: TcpListener, | ||||||
|     next_client_id: u64, |     server: Server, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl App { | impl App { | ||||||
|  | @ -21,147 +21,170 @@ impl App { | ||||||
|         let listener = TcpListener::bind(("0.0.0.0", args.port)).await?; |         let listener = TcpListener::bind(("0.0.0.0", args.port)).await?; | ||||||
| 
 | 
 | ||||||
|         Ok(Self { |         Ok(Self { | ||||||
|             clients: Default::default(), |             client_streams: Default::default(), | ||||||
|             listener, |             listener, | ||||||
|             next_client_id: 1, |             server: Default::default(), | ||||||
|         }) |         }) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub(crate) fn poll_run(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { |     pub(crate) fn poll_run(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { | ||||||
|         let previous_client_count = self.clients.len(); |         match self.step(cx) { | ||||||
|  |             Ok(()) => Poll::Pending, | ||||||
|  |             Err(err) => Poll::Ready(Err(err)), | ||||||
|  |         } | ||||||
|  |     } | ||||||
| 
 | 
 | ||||||
|         let mut new_messages = vec![]; |     fn step(&mut self, cx: &mut Context<'_>) -> Result<()> { | ||||||
|         for client in &mut self.clients { |         let mut clients_to_remove = vec![]; | ||||||
|  | 
 | ||||||
|  |         // Pump the network streams
 | ||||||
|  |         for (id, stream) in &mut self.client_streams { | ||||||
|  |             let mut stream = pin!(stream); | ||||||
|  | 
 | ||||||
|  |             // Try to send from the client's outbox
 | ||||||
|  |             if let Poll::Ready(result) = | ||||||
|  |                 <_ as futures_sink::Sink<Bytes>>::poll_ready(stream.as_mut(), cx) | ||||||
|             { |             { | ||||||
|                 let mut stream = pin!(&mut client.stream); |                 result.context("Can't check network write half for readiness")?; | ||||||
|                 match <_ as futures_sink::Sink<Bytes>>::poll_flush(stream.as_mut(), cx) { |                 if let Some(frame) = self.server.poll_send(*id)? { | ||||||
|                     Poll::Pending => {} |                     stream.as_mut().start_send(frame).context("start_send")?; | ||||||
|                     Poll::Ready(Err(err)) => return Poll::Ready(Err(err).context("poll_flush")), |                     if let Poll::Ready(result) = | ||||||
|                     Poll::Ready(Ok(())) => {} |                         <_ as futures_sink::Sink<Bytes>>::poll_flush(stream.as_mut(), cx) | ||||||
|  |                     { | ||||||
|  |                         result.context("poll_flush")?; | ||||||
|  |                     } | ||||||
|  |                     tracing::debug!("Started send"); | ||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
| 
 | 
 | ||||||
|             if !client.outbox.is_empty() { |             // Try to read data in
 | ||||||
|                 let mut stream = pin!(&mut client.stream); |             match stream.as_mut().poll_next(cx) { | ||||||
|                 match <_ as futures_sink::Sink<Bytes>>::poll_ready(stream.as_mut(), cx) { |  | ||||||
|                     Poll::Pending => {} |  | ||||||
|                     Poll::Ready(Err(err)) => { |  | ||||||
|                         return Poll::Ready( |  | ||||||
|                             Err(err).context("Can't check network write half for readiness"), |  | ||||||
|                         ); |  | ||||||
|                     } |  | ||||||
|                     Poll::Ready(Ok(())) => { |  | ||||||
|                         cx.waker().wake_by_ref(); |  | ||||||
|                         let Some(msg) = client.outbox.pop_front() else { |  | ||||||
|                             return Poll::Ready(Err(anyhow!( |  | ||||||
|                                 "Can't pop from outbox even though we just checked it was not empty" |  | ||||||
|                             ))); |  | ||||||
|                         }; |  | ||||||
|                         let frame = rmp_serde::to_vec(msg.as_ref())?; |  | ||||||
|                         if let Err(err) = stream.start_send(Bytes::from(frame)) { |  | ||||||
|                             return Poll::Ready(Err(err).context("start_send")); |  | ||||||
|                         } |  | ||||||
|                         tracing::debug!("Started send"); |  | ||||||
|                     } |  | ||||||
|                 } |  | ||||||
|             } |  | ||||||
| 
 |  | ||||||
|             match client.poll_next(cx) { |  | ||||||
|                 Poll::Pending => {} |                 Poll::Pending => {} | ||||||
|                 Poll::Ready(Err(err)) => { |                 Poll::Ready(None) => clients_to_remove.push(*id), | ||||||
|                     tracing::error!(?err, "client.poll_next error"); |                 Poll::Ready(Some(result)) => { | ||||||
|                 } |                     let frame = result.context("stream.poll_next")?; | ||||||
|                 Poll::Ready(Ok(None)) => client.closed = true, |                     cx.waker().wake_by_ref(); | ||||||
|                 Poll::Ready(Ok(Some(ToServer::ChatLine { line, sequence }))) => { |                     self.server.handle_client_frame(*id, frame.into())?; | ||||||
|                     tracing::debug!(id = client.id, ?line); |  | ||||||
|                     if sequence != client.sequence { |  | ||||||
|                         tracing::error!( |  | ||||||
|                             expected = client.sequence, |  | ||||||
|                             actual = sequence, |  | ||||||
|                             "Sequence mismatch" |  | ||||||
|                         ); |  | ||||||
|                         return Poll::Ready(Err(anyhow!("Sequence mismatch"))); |  | ||||||
|                     } |  | ||||||
|                     tracing::info!(?sequence, id = client.id); |  | ||||||
|                     client.sequence += 1; |  | ||||||
|                     new_messages.push(Rc::new(ToClient::ChatLine { |  | ||||||
|                         id: client.id, |  | ||||||
|                         line, |  | ||||||
|                     })); |  | ||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         self.clients.retain(|client| { |         // Close out disconnected clients
 | ||||||
|             if client.closed { |         for id in clients_to_remove { | ||||||
|                 tracing::info!(id = client.id, "Closing client"); |             cx.waker().wake_by_ref(); | ||||||
|             } |             tracing::info!(?id, "Closing client"); | ||||||
|             !client.closed |             self.client_streams.remove(&id); | ||||||
|         }); |             self.server.handle_client_disconnected(id)?; | ||||||
| 
 |  | ||||||
|         for client in &mut self.clients { |  | ||||||
|             for msg in &new_messages { |  | ||||||
|                 client.outbox.push_back(Rc::clone(msg)); |  | ||||||
|             } |  | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         match self.listener.poll_accept(cx) { |         if let Poll::Ready(result) = self.listener.poll_accept(cx) { | ||||||
|             Poll::Pending => {} |             let (stream, _addr) = result.context("listener.poll_accept")?; | ||||||
|             Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), |             cx.waker().wake_by_ref(); | ||||||
|             Poll::Ready(Ok((stream, _addr))) => { |             let stream = Framed::new(stream, LengthDelimitedCodec::new()); | ||||||
|                 cx.waker().wake_by_ref(); |             let id = self.server.handle_new_client()?; | ||||||
|                 let stream = Framed::new(stream, LengthDelimitedCodec::new()); |             self.client_streams.insert(id, stream); | ||||||
|                 let client = Client { |  | ||||||
|                     closed: false, |  | ||||||
|                     id: self.next_client_id, |  | ||||||
|                     outbox: Default::default(), |  | ||||||
|                     sequence: 0, |  | ||||||
|                     stream, |  | ||||||
|                 }; |  | ||||||
|                 self.next_client_id += 1; |  | ||||||
|                 tracing::info!(id = client.id, "Accepted client"); |  | ||||||
|                 self.clients.push(client); |  | ||||||
|             } |  | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         if self.clients.len() != previous_client_count { |         Ok(()) | ||||||
|             tracing::info!(client_count = self.clients.len()); |     } | ||||||
|         } | } | ||||||
| 
 | 
 | ||||||
|         Poll::Pending | #[derive(Default)] | ||||||
|  | struct Server { | ||||||
|  |     clients: BTreeMap<Id, Client>, | ||||||
|  |     next_client_id: Id, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | impl Server { | ||||||
|  |     fn broadcast(&mut self, msg: &ToClient) -> Result<()> { | ||||||
|  |         for client in &mut self.clients.values_mut() { | ||||||
|  |             client.handle_outgoing(msg)?; | ||||||
|  |         } | ||||||
|  |         Ok(()) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     fn handle_client_disconnected(&mut self, id: Id) -> Result<()> { | ||||||
|  |         self.clients.remove(&id); | ||||||
|  |         self.broadcast(&ToClient::ClientDisconnected { id })?; | ||||||
|  |         Ok(()) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     fn handle_client_frame(&mut self, id: Id, frame: Bytes) -> Result<()> { | ||||||
|  |         let msg = { | ||||||
|  |             let client = self | ||||||
|  |                 .clients | ||||||
|  |                 .get_mut(&id) | ||||||
|  |                 .context("Logic error: Stream has no associated client")?; | ||||||
|  |             let Some(msg) = client.handle_frame(frame).context("client.handle_frame")? else { | ||||||
|  |                 return Ok(()); | ||||||
|  |             }; | ||||||
|  |             msg | ||||||
|  |         }; | ||||||
|  |         self.broadcast(&msg)?; | ||||||
|  |         Ok(()) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     fn handle_new_client(&mut self) -> Result<Id> { | ||||||
|  |         let id = self.next_client_id; | ||||||
|  |         self.next_client_id += 1; | ||||||
|  |         let client = Client { | ||||||
|  |             id, | ||||||
|  |             outbox: Default::default(), | ||||||
|  |             sequence: 0, | ||||||
|  |         }; | ||||||
|  |         tracing::info!( | ||||||
|  |             id = client.id, | ||||||
|  |             total = self.clients.len(), | ||||||
|  |             "Accepted client" | ||||||
|  |         ); | ||||||
|  |         self.clients.insert(id, client); | ||||||
|  |         self.broadcast(&ToClient::ClientConnected { id })?; | ||||||
|  |         Ok(id) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     fn poll_send(&mut self, id: Id) -> Result<Option<Bytes>> { | ||||||
|  |         let client = self | ||||||
|  |             .clients | ||||||
|  |             .get_mut(&id) | ||||||
|  |             .context("Logic error: Stream has no associated client")?; | ||||||
|  |         Ok(client.poll_send()) | ||||||
|     } |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| struct Client { | struct Client { | ||||||
|     closed: bool, |  | ||||||
|     id: u64, |     id: u64, | ||||||
|     outbox: VecDeque<Rc<ToClient>>, |     outbox: VecDeque<Bytes>, | ||||||
|     sequence: u64, |     sequence: u64, | ||||||
|     stream: Framed<TcpStream, LengthDelimitedCodec>, |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl Client { | impl Client { | ||||||
|     fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<ToServer>>> { |     fn handle_frame(&mut self, frame: Bytes) -> Result<Option<ToClient>> { | ||||||
|         let stream = pin!(&mut self.stream); |         match rmp_serde::from_slice(&frame)? { | ||||||
|         let Poll::Ready(frame_opt) = stream.poll_next(cx) else { |             ToServer::ChatLine { line, sequence } => { | ||||||
|             return Poll::Pending; |                 if sequence != self.sequence { | ||||||
|         }; |                     tracing::error!( | ||||||
|  |                         expected = self.sequence, | ||||||
|  |                         actual = sequence, | ||||||
|  |                         "Sequence mismatch" | ||||||
|  |                     ); | ||||||
|  |                     bail!("Sequence mismatch"); | ||||||
|  |                 } | ||||||
|  |                 self.sequence += 1; | ||||||
|  |                 Ok(Some(ToClient::ChatLine { id: self.id, line })) | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |     } | ||||||
| 
 | 
 | ||||||
|         let Some(frame) = frame_opt else { |     fn handle_outgoing(&mut self, msg: &ToClient) -> Result<()> { | ||||||
|             return Poll::Ready(Ok(None)); |         if !self.outbox.is_empty() { | ||||||
|         }; |             bail!("Outbox full"); | ||||||
|  |         } | ||||||
|  |         let bytes = rmp_serde::to_vec(msg)?.into(); | ||||||
|  |         self.outbox.push_back(bytes); | ||||||
|  |         Ok(()) | ||||||
|  |     } | ||||||
| 
 | 
 | ||||||
|         cx.waker().wake_by_ref(); |     fn poll_send(&mut self) -> Option<Bytes> { | ||||||
|         tracing::debug!("Got network data"); |         self.outbox.pop_front() | ||||||
| 
 |  | ||||||
|         let frame = match frame { |  | ||||||
|             Err(err) => return Poll::Ready(Err(err).context("Network framing decode error")), |  | ||||||
|             Ok(x) => x, |  | ||||||
|         }; |  | ||||||
|         let msg = match rmp_serde::from_slice(&frame) { |  | ||||||
|             Err(err) => return Poll::Ready(Err(err).context("MsgPack decode error")), |  | ||||||
|             Ok(x) => x, |  | ||||||
|         }; |  | ||||||
|         Poll::Ready(Ok(Some(msg))) |  | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue