chiark / gitweb /
introduce ClientContext
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Sat, 31 Jul 2021 11:24:15 +0000 (12:24 +0100)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Sat, 31 Jul 2021 11:24:15 +0000 (12:24 +0100)
Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
src/bin/client.rs

index c6922dfa2cebcaa2ef8d703ae6bc684bd8ca0043..fc8e2e4ef40548051a6fc5d2bdd78253ed54d8e2 100644 (file)
@@ -12,11 +12,15 @@ impl<T> HCC for T where
         T: hyper::client::connect::Connect + Clone + Send + Sync + 'static { }
 trait HCC: hyper::client::connect::Connect + Clone + Send + Sync + 'static { }
 
+struct ClientContext<'c,C> {
+  ic: &'c InstanceConfig,
+  hclient: &'c Arc<hyper::Client<C>>,
+  reporter: &'c parking_lot::Mutex<Reporter>,
+}
+
 #[throws(AE)]
-fn submit_request<'r, 'ic:'r, 'hc:'r, 'rep:'r, C:HCC>(
-  ic: &'ic InstanceConfig,
-  hclient: &'hc Arc<hyper::Client<C>>,
-  reporter: &'rep parking_lot::Mutex<Reporter>,
+fn submit_request<'r, 'c:'r, C:HCC>(
+  c: &'c ClientContext<C>,
   reqs: &mut Vec<OutstandingRequest<'r>>,
   upbound: Vec<Vec<u8>>
 ) {
@@ -29,12 +33,12 @@ fn submit_request<'r, 'ic:'r, 'hc:'r, 'rep:'r, C:HCC>(
     )
   );
 
-  let req = hyper::Request::post(&ic.url).body(body)
+  let req = hyper::Request::post(&c.ic.url).body(body)
     .context("construct request")?;
 
-  let resp = hclient.request(req);
+  let resp = c.hclient.request(req);
   let fut = Box::pin(async move {
-    let r = async { tokio::time::timeout( ic.http_timeout, async {
+    let r = async { tokio::time::timeout( c.ic.http_timeout, async {
       let resp = resp.await.context("make request")?;
       if ! resp.status().is_success() {
         throw!(anyhow!("HTTP error status {}", &resp.status()));
@@ -47,10 +51,10 @@ fn submit_request<'r, 'ic:'r, 'hc:'r, 'rep:'r, C:HCC>(
       Ok::<_,AE>(resp)
     }).await? }.await;
 
-    let r = reporter.lock().report(r);
+    let r = c.reporter.lock().report(r);
 
     if r.is_none() {
-      tokio::time::sleep(ic.http_retry).await;
+      tokio::time::sleep(c.ic.http_retry).await;
     }
     r
   });
@@ -67,6 +71,12 @@ async fn run_client<C:HCC>(
 
   let reporter = parking_lot::Mutex::new(Reporter { });
 
+  let c = ClientContext {
+    reporter: &reporter,
+    hclient: &hclient,
+    ic: &ic,
+  };
+
   let mut ipif = tokio::process::Command::new("sh")
     .args(&["-c", &ic.ipif])
     .stdin (process::Stdio::piped())
@@ -141,7 +151,7 @@ async fn run_client<C:HCC>(
           dbg!(&reqs.len(), &upbound_total, &upbound.len());
 
           //: impl futures::Stream<Cow<&[u8]>>
-          submit_request(&ic, &hclient, &reporter, &mut reqs, upbound)?;
+          submit_request(&c, &mut reqs, upbound)?;
         }
 
         () = async { },
@@ -150,7 +160,7 @@ async fn run_client<C:HCC>(
             reqs.len() < ic.max_requests_outstanding.sat()) =>
         {
           let upbound = tx_defer.take().into_iter().collect_vec();
-          submit_request(&ic, &hclient, &reporter, &mut reqs, upbound)?;
+          submit_request(&c, &mut reqs, upbound)?;
         }
 
         (got, goti, _) = async { future::select_all(&mut reqs).await },