chiark / gitweb /
limit response body size
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Wed, 4 Aug 2021 11:37:23 +0000 (12:37 +0100)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Wed, 4 Aug 2021 11:37:23 +0000 (12:37 +0100)
Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
src/bin/client.rs
src/prelude.rs
src/utils.rs

index 0a7441e286172a2b6be20ed3ec9b71eaf1722650..da82193b3b850a1c00aa6231644453516ac528a5 100644 (file)
@@ -5,6 +5,8 @@
 use hippotat::prelude::*;
 use hippotat_macros::into_crlfs;
 
+const MAX_BATCH_DOWN_RESP_OVERHEAD: usize = 10_000;
+
 #[derive(StructOpt,Debug)]
 pub struct Opts {
   /// Increase debug level
@@ -16,7 +18,7 @@ pub struct Opts {
 }
 
 type OutstandingRequest<'r> = Pin<Box<
-    dyn Future<Output=Option<Bytes>> + Send + 'r
+    dyn Future<Output=Option<Box<[u8]>>> + Send + 'r
     >>;
 
 impl<T> HCC for T where
@@ -135,9 +137,8 @@ fn submit_request<'r, 'c:'r, C:HCC>(
       let resp = resp.await.context("make request")?;
       let status = resp.status();
       let resp = resp.into_body();
-      // xxx: some size limit to avoid mallocing the universe
-      let resp = hyper::body::to_bytes(resp).await
-        .context("HTTP error fetching response body")?;
+      let max_body = c.ic.max_batch_down.sat() + MAX_BATCH_DOWN_RESP_OVERHEAD;
+      let resp = read_limited_body(max_body, resp).await?;
 
       if ! status.is_success() {
         throw!(anyhow!("HTTP error status={} body={:?}",
index c60db81e50b10645e86482c3b8fb3ff8d82ad692..fd9fd835fdc1e9a91615828548efb2961b5e6898 100644 (file)
@@ -27,7 +27,7 @@ pub use anyhow::{anyhow, Context};
 pub use cervine::Cow as Cervine;
 pub use extend::ext;
 pub use fehler::{throw, throws};
-pub use futures::{poll, future};
+pub use futures::{poll, future, StreamExt as _};
 pub use hyper::body::{Bytes, Buf as _};
 pub use hyper::Uri;
 pub use hyper_tls::HttpsConnector;
index 37984e5ba32ee6128575ec9c062fb4c90d15a9b9..2271fd47cb2b0a97c7136bacd58d4ddbd63d1b8f 100644 (file)
@@ -16,6 +16,23 @@ impl<T,E> Result<T,E> where AE: From<E> {
   }
 }
 
+#[throws(AE)]
+pub async fn read_limited_body<S,E>(limit: usize, mut stream: S) -> Box<[u8]>
+where S: futures::Stream<Item=Result<hyper::body::Bytes,E>> + Unpin,
+      // we also require that the Stream is cancellation-safe
+      E: std::error::Error + Sync + Send + 'static,
+{
+  let mut accum = vec![];
+  while let Some(item) = stream.next().await {
+    let b = item.context("HTTP error fetching response body")?;
+    if accum.len() + b.len() > limit {
+      throw!(anyhow!("maximum response body size {} exceeded", limit));
+    }
+    accum.extend(b);
+  }
+  accum.into()
+}
+
 use sha2::Digest as _;
 
 type HmacH = sha2::Sha256;