// end delimiter for the metadata not yet located, but in here somewhere
initial: Box<[u8]>,
initial_remaining: usize,
+ length_hint: usize,
body: hyper::body::Body,
- reply: tokio::sync::oneshot::Sender<WebResponse>,
+ reply_to: tokio::sync::oneshot::Sender<WebResponse>,
+ warnings: Warnings,
}
/// Reply from client task to hyper worker pool task
-type WebResponse = Result<WebResponseData, AE>;
+#[allow(dead_code)] // xxx
+struct WebResponse {
+ warnings: Warnings,
+ data: Result<WebResponseData, AE>,
+}
+
type WebResponseData = ();
async fn handle(
match async {
+ let get_header = |hn: &str| {
+ let mut values = req.headers().get_all(hn).iter();
+ let v = values.next().ok_or_else(|| anyhow!("missing {}", hn))?;
+ if values.next().is_some() { throw!(anyhow!("multiple {}!", hn)); }
+ let v = v.to_str().context(anyhow!("interpret {} as UTF-8", hn))?;
+ Ok::<_,AE>(v)
+ };
+
let mkboundary = |b: &'_ _| format!("\n--{}", b).into_bytes();
let boundary = match (||{
- let mut ctypes = req.headers().get_all("Content-Type").iter();
- let t = ctypes.next().ok_or_else(|| anyhow!("missing Content-Type"))?;
- if ctypes.next().is_some() { throw!(anyhow!("several Content-Type")) }
- let t = t.to_str().context("interpret Content-Type as utf-8")?;
+ let t = get_header("Content-Type")?;
let t: mime::Mime = t.parse().context("parse Content-Type")?;
if t.type_() != "multipart" { throw!(anyhow!("not multipart/")) }
let b = mime::BOUNDARY;
},
};
+ let length_hint: usize = (||{
+ let clength = get_header("Content-Length")?;
+ let clength = clength.parse().context("parse Content-Length")?;
+ Ok::<_,AE>(clength)
+ })().unwrap_or_else(
+ |e| { let _ = warnings.add(&e.wrap_err("parsing Content-Length")); 0 }
+ );
+
let mut body = req.into_body();
- let initial = match read_limited_bytes(METADATA_MAX_LEN, &mut body).await {
+ let initial = match read_limited_bytes(
+ METADATA_MAX_LEN, default(), length_hint, &mut body
+ ).await {
Ok(all) => all,
Err(ReadLimitedError::Truncated { sofar,.. }) => sofar,
Err(ReadLimitedError::Hyper(e)) => throw!(e),
//eprintln!("boundary={:?} start={} name={:?} client={}",
// boundary, start, &comp.name, &client.ic);
- let (reply, reply_recv) = tokio::sync::oneshot::channel();
+ let (reply_to, reply_recv) = tokio::sync::oneshot::channel();
let wreq = WebRequest {
initial,
initial_remaining,
+ length_hint,
body,
- reply
+ warnings: mem::take(&mut warnings),
+ reply_to
};
trace!("{} request", &client.ic);
.map_err(|_| anyhow!("client task shut down!"))?;
let reply: WebResponse = reply_recv.await?;
+ warnings = reply.warnings;
- reply
+ reply.data
}.await {
Ok(()) => {
},
Ok(hyper::Response::new(hyper::Body::from("Hello World")))
}
+#[allow(unused_variables)] // xxx
async fn run_client(_ic: Arc<InstanceConfig>,
mut web: mpsc::Receiver<WebRequest>)
-> Result<Void, AE>
{
struct Outstanding {
- reply: tokio::sync::oneshot::Sender<WebResponse>,
+ reply_to: tokio::sync::oneshot::Sender<WebResponse>,
max_requests_outstanding: u32,
}
let mut outstanding: VecDeque<Outstanding> = default();
let downbound: VecDeque<(/*xxx*/)> = default();
+ let try_send_response = |
+ reply_to: tokio::sync::oneshot::Sender<WebResponse>,
+ response: WebResponse
+ | {
+ reply_to.send(response)
+ .unwrap_or_else(|_: WebResponse| () /* oh dear */ /* xxx trace? */);
+ };
+
loop {
if let Some(ret) = {
if ! downbound.is_empty() {
None
}
} {
- ret.reply.send(Ok(() /* dummy */))
- .unwrap_or_else(|_: WebResponse| () /* oh dear */ /* xxx trace? */);
+ let response = WebResponse {
+ data: Ok(()),
+ warnings: default(),
+ };
+
+ try_send_response(ret.reply_to, response);
}
select!{
req = web.recv() =>
{
- let req = req.ok_or_else(|| anyhow!("webservers all shut down!"))?;
- outstanding.push_back(Outstanding {
- reply: req.reply,
- max_requests_outstanding: 42, // xxx
- });
+ let WebRequest {
+ initial, initial_remaining, length_hint, mut body,
+ reply_to, warnings,
+ } = req.ok_or_else(|| anyhow!("webservers all shut down!"))?;
+
+ match async {
+
+ let whole_request = read_limited_bytes(
+ usize::MAX /* xxx */,
+ initial,
+ length_hint,
+ &mut body
+ ).await.context("read request body")?;
+
+ dbg!(whole_request.len());
+
+/*
+
+ multipart::ComponentIterator::resume_mid_component(
+ &initial[initial_remaining..],
+ */
+
+ Ok::<_,AE>(())
+ }.await {
+ Ok(()) => outstanding.push_back(Outstanding {
+ reply_to: reply_to,
+ max_requests_outstanding: 42, // xxx
+ }),
+ Err(e) => {
+ try_send_response(reply_to, WebResponse {
+ data: Err(e),
+ warnings,
+ });
+ },
+ }
}
}
}