From: Ian Jackson Date: Sat, 31 Jul 2021 11:24:15 +0000 (+0100) Subject: introduce ClientContext X-Git-Tag: hippotat/1.0.0~419 X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ian/git?a=commitdiff_plain;h=76d4ac6402b1423c7e6aac4e88854befd5b592fa;p=hippotat.git introduce ClientContext Signed-off-by: Ian Jackson --- diff --git a/src/bin/client.rs b/src/bin/client.rs index c6922df..fc8e2e4 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -12,11 +12,15 @@ impl HCC for T where T: hyper::client::connect::Connect + Clone + Send + Sync + 'static { } trait HCC: hyper::client::connect::Connect + Clone + Send + Sync + 'static { } +struct ClientContext<'c,C> { + ic: &'c InstanceConfig, + hclient: &'c Arc>, + reporter: &'c parking_lot::Mutex, +} + #[throws(AE)] -fn submit_request<'r, 'ic:'r, 'hc:'r, 'rep:'r, C:HCC>( - ic: &'ic InstanceConfig, - hclient: &'hc Arc>, - reporter: &'rep parking_lot::Mutex, +fn submit_request<'r, 'c:'r, C:HCC>( + c: &'c ClientContext, reqs: &mut Vec>, upbound: Vec> ) { @@ -29,12 +33,12 @@ fn submit_request<'r, 'ic:'r, 'hc:'r, 'rep:'r, C:HCC>( ) ); - let req = hyper::Request::post(&ic.url).body(body) + let req = hyper::Request::post(&c.ic.url).body(body) .context("construct request")?; - let resp = hclient.request(req); + let resp = c.hclient.request(req); let fut = Box::pin(async move { - let r = async { tokio::time::timeout( ic.http_timeout, async { + let r = async { tokio::time::timeout( c.ic.http_timeout, async { let resp = resp.await.context("make request")?; if ! resp.status().is_success() { throw!(anyhow!("HTTP error status {}", &resp.status())); @@ -47,10 +51,10 @@ fn submit_request<'r, 'ic:'r, 'hc:'r, 'rep:'r, C:HCC>( Ok::<_,AE>(resp) }).await? }.await; - let r = reporter.lock().report(r); + let r = c.reporter.lock().report(r); if r.is_none() { - tokio::time::sleep(ic.http_retry).await; + tokio::time::sleep(c.ic.http_retry).await; } r }); @@ -67,6 +71,12 @@ async fn run_client( let reporter = parking_lot::Mutex::new(Reporter { }); + let c = ClientContext { + reporter: &reporter, + hclient: &hclient, + ic: &ic, + }; + let mut ipif = tokio::process::Command::new("sh") .args(&["-c", &ic.ipif]) .stdin (process::Stdio::piped()) @@ -141,7 +151,7 @@ async fn run_client( dbg!(&reqs.len(), &upbound_total, &upbound.len()); //: impl futures::Stream> - submit_request(&ic, &hclient, &reporter, &mut reqs, upbound)?; + submit_request(&c, &mut reqs, upbound)?; } () = async { }, @@ -150,7 +160,7 @@ async fn run_client( reqs.len() < ic.max_requests_outstanding.sat()) => { let upbound = tx_defer.take().into_iter().collect_vec(); - submit_request(&ic, &hclient, &reporter, &mut reqs, upbound)?; + submit_request(&c, &mut reqs, upbound)?; } (got, goti, _) = async { future::select_all(&mut reqs).await },