chiark / gitweb /
reorg source
[hippotat.git] / server / suser.rs
diff --git a/server/suser.rs b/server/suser.rs
new file mode 100644 (file)
index 0000000..d246384
--- /dev/null
@@ -0,0 +1,166 @@
+// Copyright 2021 Ian Jackson and contributors to Hippotat
+// SPDX-License-Identifier: GPL-3.0-or-later
+// There is NO WARRANTY.
+
+use super::*;
+
+#[derive(Debug)]
+pub struct Client {
+  pub ic: Arc<InstanceConfig>,
+  pub web: mpsc::Sender<WebRequest>,
+  pub route: mpsc::Sender<RoutedPacket>,
+}
+
+#[allow(unused_variables)] // xxx
+#[allow(unused_mut)] // xxx
+pub async fn run(global: Arc<Global>,
+                 ic: Arc<InstanceConfig>,
+                 mut web: mpsc::Receiver<WebRequest>,
+                 mut routed: mpsc::Receiver<RoutedPacket>)
+                 -> Result<Void, AE>
+{
+  struct Outstanding {
+    reply_to: oneshot::Sender<WebResponse>,
+    oi: OutstandingInner,
+  }
+  #[derive(Debug)]
+  struct OutstandingInner {
+    target_requests_outstanding: u32,
+  }
+  let mut outstanding: VecDeque<Outstanding> = default();
+  let  downbound: VecDeque<(/*xxx*/)> = default();
+
+  let try_send_response = |
+    reply_to: oneshot::Sender<WebResponse>,
+    response: WebResponse
+  | {
+    reply_to.send(response)
+      .unwrap_or_else(|_: WebResponse| () /* oh dear */ /* xxx trace? */);
+  };
+
+  loop {
+    if let Some(ret) = {
+      if ! downbound.is_empty() {
+        outstanding.pop_front()
+      } else if let Some((i,_)) = outstanding.iter().enumerate().find({
+        |(_,o)| outstanding.len() > o.oi.target_requests_outstanding.sat()
+      }) {
+        Some(outstanding.remove(i).unwrap())
+      } else {
+        None
+      }
+    } {
+      let response = WebResponse {
+        data: Ok(vec![ /* xxx */ ]),
+        warnings: default(),
+      };
+
+      try_send_response(ret.reply_to, response);
+    }
+
+    select!{
+      req = web.recv() =>
+      {
+        let WebRequest {
+          initial, initial_remaining, length_hint, mut body,
+          boundary_finder,
+          reply_to, conn, mut warnings,
+        } = req.ok_or_else(|| anyhow!("webservers all shut down!"))?;
+
+        match async {
+
+          let initial_used = initial.len() - initial_remaining;
+
+          let whole_request = read_limited_bytes(
+            ic.max_batch_up.sat(),
+            initial,
+            length_hint,
+            &mut body
+          ).await.context("read request body")?;
+
+          let (meta, mut comps) =
+            multipart::ComponentIterator::resume_mid_component(
+              &whole_request[initial_used..],
+              boundary_finder
+            ).context("resume parsing body, after auth checks")?;
+
+          let mut meta = MetadataFieldIterator::new(&meta);
+
+          macro_rules! meta {
+            { $v:ident, ( $( $badcmp:tt )? ), $ret:expr,
+              let $server:ident, $client:ident $($code:tt)*
+            } => {
+              let $v = (||{
+                let $server = ic.$v;
+                let $client $($code)*
+                $(
+                  if $client $badcmp $server {
+                    throw!(anyhow!("mismatch: client={:?} {} server={:?}",
+                                   $client, stringify!($badcmp), $server));
+                  }
+                )?
+                Ok::<_,AE>($ret)
+              })().context(stringify!($v))?;
+              //dbg!(&$v);
+            }
+          }
+
+          meta!{
+            target_requests_outstanding, ( != ), client,
+            let server, client: u32 = meta.need_parse()?;
+          }
+
+          meta!{
+            http_timeout, ( > ), client,
+            let server, client = Duration::from_secs(meta.need_parse()?);
+          }
+
+          meta!{
+            mtu, ( != ), client,
+            let server, client: u32 = meta.parse()?.unwrap_or(server);
+          }
+
+          meta!{
+            max_batch_down, (), min(client, server),
+            let server, client: u32 = meta.parse()?.unwrap_or(server);
+          }
+
+          meta!{
+            max_batch_up, ( > ), client,
+            let server, client = meta.parse()?.unwrap_or(server);
+          }
+
+          while let Some(comp) = comps.next(&mut warnings, PartName::d)? {
+            if comp.name != PartName::d {
+              warnings.add(&format_args!("unexpected part {:?}", comp.name))?;
+            }
+            checkn(Mime2Slip, mtu, comp.payload, |header| {
+              let saddr = ip_packet_addr::<false>(header)?;
+              if saddr != ic.link.client.0 { throw!(PE::Src(saddr)) }
+              let daddr = ip_packet_addr::<true>(header)?;
+              Ok(daddr)
+            }, |(daddr,packet)| route_packet(
+              &global, &conn, &ic.link.client, daddr,packet
+            ),
+              |e| Ok::<_,SlipFramesError<_>>({ warnings.add(&e)?; })
+            ).await?;
+          }
+
+          let oi = OutstandingInner {
+            target_requests_outstanding,
+          };
+          Ok::<_,AE>(oi)
+        }.await {
+          Ok(oi) => outstanding.push_back(Outstanding { reply_to, oi }),
+          Err(e) => {
+            try_send_response(reply_to, WebResponse {
+              data: Err(e),
+              warnings,
+            });
+          },
+        }
+      }
+    }
+  }
+  //Err(anyhow!("xxx"))
+}