chiark / gitweb /
rx queue
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Tue, 3 Aug 2021 22:58:16 +0000 (23:58 +0100)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Tue, 3 Aug 2021 23:02:22 +0000 (00:02 +0100)
Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
src/bin/client.rs
src/reporter.rs

index e21020cc2c2c6fb77ddde43c1b4a0b7ab234eb80..fde8058cf79963e86d447f529f1e461ad65823f9 100644 (file)
@@ -135,7 +135,7 @@ fn submit_request<'r, 'c:'r, C:HCC>(
       Ok::<_,AE>(resp)
     }).await? }.await;
 
-    let r = c.reporter.lock().report(req_num, r);
+    let r = c.reporter.lock().report(Some(req_num), r);
 
     if r.is_none() {
       tokio::time::sleep(c.ic.http_retry).await;
@@ -199,6 +199,15 @@ async fn run_client<C:HCC>(
 
   async {
     loop {
+      let rx_fullness =
+        if rx_queue.remaining() < ic.max_batch_down.sat() * 3 /* xxx */ {
+          // xxx make this separate option ? docs say server only
+          Ok(())
+        } else {
+          Err(anyhow!("rx queue full"))
+        };
+      let rx_fullness_ok = reporter.lock().report(None, rx_fullness);
+
       select! {
         data = tx_stream.next_segment(),
         if packets.is_empty() =>
@@ -233,9 +242,10 @@ async fn run_client<C:HCC>(
         },
 
         _ = async { },
-        if reqs.len() < ic.target_requests_outstanding.sat() ||
-           (reqs.len() < ic.max_requests_outstanding.sat() &&
-            ! upbound.is_empty())
+        if rx_fullness_ok.is_some() &&
+           (reqs.len() < ic.target_requests_outstanding.sat() ||
+            (reqs.len() < ic.max_requests_outstanding.sat() &&
+             ! upbound.is_empty()))
           // xxx backpressure, if too much in rx_queue
           =>
         {
@@ -257,7 +267,8 @@ async fn run_client<C:HCC>(
             }, |e| error!("{} #{}: rx discarding: {}", &ic, req_num, e));
           
             dbg!(&rx_queue);
-            rx_queue = default(); // xxx
+            // xxx do writing with
+            //  tokio::io::AsyncWriteExt::write_all_buf
           }
         }
       }
index ec6fcb00eedbe5658a9509800563c0b86bfcdf9e..04a8e49e2bd1b4df5e23df2b1b555b0a766df706 100644 (file)
@@ -13,7 +13,8 @@ impl<'r> Reporter<'r> {
     ic
   } }
   
-  pub fn report<T>(&mut self, req_num: ReqNum, r: Result<T,AE>) -> Option<T> {
+  pub fn report<T>(&mut self, req_num: Option<ReqNum>, r: Result<T,AE>)
+                   -> Option<T> {
     match r {
       Ok(t) => {
         // xxx something something success
@@ -21,7 +22,11 @@ impl<'r> Reporter<'r> {
       },
       Err(e) => {
         // xxx something something error
-        warn!("{} #{}: {:?}", self.ic, req_num, e);
+        if let Some(req_num) = req_num {
+          warn!("{} #{}: {:?}", self.ic, req_num, e);
+        } else {
+          warn!("{}: {:?}", self.ic, e);
+        }
         None
       },
     }