chiark / gitweb /
server: wip plumbing
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Sat, 14 Aug 2021 12:58:51 +0000 (13:58 +0100)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Sun, 15 Aug 2021 13:36:13 +0000 (14:36 +0100)
Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
src/bin/server.rs

index 764b22f5e2954a98e9d6eede7c034fbb8c54dc56..e38b1176aa8cfd11e864a04a780a30b61e55aade 100644 (file)
@@ -31,11 +31,17 @@ struct WebRequest {
   initial: Box<[u8]>,
   initial_remaining: usize,
   body: hyper::body::Body,
-  reply: tokio::sync::oneshot::Sender<WebResponse>,
+  reply_to: tokio::sync::oneshot::Sender<WebResponse>,
+  warnings: Warnings,
 }
 
 /// Reply from client task to hyper worker pool task
-type WebResponse = Result<WebResponseData, AE>;
+#[allow(dead_code)] // xxx
+struct WebResponse {
+  warnings: Warnings,
+  data: Result<WebResponseData, AE>,
+}
+
 type WebResponseData = ();
 
 async fn handle(
@@ -159,12 +165,13 @@ async fn handle(
     //eprintln!("boundary={:?} start={} name={:?} client={}",
     // boundary, start, &comp.name, &client.ic);
 
-    let (reply, reply_recv) = tokio::sync::oneshot::channel();
+    let (reply_to, reply_recv) = tokio::sync::oneshot::channel();
     let wreq = WebRequest {
       initial,
       initial_remaining,
       body,
-      reply
+      warnings: mem::take(&mut warnings),
+      reply_to
     };
     trace!("{} request", &client.ic);
 
@@ -172,8 +179,9 @@ async fn handle(
       .map_err(|_| anyhow!("client task shut down!"))?;
 
     let reply: WebResponse = reply_recv.await?;
+    warnings = reply.warnings;
 
-    reply
+    reply.data
   }.await {
     Ok(()) => {
     },
@@ -187,17 +195,26 @@ async fn handle(
   Ok(hyper::Response::new(hyper::Body::from("Hello World")))
 }
 
+#[allow(unused_variables)] // xxx
 async fn run_client(_ic: Arc<InstanceConfig>,
                     mut web: mpsc::Receiver<WebRequest>)
                     -> Result<Void, AE>
 {
   struct Outstanding {
-    reply: tokio::sync::oneshot::Sender<WebResponse>,
+    reply_to: tokio::sync::oneshot::Sender<WebResponse>,
     max_requests_outstanding: u32,
   }
   let mut outstanding: VecDeque<Outstanding> = default();
   let  downbound: VecDeque<(/*xxx*/)> = default();
 
+  let try_send_response = |
+    reply_to: tokio::sync::oneshot::Sender<WebResponse>,
+    response: WebResponse
+  | {
+    reply_to.send(response)
+      .unwrap_or_else(|_: WebResponse| () /* oh dear */ /* xxx trace? */);
+  };
+
   loop {
     if let Some(ret) = {
       if ! downbound.is_empty() {
@@ -210,18 +227,36 @@ async fn run_client(_ic: Arc<InstanceConfig>,
         None
       }
     } {
-      ret.reply.send(Ok(() /* dummy */))
-        .unwrap_or_else(|_: WebResponse| () /* oh dear */ /* xxx trace? */);
+      let response = WebResponse {
+        data: Ok(()),
+        warnings: default(),
+      };
+
+      try_send_response(ret.reply_to, response);
     }
 
     select!{
       req = web.recv() =>
       {
-        let req = req.ok_or_else(|| anyhow!("webservers all shut down!"))?;
-        outstanding.push_back(Outstanding {
-          reply: req.reply,
-          max_requests_outstanding: 42, // xxx
-        });
+        let WebRequest {
+          initial, initial_remaining, body,
+          reply_to, warnings,
+        } = req.ok_or_else(|| anyhow!("webservers all shut down!"))?;
+
+        match (||{
+          Ok::<_,AE>(())
+        })() {
+          Ok(()) => outstanding.push_back(Outstanding {
+            reply_to: reply_to,
+            max_requests_outstanding: 42, // xxx
+          }),
+          Err(e) => {
+            try_send_response(reply_to, WebResponse {
+              data: Err(e),
+              warnings,
+            });
+          },
+        }
       }
     }
   }