chiark / gitweb /
Switch to reqwest in the client
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Sat, 1 Feb 2025 12:43:48 +0000 (12:43 +0000)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Sat, 1 Feb 2025 20:14:58 +0000 (20:14 +0000)
hyper 1.x doesn't have a connection pool.
We want that, so let's use reqwest.

The URL type has changed, annoyingly.

Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
client/client.rs
src/config.rs
src/prelude.rs

index e3a57164fe13bb2589ce087ee8c84332e3d2f6fa..75c45626e18e9207e8a413be46751e6fc3093ee3 100644 (file)
@@ -38,13 +38,9 @@ type OutstandingRequest<'r> = Pin<Box<
     dyn Future<Output=Option<Box<[u8]>>> + Send + 'r
     >>;
 
-impl<T> Hcc for T where
-        T: hyper::client::connect::Connect + Clone + Send + Sync + 'static { }
-trait Hcc: hyper::client::connect::Connect + Clone + Send + Sync + 'static { }
-
-struct ClientContext<'c,C> {
+struct ClientContext<'c> {
   ic: &'c InstanceConfig,
-  hclient: &'c Arc<hyper::Client<C>>,
+  hclient: &'c reqwest::Client,
   reporter: &'c parking_lot::Mutex<Reporter<'c>>,
 }
 
@@ -55,8 +51,8 @@ struct TxQueued {
 }
 
 #[throws(AE)]
-fn submit_request<'r, 'c:'r, C:Hcc>(
-  c: &'c ClientContext<C>,
+fn submit_request<'r, 'c:'r>(
+  c: &'c ClientContext,
   req_num: &mut ReqNum,
   reqs: &mut Vec<OutstandingRequest<'r>>,
   upbound: FramesData,
@@ -139,37 +135,41 @@ fn submit_request<'r, 'c:'r, C:Hcc>(
   trace!("{} #{}: req; tx body_len={} frames={}",
          &c.ic, req_num, body_len, upbound.len());
 
-  let body = hyper::body::Body::wrap_stream(
+  let body = http_body_util::StreamBody::new(
     futures::stream::iter(
       content!(
         Bytes,
         into_iter,
         into,
-      ).map(Ok::<Bytes,Void>)
+      ).map(|by| Ok::<_, Void>(http_body::Frame::data(by)))
     )
   );
 
   let req = {
+    let url = c.ic.url.clone();
+    let mut req = reqwest::Request::new(reqwest::Method::POST, url);
+    let h = req.headers_mut();
     let ctype = r#"multipart/form-data; boundary="b""#;
-    let req = hyper::Request::post(&c.ic.url)
-      .header("Content-Type", ctype)
-      .header("Content-Length", body_len)
-      .body(body)
-      .context("construct request")?;
+    let ctype = reqwest::header::HeaderValue::from_static(ctype);
+    h.insert("Content-Type", ctype);
+    *req.body_mut() = Some(reqwest::Body::wrap(body));
     req
   };
 
