From 92add4d9a6691dd8cefe960289c3ca789dc5bc8a Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Fri, 30 Jul 2021 19:30:14 +0100 Subject: [PATCH] tidying Signed-off-by: Ian Jackson --- src/bin/client.rs | 94 +++++++++++++++++++++++------------------------ 1 file changed, 45 insertions(+), 49 deletions(-) diff --git a/src/bin/client.rs b/src/bin/client.rs index 905e589..c13cb9c 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -4,63 +4,59 @@ use hippotat::prelude::*; -/* -struct Client { - requests_outstanding: Vec<>, - tx_read_stream: something, - rx_write_stream: something, -}*/ - - type OutstandingRequest<'r> = Pin> + Send + 'r >>; +impl HCC for T where + T: hyper::client::connect::Connect + Clone + Send + Sync + 'static { } +trait HCC: hyper::client::connect::Connect + Clone + Send + Sync + 'static { } + #[throws(AE)] -fn submit_request<'r, 'ic:'r, 'hc:'r, C>(ic: &'ic InstanceConfig, - hclient: &'hc Arc>, - reqs: &mut Vec>, - upbound: Vec>, -) -where C: hyper::client::connect::Connect + Clone + Send + Sync + 'static, - { - let body = hyper::body::Body::wrap_stream( - futures::stream::iter( - Itertools::intersperse( - upbound.into_iter().map(|u| Bytes::from(u)), - slip::SLIP_END_SLICE.into() - ).map(Ok::<_,Void>) - ) - ); - - let req = hyper::Request::post(&ic.url).body(body) - .context("construct request")?; - - let resp = hclient.request(req); - let fut = Box::pin(async move { - let r = async { tokio::time::timeout( ic.http_timeout, async { - let resp = resp.await.context("make request")?; - if ! resp.status().is_success() { - throw!(anyhow!("HTTP error status {}", &resp.status())); - } - let resp = resp.into_body(); - // xxx: some size limit to avoid mallocing the universe - let resp = hyper::body::to_bytes(resp).await - .context("HTTP error fetching response body")?; - Ok::<_,AE>(resp) - }).await? }.await; - if r.is_err() { - tokio::time::sleep(ic.http_retry).await; - } - r - }); - reqs.push(fut); +fn submit_request<'r, 'ic:'r, 'hc:'r, C:HCC>( + ic: &'ic InstanceConfig, + hclient: &'hc Arc>, + reqs: &mut Vec>, + upbound: Vec> +) { + let body = hyper::body::Body::wrap_stream( + futures::stream::iter( + Itertools::intersperse( + upbound.into_iter().map(|u| Bytes::from(u)), + slip::SLIP_END_SLICE.into() + ).map(Ok::<_,Void>) + ) + ); + + let req = hyper::Request::post(&ic.url).body(body) + .context("construct request")?; + + let resp = hclient.request(req); + let fut = Box::pin(async move { + let r = async { tokio::time::timeout( ic.http_timeout, async { + let resp = resp.await.context("make request")?; + if ! resp.status().is_success() { + throw!(anyhow!("HTTP error status {}", &resp.status())); + } + let resp = resp.into_body(); + // xxx: some size limit to avoid mallocing the universe + let resp = hyper::body::to_bytes(resp).await + .context("HTTP error fetching response body")?; + Ok::<_,AE>(resp) + }).await? }.await; + if r.is_err() { + tokio::time::sleep(ic.http_retry).await; + } + r + }); + reqs.push(fut); } #[allow(unused_variables)] // xxx -async fn run_client(ic: InstanceConfig, hclient: Arc>) - -> Result -where C: hyper::client::connect::Connect + Clone + Send + Sync + 'static, +async fn run_client( + ic: InstanceConfig, + hclient: Arc> +) -> Result { debug!("{}: config: {:?}", &ic, &ic); -- 2.30.2