From: Ian Jackson Date: Fri, 13 Aug 2021 23:19:11 +0000 (+0100) Subject: server: wip X-Git-Tag: hippotat/1.0.0~184 X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=hippotat.git;a=commitdiff_plain;h=9e9071a5326b5babf2617a9dee2cf217a871511d server: wip Signed-off-by: Ian Jackson --- diff --git a/src/bin/server.rs b/src/bin/server.rs index 26abf5e..764b22f 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -22,10 +22,22 @@ struct ClientHandles { web: tokio::sync::mpsc::Sender, } +/// Sent from hyper worker pool task to client task +#[allow(dead_code)] // xxx struct WebRequest { - reply: tokio::sync::oneshot::Sender<()>, + // initial part of body + // used up to and including first 2 lines of metadata + // end delimiter for the metadata not yet located, but in here somewhere + initial: Box<[u8]>, + initial_remaining: usize, + body: hyper::body::Body, + reply: tokio::sync::oneshot::Sender, } +/// Reply from client task to hyper worker pool task +type WebResponse = Result; +type WebResponseData = (); + async fn handle( all_clients: Arc, req: hyper::Request @@ -141,11 +153,27 @@ async fn handle( }; chk_skew(client_time, now, "ahead")?; chk_skew(now, client_time, "behind")?; - - eprintln!("boundary={:?} start={} name={:?} client={}", - boundary, start, &comp.name, &client.ic); - Ok::<_,AE>(()) + let initial_remaining = meta.remaining_bytes_len(); + + //eprintln!("boundary={:?} start={} name={:?} client={}", + // boundary, start, &comp.name, &client.ic); + + let (reply, reply_recv) = tokio::sync::oneshot::channel(); + let wreq = WebRequest { + initial, + initial_remaining, + body, + reply + }; + trace!("{} request", &client.ic); + + client.web.try_send(wreq) + .map_err(|_| anyhow!("client task shut down!"))?; + + let reply: WebResponse = reply_recv.await?; + + reply }.await { Ok(()) => { }, @@ -159,11 +187,45 @@ async fn handle( Ok(hyper::Response::new(hyper::Body::from("Hello World"))) } -async fn run_client(_ic: Arc, _web: mpsc::Receiver) +async fn run_client(_ic: Arc, + mut web: mpsc::Receiver) -> Result { - tokio::time::sleep(Duration::from_secs(1_000_000_000)).await; - Err(anyhow!("xxx")) + struct Outstanding { + reply: tokio::sync::oneshot::Sender, + max_requests_outstanding: u32, + } + let mut outstanding: VecDeque = default(); + let downbound: VecDeque<(/*xxx*/)> = default(); + + loop { + if let Some(ret) = { + if ! downbound.is_empty() { + outstanding.pop_front() + } else if let Some((i,_)) = outstanding.iter().enumerate().find({ + |(_,o)| outstanding.len() > o.max_requests_outstanding.sat() + }) { + Some(outstanding.remove(i).unwrap()) + } else { + None + } + } { + ret.reply.send(Ok(() /* dummy */)) + .unwrap_or_else(|_: WebResponse| () /* oh dear */ /* xxx trace? */); + } + + select!{ + req = web.recv() => + { + let req = req.ok_or_else(|| anyhow!("webservers all shut down!"))?; + outstanding.push_back(Outstanding { + reply: req.reply, + max_requests_outstanding: 42, // xxx + }); + } + } + } + //Err(anyhow!("xxx")) } #[tokio::main] diff --git a/src/multipart.rs b/src/multipart.rs index 1c0be40..9f3fca9 100644 --- a/src/multipart.rs +++ b/src/multipart.rs @@ -109,6 +109,14 @@ impl<'b> MetadataFieldIterator<'b> { let s = if let Some(r) = self.next() { r? } else { return None }; Some(s.parse()?) } + + pub fn remaining_bytes_len(&self) -> usize { + if let Some(last) = self.last { + self.buf.len() - last + } else { + 0 + } + } } impl<'b> Iterator for MetadataFieldIterator<'b> {