-  let resp = c.hclient.request(req);
+  let resp = c.hclient.execute(req);
   let fut = Box::pin(async move {
     let r = async { tokio::time::timeout( c.ic.effective_http_timeout, async {
       let resp = resp.await.context("make request")?;
       let status = resp.status();
-      let mut resp = resp.into_body();
       let max_body = c.ic.max_batch_down.sat() + MAX_OVERHEAD;
+      let body = futures::stream::unfold(resp, |mut resp| async {
+        resp.chunk().await.transpose().map(|r| (r, resp))
+      });
+      pin!(body);
       let resp = read_limited_bytes(
-        max_body, default(), default(), Pin::new(&mut resp),
+        max_body, default(), default(), body.as_mut(),
       ).await
-        .discard_data().context("fetching response body")?;
+        .context("fetching response body")?;
 
       if ! status.is_success() {
         throw!(anyhow!("HTTP error status={} body={:?}",
@@ -191,9 +191,9 @@ fn submit_request<'r, 'c:'r, C:Hcc>(
   reqs.push(fut);
 }
 
-async fn run_client<C:Hcc>(
+async fn run_client(
   ic: InstanceConfig,
-  hclient: Arc<hyper::Client<C>>
+  hclient: reqwest::Client,
 ) -> Result<Void, AE>
 {
   debug!("{}: config: {:?}", &ic, &ic);
@@ -353,11 +353,9 @@ async fn main() {
     Ok((ics,))
   });
 
-  let https = HttpsConnector::new();
-  let hclient = hyper::Client::builder()
-    .http1_preserve_header_case(true)
-    .build::<_, hyper::Body>(https);
-  let hclient = Arc::new(hclient);
+  let hclient = reqwest::Client::builder()
+    .http1_title_case_headers()
+    .build().expect("build reqwest Client");
 
   info!("starting");
   let () = future::select_all(
index aa76686dc960f8b060e0bea5bdde57393d5f50ec..0a38080f1ae6e82dba803c349a670e2daceff7af 100644 (file)
@@ -36,7 +36,7 @@ pub struct InstanceConfig {
   #[client]  pub max_requests_outstanding:     u32,
   #[client]  pub http_retry:                   Duration,
   #[client]  pub success_report_interval:      Duration,
-  #[client]  pub url:                          Uri,
+  #[client]  pub url:                          Url,
   #[client]  pub vroutes:                      Vec<IpNet>,
   #[client]  pub ifname_client:                String,
 
@@ -203,7 +203,7 @@ impl_inspectable_config_value!{ ServerName as Display }
 impl_inspectable_config_value!{ ClientName as Display }
 impl_inspectable_config_value!{ u16 as Display }
 impl_inspectable_config_value!{ u32 as Display }
-impl_inspectable_config_value!{ hyper::Uri as Display }
+impl_inspectable_config_value!{ reqwest::Url as Display }
 
 impl_inspectable_config_value!{ IpAddr as Display }
 impl_inspectable_config_value!{ ipnet::IpNet as Display }
@@ -660,7 +660,12 @@ parseable_from_str!{u32, default() }
 parseable_from_str!{String, default() }
 parseable_from_str!{IpNet, default() }
 parseable_from_str!{IpAddr, Ipv4Addr::UNSPECIFIED.into() }
-parseable_from_str!{Uri, default() }
+
+parseable_from_str!{
+  Url,
+  "hippotat-unspecified:".parse()
+    .expect("failed to parse `hippotat-unspecified:` as a url")
+}
 
 impl<T:Parseable> Parseable for Vec<T> {
   #[throws(AE)]
@@ -880,7 +885,7 @@ impl InstanceConfig {
 
     match end {
       LinkEnd::Client => {
-        if self.url == Uri::unspecified() {
+        if self.url == Url::unspecified() {
           let addr = self.addrs.get(0).ok_or_else(
             || anyhow!("client needs addrs or url set")
           )?;
index f39ae74a5863a44ff414f0307b1045b93a519f8c..a0d5a93886b1b73ba67ff1111ac764a958486c80 100644 (file)
@@ -33,7 +33,7 @@ pub use easy_ext::ext;
 pub use fehler::{throw, throws};
 pub use futures::{poll, future, FutureExt, StreamExt, TryStreamExt};
 pub use hyper::body::{Bytes, Buf, HttpBody};
-pub use hyper::{Method, Uri};
+pub use hyper::{Method};
 pub use hyper_tls::HttpsConnector;
 pub use ipnet::IpNet;
 pub use itertools::{iproduct, izip, Itertools};
@@ -42,6 +42,7 @@ pub use lazy_static::lazy_static;
 pub use log::{trace, debug, info, warn, error};
 pub use memchr::memmem;
 pub use pin_project_lite::pin_project;
+pub use reqwest::Url;
 pub use subtle::ConstantTimeEq;
 pub use thiserror::Error;
 pub use tokio::io::{AsyncBufReadExt, AsyncWriteExt};