chiark / gitweb /
new max_batch_down protocol, define
[hippotat.git] / src / bin / client.rs
index d9593125bf5a2b544a0b834f66a9904ba5a57082..70fe9b96af78e000903f16e3108435121ecddc55 100644 (file)
@@ -25,6 +25,7 @@ trait HCC: hyper::client::connect::Connect + Clone + Send + Sync + 'static { }
 
 struct ClientContext<'c,C> {
   ic: &'c InstanceConfig,
+  effective_http_timeout: Duration,
   hclient: &'c Arc<hyper::Client<C>>,
   reporter: &'c parking_lot::Mutex<Reporter<'c>>,
 }
@@ -60,11 +61,13 @@ fn submit_request<'r, 'c:'r, C:HCC>(
        {}
        {}
        {}
+       {}
        {}"#),
                        &c.ic.link.client,
                        token,
                        c.ic.target_requests_outstanding,
                        show_timeout,
+                       c.ic.max_batch_down,
   );
 
   let prefix2 = format!(into_crlfs!(
@@ -128,7 +131,7 @@ fn submit_request<'r, 'c:'r, C:HCC>(
 
   let resp = c.hclient.request(req);
   let fut = Box::pin(async move {
-    let r = async { tokio::time::timeout( c.ic.http_timeout, async {
+    let r = async { tokio::time::timeout( c.effective_http_timeout, async {
       let resp = resp.await.context("make request")?;
       let status = resp.status();
       let resp = resp.into_body();
@@ -137,7 +140,6 @@ fn submit_request<'r, 'c:'r, C:HCC>(
         .context("HTTP error fetching response body")?;
 
       if ! status.is_success() {
-        // xxx get body and log it
         throw!(anyhow!("HTTP error status={} body={:?}",
                        &status, String::from_utf8_lossy(&resp)));
       }
@@ -155,9 +157,6 @@ fn submit_request<'r, 'c:'r, C:HCC>(
   reqs.push(fut);
 }
 
-#[allow(unused_variables)] // xxx
-#[allow(unused_mut)] // xxx
-#[allow(dead_code)] // xxx
 async fn run_client<C:HCC>(
   ic: InstanceConfig,
   hclient: Arc<hyper::Client<C>>
@@ -171,6 +170,9 @@ async fn run_client<C:HCC>(
     reporter: &reporter,
     hclient: &hclient,
     ic: &ic,
+    effective_http_timeout: ic.http_timeout.checked_add(ic.http_timeout_grace)
+      .ok_or_else(|| anyhow!("calculate effective http timeout ({:?} + {:?})",
+                             ic.http_timeout, ic.http_timeout_grace))?,
   };
 
   let mut ipif = tokio::process::Command::new("sh")
@@ -260,7 +262,6 @@ async fn run_client<C:HCC>(
           (reqs.len() < ic.target_requests_outstanding.sat() ||
            (reqs.len() < ic.max_requests_outstanding.sat() &&
             ! upbound.is_empty()))
-          // xxx backpressure, if too much in rx_queue
           =>
         {
           submit_request(&c, &mut req_num, &mut reqs,