From 7f7477b74bc8bafacf4b3c7e67f3c406d815aee5 Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Wed, 4 Aug 2021 12:37:23 +0100 Subject: [PATCH] limit response body size Signed-off-by: Ian Jackson --- src/bin/client.rs | 9 +++++---- src/prelude.rs | 2 +- src/utils.rs | 17 +++++++++++++++++ 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/src/bin/client.rs b/src/bin/client.rs index 0a7441e..da82193 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -5,6 +5,8 @@ use hippotat::prelude::*; use hippotat_macros::into_crlfs; +const MAX_BATCH_DOWN_RESP_OVERHEAD: usize = 10_000; + #[derive(StructOpt,Debug)] pub struct Opts { /// Increase debug level @@ -16,7 +18,7 @@ pub struct Opts { } type OutstandingRequest<'r> = Pin> + Send + 'r + dyn Future>> + Send + 'r >>; impl HCC for T where @@ -135,9 +137,8 @@ fn submit_request<'r, 'c:'r, C:HCC>( let resp = resp.await.context("make request")?; let status = resp.status(); let resp = resp.into_body(); - // xxx: some size limit to avoid mallocing the universe - let resp = hyper::body::to_bytes(resp).await - .context("HTTP error fetching response body")?; + let max_body = c.ic.max_batch_down.sat() + MAX_BATCH_DOWN_RESP_OVERHEAD; + let resp = read_limited_body(max_body, resp).await?; if ! status.is_success() { throw!(anyhow!("HTTP error status={} body={:?}", diff --git a/src/prelude.rs b/src/prelude.rs index c60db81..fd9fd83 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -27,7 +27,7 @@ pub use anyhow::{anyhow, Context}; pub use cervine::Cow as Cervine; pub use extend::ext; pub use fehler::{throw, throws}; -pub use futures::{poll, future}; +pub use futures::{poll, future, StreamExt as _}; pub use hyper::body::{Bytes, Buf as _}; pub use hyper::Uri; pub use hyper_tls::HttpsConnector; diff --git a/src/utils.rs b/src/utils.rs index 37984e5..2271fd4 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -16,6 +16,23 @@ impl Result where AE: From { } } +#[throws(AE)] +pub async fn read_limited_body(limit: usize, mut stream: S) -> Box<[u8]> +where S: futures::Stream> + Unpin, + // we also require that the Stream is cancellation-safe + E: std::error::Error + Sync + Send + 'static, +{ + let mut accum = vec![]; + while let Some(item) = stream.next().await { + let b = item.context("HTTP error fetching response body")?; + if accum.len() + b.len() > limit { + throw!(anyhow!("maximum response body size {} exceeded", limit)); + } + accum.extend(b); + } + accum.into() +} + use sha2::Digest as _; type HmacH = sha2::Sha256; -- 2.30.2