chiark / gitweb /
break out submit_request
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Fri, 30 Jul 2021 18:25:47 +0000 (19:25 +0100)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Fri, 30 Jul 2021 18:25:47 +0000 (19:25 +0100)
Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
src/bin/client.rs

index f449b683aefccb5ba597cd20211d55c4356b37a8..905e589f5b24096c0461d45cc06af6c0da5e16d5 100644 (file)
@@ -11,6 +11,52 @@ struct Client {
   rx_write_stream: something,
 }*/
 
+
+type OutstandingRequest<'r> = Pin<Box<
+    dyn Future<Output=Result<Bytes,AE>> + Send + 'r
+    >>;
+
+#[throws(AE)]
+fn submit_request<'r, 'ic:'r, 'hc:'r, C>(ic: &'ic InstanceConfig,
+                      hclient: &'hc Arc<hyper::Client<C>>,
+                      reqs: &mut Vec<OutstandingRequest<'r>>,
+                      upbound: Vec<Vec<u8>>,
+)
+where C: hyper::client::connect::Connect + Clone + Send + Sync + 'static,
+ {
+          let body = hyper::body::Body::wrap_stream(
+            futures::stream::iter(
+              Itertools::intersperse(
+                upbound.into_iter().map(|u| Bytes::from(u)),
+                slip::SLIP_END_SLICE.into()
+              ).map(Ok::<_,Void>)
+            )
+          );
+
+          let req = hyper::Request::post(&ic.url).body(body)
+            .context("construct request")?;
+
+          let resp = hclient.request(req);
+          let fut = Box::pin(async move {
+            let r = async { tokio::time::timeout( ic.http_timeout, async {
+              let resp = resp.await.context("make request")?;
+              if ! resp.status().is_success() {
+                throw!(anyhow!("HTTP error status {}", &resp.status()));
+              }
+              let resp = resp.into_body();
+              // xxx: some size limit to avoid mallocing the universe
+              let resp = hyper::body::to_bytes(resp).await
+                .context("HTTP error fetching response body")?;
+              Ok::<_,AE>(resp)
+            }).await? }.await;
+            if r.is_err() {
+              tokio::time::sleep(ic.http_retry).await;
+            }
+            r
+          });
+          reqs.push(fut);
+}
+
 #[allow(unused_variables)] // xxx
 async fn run_client<C>(ic: InstanceConfig, hclient: Arc<hyper::Client<C>>)
                        -> Result<Void, AE>
@@ -44,9 +90,8 @@ where C: hyper::client::connect::Connect + Clone + Send + Sync + 'static,
 
   let stream_for_rx = ipif.stdin .take().unwrap();
 
-  let mut reqs: Vec<Pin<Box<
-      dyn Future<Output=Result<Bytes,AE>> + Send
-      >>> = Vec::with_capacity(ic.max_requests_outstanding.sat());
+  let mut reqs: Vec<OutstandingRequest>
+    = Vec::with_capacity(ic.max_requests_outstanding.sat());
 
   // xxx check that ic settings are all honoured
 
@@ -86,37 +131,7 @@ where C: hyper::client::connect::Connect + Clone + Send + Sync + 'static,
           dbg!(&reqs.len(), &upbound_total, &upbound.len());
 
           //: impl futures::Stream<Cow<&[u8]>>
-          let body = hyper::body::Body::wrap_stream(
-            futures::stream::iter(
-              Itertools::intersperse(
-                upbound.into_iter().map(|u| Bytes::from(u)),
-                slip::SLIP_END_SLICE.into()
-              ).map(Ok::<_,Void>)
-            )
-          );
-
-          let req = hyper::Request::post(&ic.url).body(body)
-            .context("construct request")?;
-
-          let resp = hclient.request(req);
-          let fut = Box::pin(async {
-            let r = async { tokio::time::timeout( ic.http_timeout, async {
-              let resp = resp.await.context("make request")?;
-              if ! resp.status().is_success() {
-                throw!(anyhow!("HTTP error status {}", &resp.status()));
-              }
-              let resp = resp.into_body();
-              // xxx: some size limit to avoid mallocing the universe
-              let resp = hyper::body::to_bytes(resp).await
-                .context("HTTP error fetching response body")?;
-              Ok::<_,AE>(resp)
-            }).await? }.await;
-            if r.is_err() {
-              tokio::time::sleep(ic.http_retry).await;
-            }
-            r
-          });
-          reqs.push(fut);
+          submit_request(&ic, &hclient, &mut reqs, upbound)?;
         }
 
         (got, goti, _) = async { future::select_all(&mut reqs).await },