From: Ian Jackson Date: Thu, 29 Jul 2021 00:31:29 +0000 (+0100) Subject: client: wip code X-Git-Tag: hippotat/1.0.0~431 X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ian/git?a=commitdiff_plain;h=5588b6ce644725a543d7e5fa3e205b61b0ab0d7c;p=hippotat.git client: wip code Signed-off-by: Ian Jackson --- diff --git a/src/bin/client.rs b/src/bin/client.rs index 6cdb45c..9e9028c 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -56,7 +56,7 @@ where C: hyper::client::connect::Connect + Clone + Send + Sync, reqs.len() < ic.max_requests_outstanding.sat() => { let mut upbound_total = 0; - let mut upbound = vec![]; + let mut upbound: Vec> = vec![]; let mut to_process: Option>,_>> = Some(packet); while let Some(packet) = to_process.take() { @@ -83,37 +83,37 @@ where C: hyper::client::connect::Connect + Clone + Send + Sync, assert!( to_process.is_none() ); dbg!(&reqs.len(), &upbound_total, &upbound.len()); + //: impl futures::Stream> let body = hyper::body::Body::wrap_stream( + futures::stream::iter( Itertools::intersperse( upbound.into_iter().map(|u| Cow::from(u)), Cow::from(&[SLIP_END] as &'static [u8]) - ).into() + ).map(Ok) + ) ); let req = hyper::Request::post(&ic.url).body(body) .context("construct request")?; let resp = hclient.request(req); - let fut = Box::pin(tokio::time::timeout( - ic.http_timeout, - async { - let r = async { - let resp = resp.await.context("make request")?; - if ! resp.status().is_success() { - throw!(anyhow!("HTTP error status {}", &resp.status())); - } - let resp = resp.into_body(); - // xxx: some size limit to avoid mallocing the universe - let resp = hyper::body::aggregate(resp).await - .context("HTTP error fetching response body")?; - Ok::<_,AE>(resp) - }.await; - if r.is_err() { - tokio::time::sleep(ic.http_retry).await; + let fut = Box::pin(async { + let r = async { tokio::time::timeout( ic.http_timeout, async { + let resp = resp.await.context("make request")?; + if ! resp.status().is_success() { + throw!(anyhow!("HTTP error status {}", &resp.status())); } - r + let resp = resp.into_body(); + // xxx: some size limit to avoid mallocing the universe + let resp = hyper::body::aggregate(resp).await + .context("HTTP error fetching response body")?; + Ok::<_,AE>(resp) + }).await? }.await; + if r.is_err() { + tokio::time::sleep(ic.http_retry).await; } - )); + r + }); reqs.push(fut); } @@ -121,7 +121,7 @@ where C: hyper::client::connect::Connect + Clone + Send + Sync, { reqs.swap_remove(goti); if let Some(got) = reporter.report(got) { - dbg!(got.len()); // xxx + dbg!(&got.remaining()); // xxx } } } diff --git a/src/prelude.rs b/src/prelude.rs index 08cc6c5..f1afa4e 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -23,6 +23,7 @@ pub use anyhow::{anyhow, Context}; pub use extend::ext; pub use fehler::{throw, throws}; pub use futures::{poll, future}; +pub use hyper::body::{Buf as _}; pub use hyper::Uri; pub use hyper_tls::HttpsConnector; pub use ipnet::IpNet;