--- /dev/null
+// Copyright 2021 Ian Jackson and contributors to Hippotat
+// SPDX-License-Identifier: GPL-3.0-or-later
+// There is NO WARRANTY.
+
+use super::*;
+
+#[derive(Debug)]
+pub struct Client {
+ pub ic: Arc<InstanceConfig>,
+ pub web: mpsc::Sender<WebRequest>,
+ pub route: mpsc::Sender<RoutedPacket>,
+}
+
+#[allow(unused_variables)] // xxx
+#[allow(unused_mut)] // xxx
+pub async fn run(global: Arc<Global>,
+ ic: Arc<InstanceConfig>,
+ mut web: mpsc::Receiver<WebRequest>,
+ mut routed: mpsc::Receiver<RoutedPacket>)
+ -> Result<Void, AE>
+{
+ struct Outstanding {
+ reply_to: oneshot::Sender<WebResponse>,
+ oi: OutstandingInner,
+ }
+ #[derive(Debug)]
+ struct OutstandingInner {
+ target_requests_outstanding: u32,
+ }
+ let mut outstanding: VecDeque<Outstanding> = default();
+ let downbound: VecDeque<(/*xxx*/)> = default();
+
+ let try_send_response = |
+ reply_to: oneshot::Sender<WebResponse>,
+ response: WebResponse
+ | {
+ reply_to.send(response)
+ .unwrap_or_else(|_: WebResponse| () /* oh dear */ /* xxx trace? */);
+ };
+
+ loop {
+ if let Some(ret) = {
+ if ! downbound.is_empty() {
+ outstanding.pop_front()
+ } else if let Some((i,_)) = outstanding.iter().enumerate().find({
+ |(_,o)| outstanding.len() > o.oi.target_requests_outstanding.sat()
+ }) {
+ Some(outstanding.remove(i).unwrap())
+ } else {
+ None
+ }
+ } {
+ let response = WebResponse {
+ data: Ok(vec![ /* xxx */ ]),
+ warnings: default(),
+ };
+
+ try_send_response(ret.reply_to, response);
+ }
+
+ select!{
+ req = web.recv() =>
+ {
+ let WebRequest {
+ initial, initial_remaining, length_hint, mut body,
+ boundary_finder,
+ reply_to, conn, mut warnings,
+ } = req.ok_or_else(|| anyhow!("webservers all shut down!"))?;
+
+ match async {
+
+ let initial_used = initial.len() - initial_remaining;
+
+ let whole_request = read_limited_bytes(
+ ic.max_batch_up.sat(),
+ initial,
+ length_hint,
+ &mut body
+ ).await.context("read request body")?;
+
+ let (meta, mut comps) =
+ multipart::ComponentIterator::resume_mid_component(
+ &whole_request[initial_used..],
+ boundary_finder
+ ).context("resume parsing body, after auth checks")?;
+
+ let mut meta = MetadataFieldIterator::new(&meta);
+
+ macro_rules! meta {
+ { $v:ident, ( $( $badcmp:tt )? ), $ret:expr,
+ let $server:ident, $client:ident $($code:tt)*
+ } => {
+ let $v = (||{
+ let $server = ic.$v;
+ let $client $($code)*
+ $(
+ if $client $badcmp $server {
+ throw!(anyhow!("mismatch: client={:?} {} server={:?}",
+ $client, stringify!($badcmp), $server));
+ }
+ )?
+ Ok::<_,AE>($ret)
+ })().context(stringify!($v))?;
+ //dbg!(&$v);
+ }
+ }
+
+ meta!{
+ target_requests_outstanding, ( != ), client,
+ let server, client: u32 = meta.need_parse()?;
+ }
+
+ meta!{
+ http_timeout, ( > ), client,
+ let server, client = Duration::from_secs(meta.need_parse()?);
+ }
+
+ meta!{
+ mtu, ( != ), client,
+ let server, client: u32 = meta.parse()?.unwrap_or(server);
+ }
+
+ meta!{
+ max_batch_down, (), min(client, server),
+ let server, client: u32 = meta.parse()?.unwrap_or(server);
+ }
+
+ meta!{
+ max_batch_up, ( > ), client,
+ let server, client = meta.parse()?.unwrap_or(server);
+ }
+
+ while let Some(comp) = comps.next(&mut warnings, PartName::d)? {
+ if comp.name != PartName::d {
+ warnings.add(&format_args!("unexpected part {:?}", comp.name))?;
+ }
+ checkn(Mime2Slip, mtu, comp.payload, |header| {
+ let saddr = ip_packet_addr::<false>(header)?;
+ if saddr != ic.link.client.0 { throw!(PE::Src(saddr)) }
+ let daddr = ip_packet_addr::<true>(header)?;
+ Ok(daddr)
+ }, |(daddr,packet)| route_packet(
+ &global, &conn, &ic.link.client, daddr,packet
+ ),
+ |e| Ok::<_,SlipFramesError<_>>({ warnings.add(&e)?; })
+ ).await?;
+ }
+
+ let oi = OutstandingInner {
+ target_requests_outstanding,
+ };
+ Ok::<_,AE>(oi)
+ }.await {
+ Ok(oi) => outstanding.push_back(Outstanding { reply_to, oi }),
+ Err(e) => {
+ try_send_response(reply_to, WebResponse {
+ data: Err(e),
+ warnings,
+ });
+ },
+ }
+ }
+ }
+ }
+ //Err(anyhow!("xxx"))
+}