chiark / gitweb /
check mtu
[hippotat.git] / src / bin / server.rs
index c3b7b15cd0956e19b07e183ba3b93a502853358a..4900e9778559d546035082cad82a1de1798786e2 100644 (file)
@@ -24,6 +24,7 @@ struct ClientHandles {
 
 /// Sent from hyper worker pool task to client task
 #[allow(dead_code)] // xxx
+#[derive(Debug)]
 struct WebRequest {
   // initial part of body
   // used up to and including first 2 lines of metadata
@@ -39,6 +40,7 @@ struct WebRequest {
 
 /// Reply from client task to hyper worker pool task
 #[allow(dead_code)] // xxx
+#[derive(Debug)]
 struct WebResponse {
   warnings: Warnings,
   data: Result<WebResponseData, AE>,
@@ -183,6 +185,7 @@ async fn handle(
     // boundary, start, &comp.name, &client.ic);
 
     let (reply_to, reply_recv) = tokio::sync::oneshot::channel();
+    trace!("{} request xxx={}", &client.ic, initial.len());
     let wreq = WebRequest {
       initial,
       initial_remaining,
@@ -192,7 +195,6 @@ async fn handle(
       warnings: mem::take(&mut warnings),
       reply_to
     };
-    trace!("{} request", &client.ic);
 
     client.web.try_send(wreq)
       .map_err(|_| anyhow!("client task shut down!"))?;
@@ -222,7 +224,11 @@ async fn run_client(ic: Arc<InstanceConfig>,
 {
   struct Outstanding {
     reply_to: tokio::sync::oneshot::Sender<WebResponse>,
-    max_requests_outstanding: u32,
+    oi: OutstandingInner,
+  }
+  #[derive(Debug)]
+  struct OutstandingInner {
+    target_requests_outstanding: u32,
   }
   let mut outstanding: VecDeque<Outstanding> = default();
   let  downbound: VecDeque<(/*xxx*/)> = default();
@@ -240,7 +246,7 @@ async fn run_client(ic: Arc<InstanceConfig>,
       if ! downbound.is_empty() {
         outstanding.pop_front()
       } else if let Some((i,_)) = outstanding.iter().enumerate().find({
-        |(_,o)| outstanding.len() > o.max_requests_outstanding.sat()
+        |(_,o)| outstanding.len() > o.oi.target_requests_outstanding.sat()
       }) {
         Some(outstanding.remove(i).unwrap())
       } else {
@@ -252,6 +258,7 @@ async fn run_client(ic: Arc<InstanceConfig>,
         warnings: default(),
       };
 
+      dbg!(&response);
       try_send_response(ret.reply_to, response);
     }
 
@@ -261,7 +268,7 @@ async fn run_client(ic: Arc<InstanceConfig>,
         let WebRequest {
           initial, initial_remaining, length_hint, mut body,
           boundary_finder,
-          reply_to, warnings,
+          reply_to, mut warnings,
         } = req.ok_or_else(|| anyhow!("webservers all shut down!"))?;
 
         match async {
@@ -275,7 +282,7 @@ async fn run_client(ic: Arc<InstanceConfig>,
             &mut body
           ).await.context("read request body")?;
 
-          let (meta, comps) =
+          let (meta, mut comps) =
             multipart::ComponentIterator::resume_mid_component(
               &whole_request[initial_used..],
               boundary_finder
@@ -312,6 +319,11 @@ async fn run_client(ic: Arc<InstanceConfig>,
             let server, client = Duration::from_secs(meta.need_parse()?);
           }
 
+          meta!{
+            mtu, ( != ), client,
+            let server, client: u32 = meta.parse()?.unwrap_or(server);
+          }
+
           meta!{
             max_batch_down, (), min(client, server),
             let server, client: u32 = meta.parse()?.unwrap_or(server);
@@ -322,12 +334,19 @@ async fn run_client(ic: Arc<InstanceConfig>,
             let server, client = meta.parse()?.unwrap_or(server);
           }
 
-          Ok::<_,AE>(())
+          while let Some(comp) = comps.next(&mut warnings, PartName::d)? {
+            if comp.name != PartName::d {
+              warnings.add(&format_args!("unexpected part {:?}", comp.name))?;
+            }
+            dbg!(comp.name, DumpHex(comp.payload));
+          }
+
+          let oi = OutstandingInner {
+            target_requests_outstanding,
+          };
+          Ok::<_,AE>(oi)
         }.await {
-          Ok(()) => outstanding.push_back(Outstanding {
-            reply_to: reply_to,
-            max_requests_outstanding: 42, // xxx
-          }),
+          Ok(oi) => outstanding.push_back(Outstanding { reply_to, oi }),
           Err(e) => {
             try_send_response(reply_to, WebResponse {
               data: Err(e),