/// Sent from hyper worker pool task to client task
#[allow(dead_code)] // xxx
+#[derive(Debug)]
struct WebRequest {
// initial part of body
// used up to and including first 2 lines of metadata
/// Reply from client task to hyper worker pool task
#[allow(dead_code)] // xxx
+#[derive(Debug)]
struct WebResponse {
warnings: Warnings,
data: Result<WebResponseData, AE>,
// boundary, start, &comp.name, &client.ic);
let (reply_to, reply_recv) = tokio::sync::oneshot::channel();
+ trace!("{} request xxx={}", &client.ic, initial.len());
let wreq = WebRequest {
initial,
initial_remaining,
warnings: mem::take(&mut warnings),
reply_to
};
- trace!("{} request", &client.ic);
client.web.try_send(wreq)
.map_err(|_| anyhow!("client task shut down!"))?;
{
struct Outstanding {
reply_to: tokio::sync::oneshot::Sender<WebResponse>,
- max_requests_outstanding: u32,
+ target_requests_outstanding: u32,
}
let mut outstanding: VecDeque<Outstanding> = default();
let downbound: VecDeque<(/*xxx*/)> = default();
if ! downbound.is_empty() {
outstanding.pop_front()
} else if let Some((i,_)) = outstanding.iter().enumerate().find({
- |(_,o)| outstanding.len() > o.max_requests_outstanding.sat()
+ |(_,o)| outstanding.len() > o.target_requests_outstanding.sat()
}) {
Some(outstanding.remove(i).unwrap())
} else {
warnings: default(),
};
+ dbg!(&response);
try_send_response(ret.reply_to, response);
}
let WebRequest {
initial, initial_remaining, length_hint, mut body,
boundary_finder,
- reply_to, warnings,
+ reply_to, mut warnings,
} = req.ok_or_else(|| anyhow!("webservers all shut down!"))?;
match async {
+ let initial_used = initial.len() - initial_remaining;
+
let whole_request = read_limited_bytes(
ic.max_batch_up.sat(),
initial,
&mut body
).await.context("read request body")?;
- let (meta, comps) =
+ let (meta, mut comps) =
multipart::ComponentIterator::resume_mid_component(
- &whole_request[initial_remaining..],
+ &whole_request[initial_used..],
boundary_finder
).context("resume parsing body, after auth checks")?;
let server, client = meta.parse()?.unwrap_or(server);
}
- Ok::<_,AE>(())
+ while let Some(comp) = comps.next(&mut warnings, PartName::d)? {
+ if comp.name != PartName::d {
+ warnings.add(&format_args!("unexpected part {:?}", comp.name))?;
+ }
+ dbg!(comp.name, DumpHex(comp.payload));
+ }
+
+ Ok::<_,AE>(target_requests_outstanding)
}.await {
- Ok(()) => outstanding.push_back(Outstanding {
- reply_to: reply_to,
- max_requests_outstanding: 42, // xxx
- }),
+ Ok(target_requests_outstanding) => {
+ outstanding.push_back(Outstanding {
+ reply_to,
+ target_requests_outstanding,
+ });
+ },
Err(e) => {
try_send_response(reply_to, WebResponse {
data: Err(e),