From: Ian Jackson Date: Sun, 8 Aug 2021 14:35:26 +0000 (+0100) Subject: refactor body limit, for reuse in server X-Git-Tag: hippotat/1.0.0~216 X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ian/git?a=commitdiff_plain;h=7e2bca555e99f2c6385d716cdc56faa5a364c2e2;p=hippotat.git refactor body limit, for reuse in server Signed-off-by: Ian Jackson --- diff --git a/src/bin/client.rs b/src/bin/client.rs index 9c2f297..29b6ef9 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -142,9 +142,10 @@ fn submit_request<'r, 'c:'r, C:HCC>( let r = async { tokio::time::timeout( c.ic.effective_http_timeout, async { let resp = resp.await.context("make request")?; let status = resp.status(); - let resp = resp.into_body(); + let mut resp = resp.into_body(); let max_body = c.ic.max_batch_down.sat() + MAX_BATCH_DOWN_RESP_OVERHEAD; - let resp = read_limited_body(max_body, resp).await?; + let resp = read_limited_bytes(max_body, &mut resp).await + .context("fetching response body")?; if ! status.is_success() { throw!(anyhow!("HTTP error status={} body={:?}", diff --git a/src/utils.rs b/src/utils.rs index 2271fd4..b84d2f2 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -16,19 +16,28 @@ impl Result where AE: From { } } -#[throws(AE)] -pub async fn read_limited_body(limit: usize, mut stream: S) -> Box<[u8]> -where S: futures::Stream> + Unpin, +#[derive(Error,Debug)] +pub enum ReadLimitedError { + #[error("maximum size {limit} exceeded")] + Truncated { limit: usize }, + + #[error("HTTP error {0}")] + Hyper(#[from] hyper::Error), +} + +#[throws(ReadLimitedError)] +pub async fn read_limited_bytes(limit: usize, stream: &mut S) -> Box<[u8]> +where S: futures::Stream> + + Debug + Unpin, // we also require that the Stream is cancellation-safe - E: std::error::Error + Sync + Send + 'static, { let mut accum = vec![]; while let Some(item) = stream.next().await { - let b = item.context("HTTP error fetching response body")?; - if accum.len() + b.len() > limit { - throw!(anyhow!("maximum response body size {} exceeded", limit)); - } + let b = item?; accum.extend(b); + if accum.len() > limit { + throw!(ReadLimitedError::Truncated { limit }) + } } accum.into() }