reqs.len() < ic.max_requests_outstanding.sat() =>
{
let mut upbound_total = 0;
- let mut upbound = vec![];
+ let mut upbound: Vec<Vec<u8>> = vec![];
let mut to_process: Option<Result<Option<Vec<u8>>,_>>
= Some(packet);
while let Some(packet) = to_process.take() {
assert!( to_process.is_none() );
dbg!(&reqs.len(), &upbound_total, &upbound.len());
+ //: impl futures::Stream<Cow<&[u8]>>
let body = hyper::body::Body::wrap_stream(
+ futures::stream::iter(
Itertools::intersperse(
upbound.into_iter().map(|u| Cow::from(u)),
Cow::from(&[SLIP_END] as &'static [u8])
- ).into()
+ ).map(Ok)
+ )
);
let req = hyper::Request::post(&ic.url).body(body)
.context("construct request")?;
let resp = hclient.request(req);
- let fut = Box::pin(tokio::time::timeout(
- ic.http_timeout,
- async {
- let r = async {
- let resp = resp.await.context("make request")?;
- if ! resp.status().is_success() {
- throw!(anyhow!("HTTP error status {}", &resp.status()));
- }
- let resp = resp.into_body();
- // xxx: some size limit to avoid mallocing the universe
- let resp = hyper::body::aggregate(resp).await
- .context("HTTP error fetching response body")?;
- Ok::<_,AE>(resp)
- }.await;
- if r.is_err() {
- tokio::time::sleep(ic.http_retry).await;
+ let fut = Box::pin(async {
+ let r = async { tokio::time::timeout( ic.http_timeout, async {
+ let resp = resp.await.context("make request")?;
+ if ! resp.status().is_success() {
+ throw!(anyhow!("HTTP error status {}", &resp.status()));
}
- r
+ let resp = resp.into_body();
+ // xxx: some size limit to avoid mallocing the universe
+ let resp = hyper::body::aggregate(resp).await
+ .context("HTTP error fetching response body")?;
+ Ok::<_,AE>(resp)
+ }).await? }.await;
+ if r.is_err() {
+ tokio::time::sleep(ic.http_retry).await;
}
- ));
+ r
+ });
reqs.push(fut);
}
{
reqs.swap_remove(goti);
if let Some(got) = reporter.report(got) {
- dbg!(got.len()); // xxx
+ dbg!(&got.remaining()); // xxx
}
}
}