use hippotat::prelude::*;
-/*
-struct Client {
- requests_outstanding: Vec<>,
- tx_read_stream: something,
- rx_write_stream: something,
-}*/
-
-
type OutstandingRequest<'r> = Pin<Box<
dyn Future<Output=Result<Bytes,AE>> + Send + 'r
>>;
+impl<T> HCC for T where
+ T: hyper::client::connect::Connect + Clone + Send + Sync + 'static { }
+trait HCC: hyper::client::connect::Connect + Clone + Send + Sync + 'static { }
+
#[throws(AE)]
-fn submit_request<'r, 'ic:'r, 'hc:'r, C>(ic: &'ic InstanceConfig,
- hclient: &'hc Arc<hyper::Client<C>>,
- reqs: &mut Vec<OutstandingRequest<'r>>,
- upbound: Vec<Vec<u8>>,
-)
-where C: hyper::client::connect::Connect + Clone + Send + Sync + 'static,
- {
- let body = hyper::body::Body::wrap_stream(
- futures::stream::iter(
- Itertools::intersperse(
- upbound.into_iter().map(|u| Bytes::from(u)),
- slip::SLIP_END_SLICE.into()
- ).map(Ok::<_,Void>)
- )
- );
-
- let req = hyper::Request::post(&ic.url).body(body)
- .context("construct request")?;
-
- let resp = hclient.request(req);
- let fut = Box::pin(async move {
- let r = async { tokio::time::timeout( ic.http_timeout, async {
- let resp = resp.await.context("make request")?;
- if ! resp.status().is_success() {
- throw!(anyhow!("HTTP error status {}", &resp.status()));
- }
- let resp = resp.into_body();
- // xxx: some size limit to avoid mallocing the universe
- let resp = hyper::body::to_bytes(resp).await
- .context("HTTP error fetching response body")?;
- Ok::<_,AE>(resp)
- }).await? }.await;
- if r.is_err() {
- tokio::time::sleep(ic.http_retry).await;
- }
- r
- });
- reqs.push(fut);
+fn submit_request<'r, 'ic:'r, 'hc:'r, C:HCC>(
+ ic: &'ic InstanceConfig,
+ hclient: &'hc Arc<hyper::Client<C>>,
+ reqs: &mut Vec<OutstandingRequest<'r>>,
+ upbound: Vec<Vec<u8>>
+) {
+ let body = hyper::body::Body::wrap_stream(
+ futures::stream::iter(
+ Itertools::intersperse(
+ upbound.into_iter().map(|u| Bytes::from(u)),
+ slip::SLIP_END_SLICE.into()
+ ).map(Ok::<_,Void>)
+ )
+ );
+
+ let req = hyper::Request::post(&ic.url).body(body)
+ .context("construct request")?;
+
+ let resp = hclient.request(req);
+ let fut = Box::pin(async move {
+ let r = async { tokio::time::timeout( ic.http_timeout, async {
+ let resp = resp.await.context("make request")?;
+ if ! resp.status().is_success() {
+ throw!(anyhow!("HTTP error status {}", &resp.status()));
+ }
+ let resp = resp.into_body();
+ // xxx: some size limit to avoid mallocing the universe
+ let resp = hyper::body::to_bytes(resp).await
+ .context("HTTP error fetching response body")?;
+ Ok::<_,AE>(resp)
+ }).await? }.await;
+ if r.is_err() {
+ tokio::time::sleep(ic.http_retry).await;
+ }
+ r
+ });
+ reqs.push(fut);
}
#[allow(unused_variables)] // xxx
-async fn run_client<C>(ic: InstanceConfig, hclient: Arc<hyper::Client<C>>)
- -> Result<Void, AE>
-where C: hyper::client::connect::Connect + Clone + Send + Sync + 'static,
+async fn run_client<C:HCC>(
+ ic: InstanceConfig,
+ hclient: Arc<hyper::Client<C>>
+) -> Result<Void, AE>
{
debug!("{}: config: {:?}", &ic, &ic);