From: Ian Jackson Date: Sat, 1 Feb 2025 12:43:48 +0000 (+0000) Subject: Switch to reqwest in the client X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ian/git?a=commitdiff_plain;h=868d32c7192967adcbe8345e5eceb38b18bebb40;p=hippotat.git Switch to reqwest in the client hyper 1.x doesn't have a connection pool. We want that, so let's use reqwest. The URL type has changed, annoyingly. Signed-off-by: Ian Jackson --- diff --git a/client/client.rs b/client/client.rs index e3a5716..75c4562 100644 --- a/client/client.rs +++ b/client/client.rs @@ -38,13 +38,9 @@ type OutstandingRequest<'r> = Pin>> + Send + 'r >>; -impl Hcc for T where - T: hyper::client::connect::Connect + Clone + Send + Sync + 'static { } -trait Hcc: hyper::client::connect::Connect + Clone + Send + Sync + 'static { } - -struct ClientContext<'c,C> { +struct ClientContext<'c> { ic: &'c InstanceConfig, - hclient: &'c Arc>, + hclient: &'c reqwest::Client, reporter: &'c parking_lot::Mutex>, } @@ -55,8 +51,8 @@ struct TxQueued { } #[throws(AE)] -fn submit_request<'r, 'c:'r, C:Hcc>( - c: &'c ClientContext, +fn submit_request<'r, 'c:'r>( + c: &'c ClientContext, req_num: &mut ReqNum, reqs: &mut Vec>, upbound: FramesData, @@ -139,37 +135,41 @@ fn submit_request<'r, 'c:'r, C:Hcc>( trace!("{} #{}: req; tx body_len={} frames={}", &c.ic, req_num, body_len, upbound.len()); - let body = hyper::body::Body::wrap_stream( + let body = http_body_util::StreamBody::new( futures::stream::iter( content!( Bytes, into_iter, into, - ).map(Ok::) + ).map(|by| Ok::<_, Void>(http_body::Frame::data(by))) ) ); let req = { + let url = c.ic.url.clone(); + let mut req = reqwest::Request::new(reqwest::Method::POST, url); + let h = req.headers_mut(); let ctype = r#"multipart/form-data; boundary="b""#; - let req = hyper::Request::post(&c.ic.url) - .header("Content-Type", ctype) - .header("Content-Length", body_len) - .body(body) - .context("construct request")?; + let ctype = reqwest::header::HeaderValue::from_static(ctype); + h.insert("Content-Type", ctype); + *req.body_mut() = Some(reqwest::Body::wrap(body)); req }; - let resp = c.hclient.request(req); + let resp = c.hclient.execute(req); let fut = Box::pin(async move { let r = async { tokio::time::timeout( c.ic.effective_http_timeout, async { let resp = resp.await.context("make request")?; let status = resp.status(); - let mut resp = resp.into_body(); let max_body = c.ic.max_batch_down.sat() + MAX_OVERHEAD; + let body = futures::stream::unfold(resp, |mut resp| async { + resp.chunk().await.transpose().map(|r| (r, resp)) + }); + pin!(body); let resp = read_limited_bytes( - max_body, default(), default(), Pin::new(&mut resp), + max_body, default(), default(), body.as_mut(), ).await - .discard_data().context("fetching response body")?; + .context("fetching response body")?; if ! status.is_success() { throw!(anyhow!("HTTP error status={} body={:?}", @@ -191,9 +191,9 @@ fn submit_request<'r, 'c:'r, C:Hcc>( reqs.push(fut); } -async fn run_client( +async fn run_client( ic: InstanceConfig, - hclient: Arc> + hclient: reqwest::Client, ) -> Result { debug!("{}: config: {:?}", &ic, &ic); @@ -353,11 +353,9 @@ async fn main() { Ok((ics,)) }); - let https = HttpsConnector::new(); - let hclient = hyper::Client::builder() - .http1_preserve_header_case(true) - .build::<_, hyper::Body>(https); - let hclient = Arc::new(hclient); + let hclient = reqwest::Client::builder() + .http1_title_case_headers() + .build().expect("build reqwest Client"); info!("starting"); let () = future::select_all( diff --git a/src/config.rs b/src/config.rs index aa76686..0a38080 100644 --- a/src/config.rs +++ b/src/config.rs @@ -36,7 +36,7 @@ pub struct InstanceConfig { #[client] pub max_requests_outstanding: u32, #[client] pub http_retry: Duration, #[client] pub success_report_interval: Duration, - #[client] pub url: Uri, + #[client] pub url: Url, #[client] pub vroutes: Vec, #[client] pub ifname_client: String, @@ -203,7 +203,7 @@ impl_inspectable_config_value!{ ServerName as Display } impl_inspectable_config_value!{ ClientName as Display } impl_inspectable_config_value!{ u16 as Display } impl_inspectable_config_value!{ u32 as Display } -impl_inspectable_config_value!{ hyper::Uri as Display } +impl_inspectable_config_value!{ reqwest::Url as Display } impl_inspectable_config_value!{ IpAddr as Display } impl_inspectable_config_value!{ ipnet::IpNet as Display } @@ -660,7 +660,12 @@ parseable_from_str!{u32, default() } parseable_from_str!{String, default() } parseable_from_str!{IpNet, default() } parseable_from_str!{IpAddr, Ipv4Addr::UNSPECIFIED.into() } -parseable_from_str!{Uri, default() } + +parseable_from_str!{ + Url, + "hippotat-unspecified:".parse() + .expect("failed to parse `hippotat-unspecified:` as a url") +} impl Parseable for Vec { #[throws(AE)] @@ -880,7 +885,7 @@ impl InstanceConfig { match end { LinkEnd::Client => { - if self.url == Uri::unspecified() { + if self.url == Url::unspecified() { let addr = self.addrs.get(0).ok_or_else( || anyhow!("client needs addrs or url set") )?; diff --git a/src/prelude.rs b/src/prelude.rs index f39ae74..a0d5a93 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -33,7 +33,7 @@ pub use easy_ext::ext; pub use fehler::{throw, throws}; pub use futures::{poll, future, FutureExt, StreamExt, TryStreamExt}; pub use hyper::body::{Bytes, Buf, HttpBody}; -pub use hyper::{Method, Uri}; +pub use hyper::{Method}; pub use hyper_tls::HttpsConnector; pub use ipnet::IpNet; pub use itertools::{iproduct, izip, Itertools}; @@ -42,6 +42,7 @@ pub use lazy_static::lazy_static; pub use log::{trace, debug, info, warn, error}; pub use memchr::memmem; pub use pin_project_lite::pin_project; +pub use reqwest::Url; pub use subtle::ConstantTimeEq; pub use thiserror::Error; pub use tokio::io::{AsyncBufReadExt, AsyncWriteExt};