main
_ 2025-02-23 19:28:28 -06:00
parent fd93df42e1
commit 4d018413f8
1 changed files with 25 additions and 20 deletions

View File

@ -34,49 +34,54 @@ impl App {
} }
pub(crate) fn poll_run(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { pub(crate) fn poll_run(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
match self.step(cx) {
Ok(()) => Poll::Pending,
Err(err) => Poll::Ready(Err(err)),
}
}
fn step(&mut self, cx: &mut Context<'_>) -> Result<()> {
let mut stream = pin!(&mut self.stream); let mut stream = pin!(&mut self.stream);
match <_ as futures_sink::Sink<Bytes>>::poll_ready(stream.as_mut(), cx) { match <_ as futures_sink::Sink<Bytes>>::poll_ready(stream.as_mut(), cx) {
Poll::Pending => {} Poll::Pending => {}
Poll::Ready(Err(err)) => { Poll::Ready(result) => {
return Poll::Ready( result?;
Err(err).context("Can't check network write half for readiness"),
);
}
Poll::Ready(Ok(())) => {
if let Some(frame) = self.client.poll_send() { if let Some(frame) = self.client.poll_send() {
if let Err(err) = stream.as_mut().start_send(frame) { stream
return Poll::Ready(Err(err).context("stream.start_send")); .as_mut()
} .start_send(frame)
.context("stream.start_send")?;
tracing::debug!("Started send"); tracing::debug!("Started send");
} }
match <_ as futures_sink::Sink<Bytes>>::poll_flush(stream.as_mut(), cx) { match <_ as futures_sink::Sink<Bytes>>::poll_flush(stream.as_mut(), cx) {
Poll::Pending => {} Poll::Pending => {}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err).context("poll_flush")), Poll::Ready(result) => {
Poll::Ready(Ok(())) => {} result.context("poll_flush")?;
}
} }
} }
} }
match stream.as_mut().poll_next(cx) { match stream.as_mut().poll_next(cx) {
Poll::Pending => {} Poll::Pending => {}
Poll::Ready(None) => return Poll::Ready(Err(anyhow!("Server closed cxn"))), Poll::Ready(frame_opt) => {
Poll::Ready(Some(frame)) => { let frame = frame_opt.context("Server closed cxn")?;
cx.waker().wake_by_ref(); cx.waker().wake_by_ref();
let frame = frame.context("network framing decode")?; let frame = frame.context("network framing decode")?;
if let Err(err) = self.client.handle_frame(frame.into()) { self.client
return Poll::Ready(Err(err).context("client.handle_frame")); .handle_frame(frame.into())
} .context("client.handle_frame")?;
} }
} }
if self.timer.poll_tick(cx).is_ready() { if self.timer.poll_tick(cx).is_ready() {
cx.waker().wake_by_ref(); cx.waker().wake_by_ref();
if let Err(err) = self.client.handle_timeout() { self.client
return Poll::Ready(Err(err).context("client.handle_timeout")); .handle_timeout()
} .context("client.handle_timeout")?;
} }
Poll::Pending Ok(())
} }
} }