From: Ian Jackson Date: Fri, 30 Jul 2021 18:25:47 +0000 (+0100) Subject: break out submit_request X-Git-Tag: hippotat/1.0.0~424 X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ian/git?a=commitdiff_plain;h=ff23f6369487516860bf903495502e5c59071e0f;p=hippotat.git break out submit_request Signed-off-by: Ian Jackson --- diff --git a/src/bin/client.rs b/src/bin/client.rs index f449b68..905e589 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -11,6 +11,52 @@ struct Client { rx_write_stream: something, }*/ + +type OutstandingRequest<'r> = Pin> + Send + 'r + >>; + +#[throws(AE)] +fn submit_request<'r, 'ic:'r, 'hc:'r, C>(ic: &'ic InstanceConfig, + hclient: &'hc Arc>, + reqs: &mut Vec>, + upbound: Vec>, +) +where C: hyper::client::connect::Connect + Clone + Send + Sync + 'static, + { + let body = hyper::body::Body::wrap_stream( + futures::stream::iter( + Itertools::intersperse( + upbound.into_iter().map(|u| Bytes::from(u)), + slip::SLIP_END_SLICE.into() + ).map(Ok::<_,Void>) + ) + ); + + let req = hyper::Request::post(&ic.url).body(body) + .context("construct request")?; + + let resp = hclient.request(req); + let fut = Box::pin(async move { + let r = async { tokio::time::timeout( ic.http_timeout, async { + let resp = resp.await.context("make request")?; + if ! resp.status().is_success() { + throw!(anyhow!("HTTP error status {}", &resp.status())); + } + let resp = resp.into_body(); + // xxx: some size limit to avoid mallocing the universe + let resp = hyper::body::to_bytes(resp).await + .context("HTTP error fetching response body")?; + Ok::<_,AE>(resp) + }).await? }.await; + if r.is_err() { + tokio::time::sleep(ic.http_retry).await; + } + r + }); + reqs.push(fut); +} + #[allow(unused_variables)] // xxx async fn run_client(ic: InstanceConfig, hclient: Arc>) -> Result @@ -44,9 +90,8 @@ where C: hyper::client::connect::Connect + Clone + Send + Sync + 'static, let stream_for_rx = ipif.stdin .take().unwrap(); - let mut reqs: Vec> + Send - >>> = Vec::with_capacity(ic.max_requests_outstanding.sat()); + let mut reqs: Vec + = Vec::with_capacity(ic.max_requests_outstanding.sat()); // xxx check that ic settings are all honoured @@ -86,37 +131,7 @@ where C: hyper::client::connect::Connect + Clone + Send + Sync + 'static, dbg!(&reqs.len(), &upbound_total, &upbound.len()); //: impl futures::Stream> - let body = hyper::body::Body::wrap_stream( - futures::stream::iter( - Itertools::intersperse( - upbound.into_iter().map(|u| Bytes::from(u)), - slip::SLIP_END_SLICE.into() - ).map(Ok::<_,Void>) - ) - ); - - let req = hyper::Request::post(&ic.url).body(body) - .context("construct request")?; - - let resp = hclient.request(req); - let fut = Box::pin(async { - let r = async { tokio::time::timeout( ic.http_timeout, async { - let resp = resp.await.context("make request")?; - if ! resp.status().is_success() { - throw!(anyhow!("HTTP error status {}", &resp.status())); - } - let resp = resp.into_body(); - // xxx: some size limit to avoid mallocing the universe - let resp = hyper::body::to_bytes(resp).await - .context("HTTP error fetching response body")?; - Ok::<_,AE>(resp) - }).await? }.await; - if r.is_err() { - tokio::time::sleep(ic.http_retry).await; - } - r - }); - reqs.push(fut); + submit_request(&ic, &hclient, &mut reqs, upbound)?; } (got, goti, _) = async { future::select_all(&mut reqs).await },