T: hyper::client::connect::Connect + Clone + Send + Sync + 'static { }
trait HCC: hyper::client::connect::Connect + Clone + Send + Sync + 'static { }
+struct ClientContext<'c,C> {
+ ic: &'c InstanceConfig,
+ hclient: &'c Arc<hyper::Client<C>>,
+ reporter: &'c parking_lot::Mutex<Reporter>,
+}
+
#[throws(AE)]
-fn submit_request<'r, 'ic:'r, 'hc:'r, 'rep:'r, C:HCC>(
- ic: &'ic InstanceConfig,
- hclient: &'hc Arc<hyper::Client<C>>,
- reporter: &'rep parking_lot::Mutex<Reporter>,
+fn submit_request<'r, 'c:'r, C:HCC>(
+ c: &'c ClientContext<C>,
reqs: &mut Vec<OutstandingRequest<'r>>,
upbound: Vec<Vec<u8>>
) {
)
);
- let req = hyper::Request::post(&ic.url).body(body)
+ let req = hyper::Request::post(&c.ic.url).body(body)
.context("construct request")?;
- let resp = hclient.request(req);
+ let resp = c.hclient.request(req);
let fut = Box::pin(async move {
- let r = async { tokio::time::timeout( ic.http_timeout, async {
+ let r = async { tokio::time::timeout( c.ic.http_timeout, async {
let resp = resp.await.context("make request")?;
if ! resp.status().is_success() {
throw!(anyhow!("HTTP error status {}", &resp.status()));
Ok::<_,AE>(resp)
}).await? }.await;
- let r = reporter.lock().report(r);
+ let r = c.reporter.lock().report(r);
if r.is_none() {
- tokio::time::sleep(ic.http_retry).await;
+ tokio::time::sleep(c.ic.http_retry).await;
}
r
});
let reporter = parking_lot::Mutex::new(Reporter { });
+ let c = ClientContext {
+ reporter: &reporter,
+ hclient: &hclient,
+ ic: &ic,
+ };
+
let mut ipif = tokio::process::Command::new("sh")
.args(&["-c", &ic.ipif])
.stdin (process::Stdio::piped())
dbg!(&reqs.len(), &upbound_total, &upbound.len());
//: impl futures::Stream<Cow<&[u8]>>
- submit_request(&ic, &hclient, &reporter, &mut reqs, upbound)?;
+ submit_request(&c, &mut reqs, upbound)?;
}
() = async { },
reqs.len() < ic.max_requests_outstanding.sat()) =>
{
let upbound = tx_defer.take().into_iter().collect_vec();
- submit_request(&ic, &hclient, &reporter, &mut reqs, upbound)?;
+ submit_request(&c, &mut reqs, upbound)?;
}
(got, goti, _) = async { future::select_all(&mut reqs).await },