chiark / gitweb /
Report when http req fails, not after retry delay
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Sat, 31 Jul 2021 11:20:01 +0000 (12:20 +0100)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Sat, 31 Jul 2021 11:20:01 +0000 (12:20 +0100)
Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
src/bin/client.rs

index 2bbde41f427575f1c042a5c669ecf51d12a669a3..c6922dfa2cebcaa2ef8d703ae6bc684bd8ca0043 100644 (file)
@@ -5,7 +5,7 @@
 use hippotat::prelude::*;
 
 type OutstandingRequest<'r> = Pin<Box<
-    dyn Future<Output=Result<Bytes,AE>> + Send + 'r
+    dyn Future<Output=Option<Bytes>> + Send + 'r
     >>;
 
 impl<T> HCC for T where
@@ -13,9 +13,10 @@ impl<T> HCC for T where
 trait HCC: hyper::client::connect::Connect + Clone + Send + Sync + 'static { }
 
 #[throws(AE)]
-fn submit_request<'r, 'ic:'r, 'hc:'r, C:HCC>(
+fn submit_request<'r, 'ic:'r, 'hc:'r, 'rep:'r, C:HCC>(
   ic: &'ic InstanceConfig,
   hclient: &'hc Arc<hyper::Client<C>>,
+  reporter: &'rep parking_lot::Mutex<Reporter>,
   reqs: &mut Vec<OutstandingRequest<'r>>,
   upbound: Vec<Vec<u8>>
 ) {
@@ -42,9 +43,13 @@ fn submit_request<'r, 'ic:'r, 'hc:'r, C:HCC>(
       // xxx: some size limit to avoid mallocing the universe
       let resp = hyper::body::to_bytes(resp).await
         .context("HTTP error fetching response body")?;
+
       Ok::<_,AE>(resp)
     }).await? }.await;
-    if r.is_err() {
+
+    let r = reporter.lock().report(r);
+
+    if r.is_none() {
       tokio::time::sleep(ic.http_retry).await;
     }
     r
@@ -60,7 +65,7 @@ async fn run_client<C:HCC>(
 {
   debug!("{}: config: {:?}", &ic, &ic);
 
-  let mut reporter = Reporter { };
+  let reporter = parking_lot::Mutex::new(Reporter { });
 
   let mut ipif = tokio::process::Command::new("sh")
     .args(&["-c", &ic.ipif])
@@ -136,7 +141,7 @@ async fn run_client<C:HCC>(
           dbg!(&reqs.len(), &upbound_total, &upbound.len());
 
           //: impl futures::Stream<Cow<&[u8]>>
-          submit_request(&ic, &hclient, &mut reqs, upbound)?;
+          submit_request(&ic, &hclient, &reporter, &mut reqs, upbound)?;
         }
 
         () = async { },
@@ -145,14 +150,14 @@ async fn run_client<C:HCC>(
             reqs.len() < ic.max_requests_outstanding.sat()) =>
         {
           let upbound = tx_defer.take().into_iter().collect_vec();
-          submit_request(&ic, &hclient, &mut reqs, upbound)?;
+          submit_request(&ic, &hclient, &reporter, &mut reqs, upbound)?;
         }
 
         (got, goti, _) = async { future::select_all(&mut reqs).await },
           if ! reqs.is_empty() =>
         {
           reqs.swap_remove(goti);
-          if let Some(got) = reporter.report(got) {
+          if let Some(got) = got {
             dbg!(&got.remaining()); // xxx
           }
         }