chiark / gitweb /
limit rx queue better
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Wed, 4 Aug 2021 11:56:49 +0000 (12:56 +0100)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Wed, 4 Aug 2021 11:56:49 +0000 (12:56 +0100)
Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
src/bin/client.rs
src/reporter.rs

index da82193b3b850a1c00aa6231644453516ac528a5..91c9abd3e634fd7636f085af04eac82b0774bfbd 100644 (file)
@@ -27,7 +27,7 @@ trait HCC: hyper::client::connect::Connect + Clone + Send + Sync + 'static { }
 
 struct ClientContext<'c,C> {
   ic: &'c InstanceConfig,
-  effective_http_timeout: Duration,
+  effective_http_timeout: Duration, // also min err report interval, xxx doc
   hclient: &'c Arc<hyper::Client<C>>,
   reporter: &'c parking_lot::Mutex<Reporter<'c>>,
 }
@@ -214,6 +214,14 @@ async fn run_client<C:HCC>(
 
   async {
     loop {
+      let rx_queue_space = 
+        if rx_queue.remaining() < ic.max_batch_down.sat() * 3 /* xxx */ {
+          // xxx make this separate option ? docs say server only
+          Ok(())
+        } else {
+          Err(())
+        };
+      
       select! {
         y = rx_stream.write_all_buf(&mut rx_queue),
         if ! rx_queue.is_empty() =>
@@ -254,14 +262,7 @@ async fn run_client<C:HCC>(
         },
 
         _ = async { },
-        if reporter.lock().filter(None, {
-          if rx_queue.remaining() < ic.max_batch_down.sat() * 3 /* xxx */ {
-            // xxx make this separate option ? docs say server only
-            Ok(())
-          } else {
-            Err(anyhow!("rx queue full"))
-          }
-        }).is_some() &&
+        if rx_queue_space.is_ok() &&
           (reqs.len() < ic.target_requests_outstanding.sat() ||
            (reqs.len() < ic.max_requests_outstanding.sat() &&
             ! upbound.is_empty()))
@@ -286,7 +287,15 @@ async fn run_client<C:HCC>(
             }, |e| error!("{} #{}: rx discarding: {}", &ic, req_num, e));
           
           }
-        }
+        },
+
+        _ = tokio::time::sleep(c.effective_http_timeout),
+        if rx_queue_space.is_err() =>
+        {
+          reporter.lock().filter(None, Err::<Void,_>(
+            anyhow!("rx queue full, blocked")
+          ));
+        },
       }
     }
   }.await
index 398491569f72c6a19e890e634055ce36797d0c0c..fb12e63335229fcd5fe50c5fdf1d9656c6428a6b 100644 (file)
@@ -33,8 +33,8 @@ impl<'r> Reporter<'r> {
     let now = Instant::now();
     if let Some(rep) = &self.last_report {
       if now - rep.when < match rep.ok {
-        Ok(()) => Duration::from_secs(3600),
-        Err(()) => Duration::from_secs(30),
+        Ok(()) => Duration::from_secs(3600), // xxx config?
+        Err(()) => Duration::from_secs(30), // xxx config?
       } {
         return
       }