From: Ian Jackson Date: Wed, 4 Aug 2021 11:56:49 +0000 (+0100) Subject: limit rx queue better X-Git-Tag: hippotat/1.0.0~341 X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ian/git?a=commitdiff_plain;h=299ad3ba1ae3a661962f3f47585fc746390049ca;p=hippotat.git limit rx queue better Signed-off-by: Ian Jackson --- diff --git a/src/bin/client.rs b/src/bin/client.rs index da82193..91c9abd 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -27,7 +27,7 @@ trait HCC: hyper::client::connect::Connect + Clone + Send + Sync + 'static { } struct ClientContext<'c,C> { ic: &'c InstanceConfig, - effective_http_timeout: Duration, + effective_http_timeout: Duration, // also min err report interval, xxx doc hclient: &'c Arc>, reporter: &'c parking_lot::Mutex>, } @@ -214,6 +214,14 @@ async fn run_client( async { loop { + let rx_queue_space = + if rx_queue.remaining() < ic.max_batch_down.sat() * 3 /* xxx */ { + // xxx make this separate option ? docs say server only + Ok(()) + } else { + Err(()) + }; + select! { y = rx_stream.write_all_buf(&mut rx_queue), if ! rx_queue.is_empty() => @@ -254,14 +262,7 @@ async fn run_client( }, _ = async { }, - if reporter.lock().filter(None, { - if rx_queue.remaining() < ic.max_batch_down.sat() * 3 /* xxx */ { - // xxx make this separate option ? docs say server only - Ok(()) - } else { - Err(anyhow!("rx queue full")) - } - }).is_some() && + if rx_queue_space.is_ok() && (reqs.len() < ic.target_requests_outstanding.sat() || (reqs.len() < ic.max_requests_outstanding.sat() && ! upbound.is_empty())) @@ -286,7 +287,15 @@ async fn run_client( }, |e| error!("{} #{}: rx discarding: {}", &ic, req_num, e)); } - } + }, + + _ = tokio::time::sleep(c.effective_http_timeout), + if rx_queue_space.is_err() => + { + reporter.lock().filter(None, Err::( + anyhow!("rx queue full, blocked") + )); + }, } } }.await diff --git a/src/reporter.rs b/src/reporter.rs index 3984915..fb12e63 100644 --- a/src/reporter.rs +++ b/src/reporter.rs @@ -33,8 +33,8 @@ impl<'r> Reporter<'r> { let now = Instant::now(); if let Some(rep) = &self.last_report { if now - rep.when < match rep.ok { - Ok(()) => Duration::from_secs(3600), - Err(()) => Duration::from_secs(30), + Ok(()) => Duration::from_secs(3600), // xxx config? + Err(()) => Duration::from_secs(30), // xxx config? } { return }