let r = async { tokio::time::timeout( c.ic.effective_http_timeout, async {
let resp = resp.await.context("make request")?;
let status = resp.status();
- let resp = resp.into_body();
+ let mut resp = resp.into_body();
let max_body = c.ic.max_batch_down.sat() + MAX_BATCH_DOWN_RESP_OVERHEAD;
- let resp = read_limited_body(max_body, resp).await?;
+ let resp = read_limited_bytes(max_body, &mut resp).await
+ .context("fetching response body")?;
if ! status.is_success() {
throw!(anyhow!("HTTP error status={} body={:?}",
}
}
-#[throws(AE)]
-pub async fn read_limited_body<S,E>(limit: usize, mut stream: S) -> Box<[u8]>
-where S: futures::Stream<Item=Result<hyper::body::Bytes,E>> + Unpin,
+#[derive(Error,Debug)]
+pub enum ReadLimitedError {
+ #[error("maximum size {limit} exceeded")]
+ Truncated { limit: usize },
+
+ #[error("HTTP error {0}")]
+ Hyper(#[from] hyper::Error),
+}
+
+#[throws(ReadLimitedError)]
+pub async fn read_limited_bytes<S>(limit: usize, stream: &mut S) -> Box<[u8]>
+where S: futures::Stream<Item=Result<hyper::body::Bytes,hyper::Error>>
+ + Debug + Unpin,
// we also require that the Stream is cancellation-safe
- E: std::error::Error + Sync + Send + 'static,
{
let mut accum = vec![];
while let Some(item) = stream.next().await {
- let b = item.context("HTTP error fetching response body")?;
- if accum.len() + b.len() > limit {
- throw!(anyhow!("maximum response body size {} exceeded", limit));
- }
+ let b = item?;
accum.extend(b);
+ if accum.len() > limit {
+ throw!(ReadLimitedError::Truncated { limit })
+ }
}
accum.into()
}