chiark / gitweb /
client: wip code
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Thu, 29 Jul 2021 00:31:29 +0000 (01:31 +0100)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Thu, 29 Jul 2021 00:31:29 +0000 (01:31 +0100)
Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
src/bin/client.rs
src/prelude.rs

index 6cdb45c94ac3e86089d6f881cf16efde60b64059..9e9028ca33aff9cb03a073e74f989352e738ad7e 100644 (file)
@@ -56,7 +56,7 @@ where C: hyper::client::connect::Connect + Clone + Send + Sync,
            reqs.len() < ic.max_requests_outstanding.sat() =>
         {
           let mut upbound_total = 0;
-          let mut upbound = vec![];
+          let mut upbound: Vec<Vec<u8>> = vec![];
           let mut to_process: Option<Result<Option<Vec<u8>>,_>>
             = Some(packet);
           while let Some(packet) = to_process.take() {
@@ -83,37 +83,37 @@ where C: hyper::client::connect::Connect + Clone + Send + Sync,
           assert!( to_process.is_none() );
           dbg!(&reqs.len(), &upbound_total, &upbound.len());
 
+          //: impl futures::Stream<Cow<&[u8]>>
           let body = hyper::body::Body::wrap_stream(
+            futures::stream::iter(
               Itertools::intersperse(
                 upbound.into_iter().map(|u| Cow::from(u)),
                 Cow::from(&[SLIP_END] as &'static [u8])
-              ).into()
+              ).map(Ok)
+            )
           );
 
           let req = hyper::Request::post(&ic.url).body(body)
             .context("construct request")?;
 
           let resp = hclient.request(req);
-          let fut = Box::pin(tokio::time::timeout(
-            ic.http_timeout,
-            async {
-              let r = async {
-                let resp = resp.await.context("make request")?;
-                if ! resp.status().is_success() {
-                  throw!(anyhow!("HTTP error status {}", &resp.status()));
-                }
-                let resp = resp.into_body();
-                // xxx: some size limit to avoid mallocing the universe
-                let resp = hyper::body::aggregate(resp).await
-                  .context("HTTP error fetching response body")?;
-                Ok::<_,AE>(resp)
-              }.await;
-              if r.is_err() {
-                tokio::time::sleep(ic.http_retry).await;
+          let fut = Box::pin(async {
+            let r = async { tokio::time::timeout( ic.http_timeout, async {
+              let resp = resp.await.context("make request")?;
+              if ! resp.status().is_success() {
+                throw!(anyhow!("HTTP error status {}", &resp.status()));
               }
-              r
+              let resp = resp.into_body();
+              // xxx: some size limit to avoid mallocing the universe
+              let resp = hyper::body::aggregate(resp).await
+                .context("HTTP error fetching response body")?;
+              Ok::<_,AE>(resp)
+            }).await? }.await;
+            if r.is_err() {
+              tokio::time::sleep(ic.http_retry).await;
             }
-          ));
+            r
+          });
           reqs.push(fut);
         }
 
@@ -121,7 +121,7 @@ where C: hyper::client::connect::Connect + Clone + Send + Sync,
         {
           reqs.swap_remove(goti);
           if let Some(got) = reporter.report(got) {
-            dbg!(got.len()); // xxx
+            dbg!(&got.remaining()); // xxx
           }
         }
       }
index 08cc6c595a3dcf6c7d3ac2b786bdc4a26e358beb..f1afa4ee99a74364503cca54ea783cc8b45953c3 100644 (file)
@@ -23,6 +23,7 @@ pub use anyhow::{anyhow, Context};
 pub use extend::ext;
 pub use fehler::{throw, throws};
 pub use futures::{poll, future};
+pub use hyper::body::{Buf as _};
 pub use hyper::Uri;
 pub use hyper_tls::HttpsConnector;
 pub use ipnet::IpNet;