let mut reqs = Vec::with_capacity(ic.max_requests_outstanding.sat());
+ // xxx check that ic settings are all honoured
+
async {
loop {
select! {
Cow::from(&[SLIP_END] as &'static [u8])
)
).context("construct request")?;
-
-// dbg!(&req);
-
-// hclient.request
-/*
- Body
- made out of Stream
- made out of futures::stream::iter
-
-
- let datalen =
-
- let o = 0;
- let i =
-*/
- reqs.push(());
- // xxx make new request
+ let resp = hclient.request(req);
+ let fut = Box::pin(tokio::timeout(
+ ic.http_timeout,
+ async {
+ let r = async {
+ let resp = resp.await;
+ if ! resp.status().is_success() {
+ throw!("HTTP error status {}: {}", &resp.status());
+ }
+ let resp = resp.into_body();
+ // xxx: some size limit to avoid mallocing the universe
+ let resp = resp.aggregate().await
+ .context("HTTP error fetching response body")?;
+ Ok::<_,AE>(resp)
+ };
+ if r.is_err() {
+ tokio::time::sleep(&ic.http_retry).await;
+ }
+ r
+ }
+ ));
+ reqs.push(fut);
+ }
+
+ (got, goti, _) = future::select_all(&mut reqs)
+ {
+ reqs.swap_remove(goti);
+ if let Some(got) = reporter.report(got) {
+ dbg!(got.len()); // xxx
+ }
}
}
}