X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?a=blobdiff_plain;ds=sidebyside;f=server%2Fsuser.rs;fp=server%2Fsuser.rs;h=c2f11ff09cbc73a5d4d7eb9b421972d375f3a9a4;hb=55567b861f856b69d509b4e2a8466cdefd848b66;hp=98ee84cb23f1b293fb087d977ab74ae7751f02b5;hpb=76c5b7a6298129e1d1cdcff74eff22ec33687308;p=hippotat.git diff --git a/server/suser.rs b/server/suser.rs index 98ee84c..c2f11ff 100644 --- a/server/suser.rs +++ b/server/suser.rs @@ -11,8 +11,6 @@ pub struct User { pub route: mpsc::Sender, } -#[allow(unused_variables)] // xxx -#[allow(unused_mut)] // xxx pub async fn run(global: Arc, ic: Arc, mut web: mpsc::Receiver, @@ -25,7 +23,9 @@ pub async fn run(global: Arc, } #[derive(Debug)] struct OutstandingInner { + deadline: Instant, target_requests_outstanding: u32, + max_batch_down: u32, } let mut outstanding: VecDeque = default(); let mut downbound: PacketQueue = default(); @@ -39,11 +39,19 @@ pub async fn run(global: Arc, }; loop { + let eff_max_batch_down = outstanding + .iter() + .map(|o| o.oi.max_batch_down) + .min() + .unwrap_or(ic.max_batch_down) + .sat(); + if let Some(req) = { if ! downbound.is_empty() { outstanding.pop_front() } else if let Some((i,_)) = outstanding.iter().enumerate().find({ |(_,o)| outstanding.len() > o.oi.target_requests_outstanding.sat() + // xxx need timeout-based return too }) { Some(outstanding.remove(i).unwrap()) } else { @@ -56,7 +64,7 @@ pub async fn run(global: Arc, let next = if let Some(n) = downbound.peek_front() { n } else { break }; // Don't add 1 for the ESC since we will strip one - if build.len() + next.len() >= ic.max_batch_down.sat() { break } + if build.len() + next.len() >= eff_max_batch_down { break } build.esc_push(downbound.pop_front().unwrap()); } if ! build.is_empty() { @@ -74,7 +82,7 @@ pub async fn run(global: Arc, let max = usize::saturating_mul( ic.max_requests_outstanding.sat(), - ic.max_batch_down.sat(), + eff_max_batch_down, ).saturating_add(1 /* one boundary SLIP_ESC which we'll trim */); while downbound.total_len() > max { @@ -155,6 +163,7 @@ pub async fn run(global: Arc, max_batch_up, ( > ), client, let server, client = meta.parse()?.unwrap_or(server); } + let _ = max_batch_up; // we don't use this further while let Some(comp) = comps.next(&mut warnings, PartName::d)? { if comp.name != PartName::d { @@ -173,8 +182,12 @@ pub async fn run(global: Arc, ).await?; } + let deadline = Instant::now() + http_timeout; + let oi = OutstandingInner { target_requests_outstanding, + max_batch_down, + deadline, }; Ok::<_,AE>(oi) }.await {