chiark / gitweb /
response plumbing
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Sun, 22 Aug 2021 16:16:05 +0000 (17:16 +0100)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Sun, 22 Aug 2021 16:16:05 +0000 (17:16 +0100)
Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
server/server.rs
server/suser.rs
src/queue.rs

index 2726d09fccd69e1ce6b85febd047135ed4c57965..dcadd54118f83b4b5124579e5da20650fb25d07c 100644 (file)
@@ -8,7 +8,7 @@ mod suser;
 mod slocal;
 mod sweb;
 
-pub use sweb::{WebRequest, WebResponse};
+pub use sweb::{WebRequest, WebResponse, WebResponseBody};
 pub use suser::User;
 
 #[derive(StructOpt,Debug)]
index 48c8be5b8af186f89321994bd04b076c39c341de..98ee84cb23f1b293fb087d977ab74ae7751f02b5 100644 (file)
@@ -39,7 +39,7 @@ pub async fn run(global: Arc<Global>,
   };
 
   loop {
-    if let Some(ret) = {
+    if let Some(req) = {
       if ! downbound.is_empty() {
         outstanding.pop_front()
       } else if let Some((i,_)) = outstanding.iter().enumerate().find({
@@ -50,19 +50,45 @@ pub async fn run(global: Arc<Global>,
         None
       }
     } {
+      let mut build: FrameQueueBuf = default();
+
+      loop {
+        let next = if let Some(n) = downbound.peek_front() { n }
+                   else { break };
+        // Don't add 1 for the ESC since we will strip one
+        if build.len() + next.len() >= ic.max_batch_down.sat() { break }
+        build.esc_push(downbound.pop_front().unwrap());
+      }
+      if ! build.is_empty() {
+        // skip leading ESC
+        build.advance(1);
+      }
+
       let response = WebResponse {
-        data: Ok(default()),
+        data: Ok(build),
         warnings: default(),
       };
 
-      try_send_response(ret.reply_to, response);
+      try_send_response(req.reply_to, response);
+    }
+
+    let max = usize::saturating_mul(
+      ic.max_requests_outstanding.sat(),
+      ic.max_batch_down.sat(),
+    ).saturating_add(1 /* one boundary SLIP_ESC which we'll trim */);
+
+    while downbound.total_len() > max {
+      let _ = downbound.pop_front();
     }
 
     select!{
       biased;
 
-      
-      // xxx something something routed something
+      data = routed.recv() =>
+      {
+        let data = data.ok_or_else(|| anyhow!("routers shut down!"))?;
+        downbound.push_back(data.data);
+      },
 
       req = web.recv() =>
       {
index 6b6c7bba6f9a93b2441baa04c440cbe27f208f9c..3a85f5c27b1886b3863ca22e033386a849ecc26b 100644 (file)
@@ -25,6 +25,9 @@ impl<D> PacketQueue<D> where D: AsRef<[u8]> {
 
   pub fn content_count(&self) -> usize { self.queue.len() }
   pub fn content_len(&self) -> usize { self.content }
+  pub fn total_len(&self) -> usize {
+    self.content_count() + self.content_len()
+  }
 
   pub fn is_empty(&self) -> bool { self.queue.is_empty() }
   pub fn peek_front(&self) -> Option<&D> { self.queue.front() }