target_requests_outstanding: u32,
}
let mut outstanding: VecDeque<Outstanding> = default();
- let downbound: VecDeque<(/*xxx*/)> = default();
+ let mut downbound: PacketQueue<RoutedPacketData> = default();
let try_send_response = |
reply_to: oneshot::Sender<WebResponse>,
};
loop {
- if let Some(ret) = {
+ if let Some(req) = {
if ! downbound.is_empty() {
outstanding.pop_front()
} else if let Some((i,_)) = outstanding.iter().enumerate().find({
None
}
} {
+ let mut build: FrameQueueBuf = default();
+
+ loop {
+ let next = if let Some(n) = downbound.peek_front() { n }
+ else { break };
+ // Don't add 1 for the ESC since we will strip one
+ if build.len() + next.len() >= ic.max_batch_down.sat() { break }
+ build.esc_push(downbound.pop_front().unwrap());
+ }
+ if ! build.is_empty() {
+ // skip leading ESC
+ build.advance(1);
+ }
+
let response = WebResponse {
- data: Ok(vec![ /* xxx */ ]),
+ data: Ok(build),
warnings: default(),
};
- try_send_response(ret.reply_to, response);
+ try_send_response(req.reply_to, response);
+ }
+
+ let max = usize::saturating_mul(
+ ic.max_requests_outstanding.sat(),
+ ic.max_batch_down.sat(),
+ ).saturating_add(1 /* one boundary SLIP_ESC which we'll trim */);
+
+ while downbound.total_len() > max {
+ let _ = downbound.pop_front();
}
select!{
biased;
+ data = routed.recv() =>
+ {
+ let data = data.ok_or_else(|| anyhow!("routers shut down!"))?;
+ downbound.push_back(data.data);
+ },
+
req = web.recv() =>
{
let WebRequest {
initial, initial_remaining, length_hint, mut body,
boundary_finder,
- reply_to, conn, mut warnings,
+ reply_to, conn, mut warnings, may_route,
} = req.ok_or_else(|| anyhow!("webservers all shut down!"))?;
match async {
//dbg!(&$v);
}
}
-
meta!{
target_requests_outstanding, ( != ), client,
let server, client: u32 = meta.need_parse()?;
}
-
meta!{
http_timeout, ( > ), client,
let server, client = Duration::from_secs(meta.need_parse()?);
}
-
meta!{
mtu, ( != ), client,
let server, client: u32 = meta.parse()?.unwrap_or(server);
}
-
meta!{
max_batch_down, (), min(client, server),
let server, client: u32 = meta.parse()?.unwrap_or(server);
}
-
meta!{
max_batch_up, ( > ), client,
let server, client = meta.parse()?.unwrap_or(server);
if comp.name != PartName::d {
warnings.add(&format_args!("unexpected part {:?}", comp.name))?;
}
- checkn(Mime2Slip, mtu, comp.payload, |header| {
+ slip::processn(Mime2Slip, mtu, comp.payload, |header| {
let saddr = ip_packet_addr::<false>(header)?;
if saddr != ic.link.client.0 { throw!(PE::Src(saddr)) }
let daddr = ip_packet_addr::<true>(header)?;
Ok(daddr)
}, |(daddr,packet)| route_packet(
- &global, &conn, &ic.link.client, daddr,packet
- ),
+ &global, &conn, Some(&ic.link.client), daddr,
+ packet, may_route.clone(),
+ ).map(Ok),
|e| Ok::<_,SlipFramesError<_>>({ warnings.add(&e)?; })
).await?;
}