chiark / gitweb /
client: wip code
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Wed, 28 Jul 2021 23:54:17 +0000 (00:54 +0100)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Wed, 28 Jul 2021 23:54:17 +0000 (00:54 +0100)
Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
src/bin/client.rs

index bb147acc19c860bca1bd6829dfd11df28c084db1..c7b1f450dbb9bd0b6d923187b240cd520ec2aa9f 100644 (file)
@@ -43,6 +43,8 @@ async fn run_client<C>(ic: InstanceConfig, hclient: Arc<hyper::Client<C>>)
 
   let mut reqs = Vec::with_capacity(ic.max_requests_outstanding.sat());
 
+  // xxx check that ic settings are all honoured
+
   async {
     loop {
       select! {
@@ -85,24 +87,37 @@ async fn run_client<C>(ic: InstanceConfig, hclient: Arc<hyper::Client<C>>)
                 Cow::from(&[SLIP_END] as &'static [u8])
               )
             ).context("construct request")?;
-  
-//          dbg!(&req);
-                          
-//          hclient.request
 
-/*
-          Body
-            made out of Stream
-            made out of futures::stream::iter
-            
-          
-          let datalen = 
-          
-          let o = 0;
-          let i = 
-*/
-          reqs.push(());
-          // xxx make new request
+          let resp = hclient.request(req);
+          let fut = Box::pin(tokio::timeout(
+            ic.http_timeout,
+            async {
+              let r = async {
+                let resp = resp.await;
+                if ! resp.status().is_success() {
+                  throw!("HTTP error status {}: {}", &resp.status());
+                }
+                let resp = resp.into_body();
+                // xxx: some size limit to avoid mallocing the universe
+                let resp = resp.aggregate().await
+                  .context("HTTP error fetching response body")?;
+                Ok::<_,AE>(resp)
+              };
+              if r.is_err() {
+                tokio::time::sleep(&ic.http_retry).await;
+              }
+              r
+            }
+          ));
+          reqs.push(fut);
+        }
+
+        (got, goti, _) = future::select_all(&mut reqs)
+        {
+          reqs.swap_remove(goti);
+          if let Some(got) = reporter.report(got) {
+            dbg!(got.len()); // xxx
+          }
         }
       }
     }