/// Sent from hyper worker pool task to client task
#[allow(dead_code)] // xxx
+#[derive(Debug)]
struct WebRequest {
// initial part of body
// used up to and including first 2 lines of metadata
/// Reply from client task to hyper worker pool task
#[allow(dead_code)] // xxx
+#[derive(Debug)]
struct WebResponse {
warnings: Warnings,
data: Result<WebResponseData, AE>,
type WebResponseData = ();
+#[throws(PacketError)]
+pub fn route_packet(packet: Box<[u8]>, daddr: IpAddr) {
+ trace!("xxx discarding packet daddr={:?} len={}", daddr, packet.len());
+}
+
async fn handle(
all_clients: Arc<AllClients>,
req: hyper::Request<hyper::Body>
// boundary, start, &comp.name, &client.ic);
let (reply_to, reply_recv) = tokio::sync::oneshot::channel();
+ trace!("{} request xxx={}", &client.ic, initial.len());
let wreq = WebRequest {
initial,
initial_remaining,
warnings: mem::take(&mut warnings),
reply_to
};
- trace!("{} request", &client.ic);
client.web.try_send(wreq)
.map_err(|_| anyhow!("client task shut down!"))?;
{
struct Outstanding {
reply_to: tokio::sync::oneshot::Sender<WebResponse>,
- max_requests_outstanding: u32,
+ oi: OutstandingInner,
+ }
+ #[derive(Debug)]
+ struct OutstandingInner {
+ target_requests_outstanding: u32,
}
let mut outstanding: VecDeque<Outstanding> = default();
let downbound: VecDeque<(/*xxx*/)> = default();
if ! downbound.is_empty() {
outstanding.pop_front()
} else if let Some((i,_)) = outstanding.iter().enumerate().find({
- |(_,o)| outstanding.len() > o.max_requests_outstanding.sat()
+ |(_,o)| outstanding.len() > o.oi.target_requests_outstanding.sat()
}) {
Some(outstanding.remove(i).unwrap())
} else {
warnings: default(),
};
+ dbg!(&response);
try_send_response(ret.reply_to, response);
}
let WebRequest {
initial, initial_remaining, length_hint, mut body,
boundary_finder,
- reply_to, warnings,
+ reply_to, mut warnings,
} = req.ok_or_else(|| anyhow!("webservers all shut down!"))?;
match async {
+ let initial_used = initial.len() - initial_remaining;
+
let whole_request = read_limited_bytes(
ic.max_batch_up.sat(),
initial,
&mut body
).await.context("read request body")?;
- let (meta, comps) =
+ let (meta, mut comps) =
multipart::ComponentIterator::resume_mid_component(
- &whole_request[initial_remaining..],
+ &whole_request[initial_used..],
boundary_finder
).context("resume parsing body, after auth checks")?;
let mut meta = MetadataFieldIterator::new(&meta);
-/*
- macro_rules!(
-
- let target_requests_outstanding = {
- let server = ic.target_requests_outstanding;
- let client: u32 = meta.need_parse()?;
- if client != server {
- throw!(anyhow!("mismatch: client={} != server={}",
- client, server));
- }
- Ok::<_,AE>(client)
- }.context("target_requests_outstanding")?;
-
- let http_timeout: u64 = {
- let server = ic.http_timeout;
- let client = Duration::from_secs(meta.need_parse()?);
- if client > server {
- throw!(anyhow!("mismatch: client={} > server={}",
- client, server));
+
+ macro_rules! meta {
+ { $v:ident, ( $( $badcmp:tt )? ), $ret:expr,
+ let $server:ident, $client:ident $($code:tt)*
+ } => {
+ let $v = (||{
+ let $server = ic.$v;
+ let $client $($code)*
+ $(
+ if $client $badcmp $server {
+ throw!(anyhow!("mismatch: client={:?} {} server={:?}",
+ $client, stringify!($badcmp), $server));
+ }
+ )?
+ Ok::<_,AE>($ret)
+ })().context(stringify!($v))?;
+ dbg!(&$v);
}
- Ok::<_,AE>(client)
- }.context("http_timeout")?;
-
- let max_batch_down = {
- let server = ic.max_batch_down;
- let client: u32 = meta.parse().context("max_batch_down")?;
- let to_use = min(client, server);
- Ok::<_,AE>(to_use)
- }.context("max_batch_down")?;
-
- let max_batch_up = {
- let server = ic.max_batch_up;
- let client = meta.parse().context("max_batch_up")?;
- if client > server {
- throw!(anyhow!("mismatch: client={} != server={}",
- client, server));
+ }
+
+ meta!{
+ target_requests_outstanding, ( != ), client,
+ let server, client: u32 = meta.need_parse()?;
+ }
+
+ meta!{
+ http_timeout, ( > ), client,
+ let server, client = Duration::from_secs(meta.need_parse()?);
+ }
+
+ meta!{
+ mtu, ( != ), client,
+ let server, client: u32 = meta.parse()?.unwrap_or(server);
+ }
+
+ meta!{
+ max_batch_down, (), min(client, server),
+ let server, client: u32 = meta.parse()?.unwrap_or(server);
+ }
+
+ meta!{
+ max_batch_up, ( > ), client,
+ let server, client = meta.parse()?.unwrap_or(server);
+ }
+
+ while let Some(comp) = comps.next(&mut warnings, PartName::d)? {
+ if comp.name != PartName::d {
+ warnings.add(&format_args!("unexpected part {:?}", comp.name))?;
}
-
- throw!(anyhow!(
- "target_requests_outstanding mismatch: client={} server={}",
- target_requests_outstanding,
- ic.target_requests_outstanding
- ))
+ checkn(Mime2Slip, mtu, comp.payload, |header| {
+ let saddr = ip_packet_addr::<false>(header)?;
+ if saddr != ic.link.client.0 { throw!(PE::Src(saddr)) }
+ let daddr = ip_packet_addr::<true>(header)?;
+ Ok(daddr)
+ }, |(daddr,packet)| route_packet(daddr,packet),
+ |e| { let _xxx = warnings.add(&e); }
+ )?;
}
- if ic.
-*/
- Ok::<_,AE>(())
+ let oi = OutstandingInner {
+ target_requests_outstanding,
+ };
+ Ok::<_,AE>(oi)
}.await {
- Ok(()) => outstanding.push_back(Outstanding {
- reply_to: reply_to,
- max_requests_outstanding: 42, // xxx
- }),
+ Ok(oi) => outstanding.push_back(Outstanding { reply_to, oi }),
Err(e) => {
try_send_response(reply_to, WebResponse {
data: Err(e),