From ce74b5478bfe81e8cef2de66efc41161ef2354a1 Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Sun, 22 Aug 2021 17:16:05 +0100 Subject: [PATCH] response plumbing Signed-off-by: Ian Jackson --- server/server.rs | 2 +- server/suser.rs | 36 +++++++++++++++++++++++++++++++----- src/queue.rs | 3 +++ 3 files changed, 35 insertions(+), 6 deletions(-) diff --git a/server/server.rs b/server/server.rs index 2726d09..dcadd54 100644 --- a/server/server.rs +++ b/server/server.rs @@ -8,7 +8,7 @@ mod suser; mod slocal; mod sweb; -pub use sweb::{WebRequest, WebResponse}; +pub use sweb::{WebRequest, WebResponse, WebResponseBody}; pub use suser::User; #[derive(StructOpt,Debug)] diff --git a/server/suser.rs b/server/suser.rs index 48c8be5..98ee84c 100644 --- a/server/suser.rs +++ b/server/suser.rs @@ -39,7 +39,7 @@ pub async fn run(global: Arc, }; loop { - if let Some(ret) = { + if let Some(req) = { if ! downbound.is_empty() { outstanding.pop_front() } else if let Some((i,_)) = outstanding.iter().enumerate().find({ @@ -50,19 +50,45 @@ pub async fn run(global: Arc, None } } { + let mut build: FrameQueueBuf = default(); + + loop { + let next = if let Some(n) = downbound.peek_front() { n } + else { break }; + // Don't add 1 for the ESC since we will strip one + if build.len() + next.len() >= ic.max_batch_down.sat() { break } + build.esc_push(downbound.pop_front().unwrap()); + } + if ! build.is_empty() { + // skip leading ESC + build.advance(1); + } + let response = WebResponse { - data: Ok(default()), + data: Ok(build), warnings: default(), }; - try_send_response(ret.reply_to, response); + try_send_response(req.reply_to, response); + } + + let max = usize::saturating_mul( + ic.max_requests_outstanding.sat(), + ic.max_batch_down.sat(), + ).saturating_add(1 /* one boundary SLIP_ESC which we'll trim */); + + while downbound.total_len() > max { + let _ = downbound.pop_front(); } select!{ biased; - - // xxx something something routed something + data = routed.recv() => + { + let data = data.ok_or_else(|| anyhow!("routers shut down!"))?; + downbound.push_back(data.data); + }, req = web.recv() => { diff --git a/src/queue.rs b/src/queue.rs index 6b6c7bb..3a85f5c 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -25,6 +25,9 @@ impl PacketQueue where D: AsRef<[u8]> { pub fn content_count(&self) -> usize { self.queue.len() } pub fn content_len(&self) -> usize { self.content } + pub fn total_len(&self) -> usize { + self.content_count() + self.content_len() + } pub fn is_empty(&self) -> bool { self.queue.is_empty() } pub fn peek_front(&self) -> Option<&D> { self.queue.front() } -- 2.30.2