rx_write_stream: something,
}*/
+
+type OutstandingRequest<'r> = Pin<Box<
+ dyn Future<Output=Result<Bytes,AE>> + Send + 'r
+ >>;
+
+#[throws(AE)]
+fn submit_request<'r, 'ic:'r, 'hc:'r, C>(ic: &'ic InstanceConfig,
+ hclient: &'hc Arc<hyper::Client<C>>,
+ reqs: &mut Vec<OutstandingRequest<'r>>,
+ upbound: Vec<Vec<u8>>,
+)
+where C: hyper::client::connect::Connect + Clone + Send + Sync + 'static,
+ {
+ let body = hyper::body::Body::wrap_stream(
+ futures::stream::iter(
+ Itertools::intersperse(
+ upbound.into_iter().map(|u| Bytes::from(u)),
+ slip::SLIP_END_SLICE.into()
+ ).map(Ok::<_,Void>)
+ )
+ );
+
+ let req = hyper::Request::post(&ic.url).body(body)
+ .context("construct request")?;
+
+ let resp = hclient.request(req);
+ let fut = Box::pin(async move {
+ let r = async { tokio::time::timeout( ic.http_timeout, async {
+ let resp = resp.await.context("make request")?;
+ if ! resp.status().is_success() {
+ throw!(anyhow!("HTTP error status {}", &resp.status()));
+ }
+ let resp = resp.into_body();
+ // xxx: some size limit to avoid mallocing the universe
+ let resp = hyper::body::to_bytes(resp).await
+ .context("HTTP error fetching response body")?;
+ Ok::<_,AE>(resp)
+ }).await? }.await;
+ if r.is_err() {
+ tokio::time::sleep(ic.http_retry).await;
+ }
+ r
+ });
+ reqs.push(fut);
+}
+
#[allow(unused_variables)] // xxx
async fn run_client<C>(ic: InstanceConfig, hclient: Arc<hyper::Client<C>>)
-> Result<Void, AE>
let stream_for_rx = ipif.stdin .take().unwrap();
- let mut reqs: Vec<Pin<Box<
- dyn Future<Output=Result<Bytes,AE>> + Send
- >>> = Vec::with_capacity(ic.max_requests_outstanding.sat());
+ let mut reqs: Vec<OutstandingRequest>
+ = Vec::with_capacity(ic.max_requests_outstanding.sat());
// xxx check that ic settings are all honoured
dbg!(&reqs.len(), &upbound_total, &upbound.len());
//: impl futures::Stream<Cow<&[u8]>>
- let body = hyper::body::Body::wrap_stream(
- futures::stream::iter(
- Itertools::intersperse(
- upbound.into_iter().map(|u| Bytes::from(u)),
- slip::SLIP_END_SLICE.into()
- ).map(Ok::<_,Void>)
- )
- );
-
- let req = hyper::Request::post(&ic.url).body(body)
- .context("construct request")?;
-
- let resp = hclient.request(req);
- let fut = Box::pin(async {
- let r = async { tokio::time::timeout( ic.http_timeout, async {
- let resp = resp.await.context("make request")?;
- if ! resp.status().is_success() {
- throw!(anyhow!("HTTP error status {}", &resp.status()));
- }
- let resp = resp.into_body();
- // xxx: some size limit to avoid mallocing the universe
- let resp = hyper::body::to_bytes(resp).await
- .context("HTTP error fetching response body")?;
- Ok::<_,AE>(resp)
- }).await? }.await;
- if r.is_err() {
- tokio::time::sleep(ic.http_retry).await;
- }
- r
- });
- reqs.push(fut);
+ submit_request(&ic, &hclient, &mut reqs, upbound)?;
}
(got, goti, _) = async { future::select_all(&mut reqs).await },