From: Ian Jackson Date: Tue, 3 Aug 2021 22:58:16 +0000 (+0100) Subject: rx queue X-Git-Tag: hippotat/1.0.0~367 X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ian/git?a=commitdiff_plain;h=d36dea93f630abca4415b8e1e447706d4b65aacc;p=hippotat.git rx queue Signed-off-by: Ian Jackson --- diff --git a/src/bin/client.rs b/src/bin/client.rs index e21020c..fde8058 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -135,7 +135,7 @@ fn submit_request<'r, 'c:'r, C:HCC>( Ok::<_,AE>(resp) }).await? }.await; - let r = c.reporter.lock().report(req_num, r); + let r = c.reporter.lock().report(Some(req_num), r); if r.is_none() { tokio::time::sleep(c.ic.http_retry).await; @@ -199,6 +199,15 @@ async fn run_client( async { loop { + let rx_fullness = + if rx_queue.remaining() < ic.max_batch_down.sat() * 3 /* xxx */ { + // xxx make this separate option ? docs say server only + Ok(()) + } else { + Err(anyhow!("rx queue full")) + }; + let rx_fullness_ok = reporter.lock().report(None, rx_fullness); + select! { data = tx_stream.next_segment(), if packets.is_empty() => @@ -233,9 +242,10 @@ async fn run_client( }, _ = async { }, - if reqs.len() < ic.target_requests_outstanding.sat() || - (reqs.len() < ic.max_requests_outstanding.sat() && - ! upbound.is_empty()) + if rx_fullness_ok.is_some() && + (reqs.len() < ic.target_requests_outstanding.sat() || + (reqs.len() < ic.max_requests_outstanding.sat() && + ! upbound.is_empty())) // xxx backpressure, if too much in rx_queue => { @@ -257,7 +267,8 @@ async fn run_client( }, |e| error!("{} #{}: rx discarding: {}", &ic, req_num, e)); dbg!(&rx_queue); - rx_queue = default(); // xxx + // xxx do writing with + // tokio::io::AsyncWriteExt::write_all_buf } } } diff --git a/src/reporter.rs b/src/reporter.rs index ec6fcb0..04a8e49 100644 --- a/src/reporter.rs +++ b/src/reporter.rs @@ -13,7 +13,8 @@ impl<'r> Reporter<'r> { ic } } - pub fn report(&mut self, req_num: ReqNum, r: Result) -> Option { + pub fn report(&mut self, req_num: Option, r: Result) + -> Option { match r { Ok(t) => { // xxx something something success @@ -21,7 +22,11 @@ impl<'r> Reporter<'r> { }, Err(e) => { // xxx something something error - warn!("{} #{}: {:?}", self.ic, req_num, e); + if let Some(req_num) = req_num { + warn!("{} #{}: {:?}", self.ic, req_num, e); + } else { + warn!("{}: {:?}", self.ic, e); + } None }, }