use hippotat::prelude::*;
use hippotat_macros::into_crlfs;
+const MAX_BATCH_DOWN_RESP_OVERHEAD: usize = 10_000;
+
#[derive(StructOpt,Debug)]
pub struct Opts {
/// Increase debug level
}
type OutstandingRequest<'r> = Pin<Box<
- dyn Future<Output=Option<Bytes>> + Send + 'r
+ dyn Future<Output=Option<Box<[u8]>>> + Send + 'r
>>;
impl<T> HCC for T where
let resp = resp.await.context("make request")?;
let status = resp.status();
let resp = resp.into_body();
- // xxx: some size limit to avoid mallocing the universe
- let resp = hyper::body::to_bytes(resp).await
- .context("HTTP error fetching response body")?;
+ let max_body = c.ic.max_batch_down.sat() + MAX_BATCH_DOWN_RESP_OVERHEAD;
+ let resp = read_limited_body(max_body, resp).await?;
if ! status.is_success() {
throw!(anyhow!("HTTP error status={} body={:?}",
pub use cervine::Cow as Cervine;
pub use extend::ext;
pub use fehler::{throw, throws};
-pub use futures::{poll, future};
+pub use futures::{poll, future, StreamExt as _};
pub use hyper::body::{Bytes, Buf as _};
pub use hyper::Uri;
pub use hyper_tls::HttpsConnector;
}
}
+#[throws(AE)]
+pub async fn read_limited_body<S,E>(limit: usize, mut stream: S) -> Box<[u8]>
+where S: futures::Stream<Item=Result<hyper::body::Bytes,E>> + Unpin,
+ // we also require that the Stream is cancellation-safe
+ E: std::error::Error + Sync + Send + 'static,
+{
+ let mut accum = vec![];
+ while let Some(item) = stream.next().await {
+ let b = item.context("HTTP error fetching response body")?;
+ if accum.len() + b.len() > limit {
+ throw!(anyhow!("maximum response body size {} exceeded", limit));
+ }
+ accum.extend(b);
+ }
+ accum.into()
+}
+
use sha2::Digest as _;
type HmacH = sha2::Sha256;