chiark / gitweb /
fixes etc.
[hippotat.git] / server / suser.rs
index 98ee84cb23f1b293fb087d977ab74ae7751f02b5..c2f11ff09cbc73a5d4d7eb9b421972d375f3a9a4 100644 (file)
@@ -11,8 +11,6 @@ pub struct User {
   pub route: mpsc::Sender<RoutedPacket>,
 }
 
-#[allow(unused_variables)] // xxx
-#[allow(unused_mut)] // xxx
 pub async fn run(global: Arc<Global>,
                  ic: Arc<InstanceConfig>,
                  mut web: mpsc::Receiver<WebRequest>,
@@ -25,7 +23,9 @@ pub async fn run(global: Arc<Global>,
   }
   #[derive(Debug)]
   struct OutstandingInner {
+    deadline: Instant,
     target_requests_outstanding: u32,
+    max_batch_down: u32,
   }
   let mut outstanding: VecDeque<Outstanding> = default();
   let mut downbound: PacketQueue<RoutedPacketData> = default();
@@ -39,11 +39,19 @@ pub async fn run(global: Arc<Global>,
   };
 
   loop {
+    let eff_max_batch_down = outstanding
+      .iter()
+      .map(|o| o.oi.max_batch_down)
+      .min()
+      .unwrap_or(ic.max_batch_down)
+      .sat();
+
     if let Some(req) = {
       if ! downbound.is_empty() {
         outstanding.pop_front()
       } else if let Some((i,_)) = outstanding.iter().enumerate().find({
         |(_,o)| outstanding.len() > o.oi.target_requests_outstanding.sat()
+        // xxx need timeout-based return too
       }) {
         Some(outstanding.remove(i).unwrap())
       } else {
@@ -56,7 +64,7 @@ pub async fn run(global: Arc<Global>,
         let next = if let Some(n) = downbound.peek_front() { n }
                    else { break };
         // Don't add 1 for the ESC since we will strip one
-        if build.len() + next.len() >= ic.max_batch_down.sat() { break }
+        if build.len() + next.len() >= eff_max_batch_down { break }
         build.esc_push(downbound.pop_front().unwrap());
       }
       if ! build.is_empty() {
@@ -74,7 +82,7 @@ pub async fn run(global: Arc<Global>,
 
     let max = usize::saturating_mul(
       ic.max_requests_outstanding.sat(),
-      ic.max_batch_down.sat(),
+      eff_max_batch_down,
     ).saturating_add(1 /* one boundary SLIP_ESC which we'll trim */);
 
     while downbound.total_len() > max {
@@ -155,6 +163,7 @@ pub async fn run(global: Arc<Global>,
             max_batch_up, ( > ), client,
             let server, client = meta.parse()?.unwrap_or(server);
           }
+          let _ = max_batch_up; // we don't use this further
 
           while let Some(comp) = comps.next(&mut warnings, PartName::d)? {
             if comp.name != PartName::d {
@@ -173,8 +182,12 @@ pub async fn run(global: Arc<Global>,
             ).await?;
           }
 
+          let deadline = Instant::now() + http_timeout;
+
           let oi = OutstandingInner {
             target_requests_outstanding,
+            max_batch_down,
+            deadline,
           };
           Ok::<_,AE>(oi)
         }.await {