chiark / gitweb /
server: wip recv
[hippotat.git] / src / bin / server.rs
index 1764470ba118ae8dc47c560a6161a1b20f13b79f..4a8070481f77e815a776f20863152265b9120adb 100644 (file)
@@ -24,6 +24,7 @@ struct ClientHandles {
 
 /// Sent from hyper worker pool task to client task
 #[allow(dead_code)] // xxx
+#[derive(Debug)]
 struct WebRequest {
   // initial part of body
   // used up to and including first 2 lines of metadata
@@ -39,6 +40,7 @@ struct WebRequest {
 
 /// Reply from client task to hyper worker pool task
 #[allow(dead_code)] // xxx
+#[derive(Debug)]
 struct WebResponse {
   warnings: Warnings,
   data: Result<WebResponseData, AE>,
@@ -46,6 +48,11 @@ struct WebResponse {
 
 type WebResponseData = ();
 
+#[throws(PacketError)]
+pub fn route_packet(packet: Box<[u8]>, daddr: IpAddr) {
+  trace!("xxx discarding packet daddr={:?} len={}", daddr, packet.len());
+}
+
 async fn handle(
   all_clients: Arc<AllClients>,
   req: hyper::Request<hyper::Body>
@@ -183,6 +190,7 @@ async fn handle(
     // boundary, start, &comp.name, &client.ic);
 
     let (reply_to, reply_recv) = tokio::sync::oneshot::channel();
+    trace!("{} request xxx={}", &client.ic, initial.len());
     let wreq = WebRequest {
       initial,
       initial_remaining,
@@ -192,7 +200,6 @@ async fn handle(
       warnings: mem::take(&mut warnings),
       reply_to
     };
-    trace!("{} request", &client.ic);
 
     client.web.try_send(wreq)
       .map_err(|_| anyhow!("client task shut down!"))?;
@@ -222,7 +229,11 @@ async fn run_client(ic: Arc<InstanceConfig>,
 {
   struct Outstanding {
     reply_to: tokio::sync::oneshot::Sender<WebResponse>,
-    max_requests_outstanding: u32,
+    oi: OutstandingInner,
+  }
+  #[derive(Debug)]
+  struct OutstandingInner {
+    target_requests_outstanding: u32,
   }
   let mut outstanding: VecDeque<Outstanding> = default();
   let  downbound: VecDeque<(/*xxx*/)> = default();
@@ -240,7 +251,7 @@ async fn run_client(ic: Arc<InstanceConfig>,
       if ! downbound.is_empty() {
         outstanding.pop_front()
       } else if let Some((i,_)) = outstanding.iter().enumerate().find({
-        |(_,o)| outstanding.len() > o.max_requests_outstanding.sat()
+        |(_,o)| outstanding.len() > o.oi.target_requests_outstanding.sat()
       }) {
         Some(outstanding.remove(i).unwrap())
       } else {
@@ -252,6 +263,7 @@ async fn run_client(ic: Arc<InstanceConfig>,
         warnings: default(),
       };
 
+      dbg!(&response);
       try_send_response(ret.reply_to, response);
     }
 
@@ -261,11 +273,13 @@ async fn run_client(ic: Arc<InstanceConfig>,
         let WebRequest {
           initial, initial_remaining, length_hint, mut body,
           boundary_finder,
-          reply_to, warnings,
+          reply_to, mut warnings,
         } = req.ok_or_else(|| anyhow!("webservers all shut down!"))?;
 
         match async {
 
+          let initial_used = initial.len() - initial_remaining;
+
           let whole_request = read_limited_bytes(
             ic.max_batch_up.sat(),
             initial,
@@ -273,66 +287,78 @@ async fn run_client(ic: Arc<InstanceConfig>,
             &mut body
           ).await.context("read request body")?;
 
-          let (meta, comps) =
+          let (meta, mut comps) =
             multipart::ComponentIterator::resume_mid_component(
-              &whole_request[initial_remaining..],
+              &whole_request[initial_used..],
               boundary_finder
             ).context("resume parsing body, after auth checks")?;
 
           let mut meta = MetadataFieldIterator::new(&meta);
-/*
-          macro_rules!(
-
-          let target_requests_outstanding = {
-            let server = ic.target_requests_outstanding;
-            let client: u32 = meta.need_parse()?;
-            if client != server {
-              throw!(anyhow!("mismatch: client={} != server={}",
-                             client, server));
-            }
-            Ok::<_,AE>(client)
-          }.context("target_requests_outstanding")?;
-
-          let http_timeout: u64 = {
-            let server = ic.http_timeout;
-            let client = Duration::from_secs(meta.need_parse()?);
-            if client > server {
-              throw!(anyhow!("mismatch: client={} > server={}",
-                             client, server));
+
+          macro_rules! meta {
+            { $v:ident, ( $( $badcmp:tt )? ), $ret:expr,
+              let $server:ident, $client:ident $($code:tt)*
+            } => {
+              let $v = (||{
+                let $server = ic.$v;
+                let $client $($code)*
+                $(
+                  if $client $badcmp $server {
+                    throw!(anyhow!("mismatch: client={:?} {} server={:?}",
+                                   $client, stringify!($badcmp), $server));
+                  }
+                )?
+                Ok::<_,AE>($ret)
+              })().context(stringify!($v))?;
+              dbg!(&$v);
             }
-            Ok::<_,AE>(client)
-          }.context("http_timeout")?;
-
-          let max_batch_down = {
-            let server = ic.max_batch_down;
-            let client: u32 = meta.parse().context("max_batch_down")?;
-            let to_use = min(client, server);
-            Ok::<_,AE>(to_use)
-          }.context("max_batch_down")?;
-
-          let max_batch_up = {
-            let server = ic.max_batch_up;
-            let client = meta.parse().context("max_batch_up")?;
-            if client > server {
-              throw!(anyhow!("mismatch: client={} != server={}",
-                             client, server));
+          }
+
+          meta!{
+            target_requests_outstanding, ( != ), client,
+            let server, client: u32 = meta.need_parse()?;
+          }
+
+          meta!{
+            http_timeout, ( > ), client,
+            let server, client = Duration::from_secs(meta.need_parse()?);
+          }
+
+          meta!{
+            mtu, ( != ), client,
+            let server, client: u32 = meta.parse()?.unwrap_or(server);
+          }
+
+          meta!{
+            max_batch_down, (), min(client, server),
+            let server, client: u32 = meta.parse()?.unwrap_or(server);
+          }
+
+          meta!{
+            max_batch_up, ( > ), client,
+            let server, client = meta.parse()?.unwrap_or(server);
+          }
+
+          while let Some(comp) = comps.next(&mut warnings, PartName::d)? {
+            if comp.name != PartName::d {
+              warnings.add(&format_args!("unexpected part {:?}", comp.name))?;
             }
-              
-            throw!(anyhow!(
- "target_requests_outstanding mismatch: client={} server={}",
-              target_requests_outstanding, 
-              ic.target_requests_outstanding
-            ))
+            checkn(Mime2Slip, mtu, comp.payload, |header| {
+              let saddr = ip_packet_addr::<false>(header)?;
+              if saddr != ic.link.client.0 { throw!(PE::Src(saddr)) }
+              let daddr = ip_packet_addr::<true>(header)?;
+              Ok(daddr)
+            }, |(daddr,packet)| route_packet(daddr,packet),
+               |e| { let _xxx = warnings.add(&e); }
+            )?;
           }
 
-          if ic.
-*/
-          Ok::<_,AE>(())
+          let oi = OutstandingInner {
+            target_requests_outstanding,
+          };
+          Ok::<_,AE>(oi)
         }.await {
-          Ok(()) => outstanding.push_back(Outstanding {
-            reply_to: reply_to,
-            max_requests_outstanding: 42, // xxx
-          }),
+          Ok(oi) => outstanding.push_back(Outstanding { reply_to, oi }),
           Err(e) => {
             try_send_response(reply_to, WebResponse {
               data: Err(e),