use hippotat::prelude::*;
type OutstandingRequest<'r> = Pin<Box<
- dyn Future<Output=Result<Bytes,AE>> + Send + 'r
+ dyn Future<Output=Option<Bytes>> + Send + 'r
>>;
impl<T> HCC for T where
trait HCC: hyper::client::connect::Connect + Clone + Send + Sync + 'static { }
#[throws(AE)]
-fn submit_request<'r, 'ic:'r, 'hc:'r, C:HCC>(
+fn submit_request<'r, 'ic:'r, 'hc:'r, 'rep:'r, C:HCC>(
ic: &'ic InstanceConfig,
hclient: &'hc Arc<hyper::Client<C>>,
+ reporter: &'rep parking_lot::Mutex<Reporter>,
reqs: &mut Vec<OutstandingRequest<'r>>,
upbound: Vec<Vec<u8>>
) {
// xxx: some size limit to avoid mallocing the universe
let resp = hyper::body::to_bytes(resp).await
.context("HTTP error fetching response body")?;
+
Ok::<_,AE>(resp)
}).await? }.await;
- if r.is_err() {
+
+ let r = reporter.lock().report(r);
+
+ if r.is_none() {
tokio::time::sleep(ic.http_retry).await;
}
r
{
debug!("{}: config: {:?}", &ic, &ic);
- let mut reporter = Reporter { };
+ let reporter = parking_lot::Mutex::new(Reporter { });
let mut ipif = tokio::process::Command::new("sh")
.args(&["-c", &ic.ipif])
dbg!(&reqs.len(), &upbound_total, &upbound.len());
//: impl futures::Stream<Cow<&[u8]>>
- submit_request(&ic, &hclient, &mut reqs, upbound)?;
+ submit_request(&ic, &hclient, &reporter, &mut reqs, upbound)?;
}
() = async { },
reqs.len() < ic.max_requests_outstanding.sat()) =>
{
let upbound = tx_defer.take().into_iter().collect_vec();
- submit_request(&ic, &hclient, &mut reqs, upbound)?;
+ submit_request(&ic, &hclient, &reporter, &mut reqs, upbound)?;
}
(got, goti, _) = async { future::select_all(&mut reqs).await },
if ! reqs.is_empty() =>
{
reqs.swap_remove(goti);
- if let Some(got) = reporter.report(got) {
+ if let Some(got) = got {
dbg!(&got.remaining()); // xxx
}
}