From f048b0c01fb8c9e07a8ed0e0f3c815dd239656ad Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Thu, 29 Jul 2021 01:15:58 +0100 Subject: [PATCH] client: wip code Signed-off-by: Ian Jackson --- src/bin/client.rs | 18 ++++++++++-------- src/reporter.rs | 2 +- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/bin/client.rs b/src/bin/client.rs index 78f0cba..6cdb45c 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -83,31 +83,33 @@ where C: hyper::client::connect::Connect + Clone + Send + Sync, assert!( to_process.is_none() ); dbg!(&reqs.len(), &upbound_total, &upbound.len()); - let req = hyper::Request::post(&ic.url) - .body( + let body = hyper::body::Body::wrap_stream( Itertools::intersperse( upbound.into_iter().map(|u| Cow::from(u)), Cow::from(&[SLIP_END] as &'static [u8]) - ) - ).context("construct request")?; + ).into() + ); + + let req = hyper::Request::post(&ic.url).body(body) + .context("construct request")?; let resp = hclient.request(req); let fut = Box::pin(tokio::time::timeout( ic.http_timeout, async { let r = async { - let resp = resp.await; + let resp = resp.await.context("make request")?; if ! resp.status().is_success() { throw!(anyhow!("HTTP error status {}", &resp.status())); } let resp = resp.into_body(); // xxx: some size limit to avoid mallocing the universe - let resp = resp.aggregate().await + let resp = hyper::body::aggregate(resp).await .context("HTTP error fetching response body")?; Ok::<_,AE>(resp) - }; + }.await; if r.is_err() { - tokio::time::sleep(&ic.http_retry).await; + tokio::time::sleep(ic.http_retry).await; } r } diff --git a/src/reporter.rs b/src/reporter.rs index b87417b..bb6cc36 100644 --- a/src/reporter.rs +++ b/src/reporter.rs @@ -8,7 +8,7 @@ pub struct Reporter { } impl Reporter { - pub fn report(r: Result) -> Option { + pub fn report(&mut self, r: Result) -> Option { match r { Ok(t) => { // xxx something something success -- 2.30.2