From 9660c221a184bd6116336972c45a17af59ecdfe5 Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Sat, 14 Aug 2021 13:58:51 +0100 Subject: [PATCH] server: wip plumbing Signed-off-by: Ian Jackson --- src/bin/server.rs | 61 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 48 insertions(+), 13 deletions(-) diff --git a/src/bin/server.rs b/src/bin/server.rs index 764b22f..e38b117 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -31,11 +31,17 @@ struct WebRequest { initial: Box<[u8]>, initial_remaining: usize, body: hyper::body::Body, - reply: tokio::sync::oneshot::Sender, + reply_to: tokio::sync::oneshot::Sender, + warnings: Warnings, } /// Reply from client task to hyper worker pool task -type WebResponse = Result; +#[allow(dead_code)] // xxx +struct WebResponse { + warnings: Warnings, + data: Result, +} + type WebResponseData = (); async fn handle( @@ -159,12 +165,13 @@ async fn handle( //eprintln!("boundary={:?} start={} name={:?} client={}", // boundary, start, &comp.name, &client.ic); - let (reply, reply_recv) = tokio::sync::oneshot::channel(); + let (reply_to, reply_recv) = tokio::sync::oneshot::channel(); let wreq = WebRequest { initial, initial_remaining, body, - reply + warnings: mem::take(&mut warnings), + reply_to }; trace!("{} request", &client.ic); @@ -172,8 +179,9 @@ async fn handle( .map_err(|_| anyhow!("client task shut down!"))?; let reply: WebResponse = reply_recv.await?; + warnings = reply.warnings; - reply + reply.data }.await { Ok(()) => { }, @@ -187,17 +195,26 @@ async fn handle( Ok(hyper::Response::new(hyper::Body::from("Hello World"))) } +#[allow(unused_variables)] // xxx async fn run_client(_ic: Arc, mut web: mpsc::Receiver) -> Result { struct Outstanding { - reply: tokio::sync::oneshot::Sender, + reply_to: tokio::sync::oneshot::Sender, max_requests_outstanding: u32, } let mut outstanding: VecDeque = default(); let downbound: VecDeque<(/*xxx*/)> = default(); + let try_send_response = | + reply_to: tokio::sync::oneshot::Sender, + response: WebResponse + | { + reply_to.send(response) + .unwrap_or_else(|_: WebResponse| () /* oh dear */ /* xxx trace? */); + }; + loop { if let Some(ret) = { if ! downbound.is_empty() { @@ -210,18 +227,36 @@ async fn run_client(_ic: Arc, None } } { - ret.reply.send(Ok(() /* dummy */)) - .unwrap_or_else(|_: WebResponse| () /* oh dear */ /* xxx trace? */); + let response = WebResponse { + data: Ok(()), + warnings: default(), + }; + + try_send_response(ret.reply_to, response); } select!{ req = web.recv() => { - let req = req.ok_or_else(|| anyhow!("webservers all shut down!"))?; - outstanding.push_back(Outstanding { - reply: req.reply, - max_requests_outstanding: 42, // xxx - }); + let WebRequest { + initial, initial_remaining, body, + reply_to, warnings, + } = req.ok_or_else(|| anyhow!("webservers all shut down!"))?; + + match (||{ + Ok::<_,AE>(()) + })() { + Ok(()) => outstanding.push_back(Outstanding { + reply_to: reply_to, + max_requests_outstanding: 42, // xxx + }), + Err(e) => { + try_send_response(reply_to, WebResponse { + data: Err(e), + warnings, + }); + }, + } } } } -- 2.30.2