1 // Copyright 2021 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: GPL-3.0-or-later
3 // There is NO WARRANTY.
9 pub ic: Arc<InstanceConfig>,
10 pub web: mpsc::Sender<WebRequest>,
11 pub route: mpsc::Sender<RoutedPacket>,
/// Async entry point that services one client's web requests.
///
/// Takes the shared `Global`, an `InstanceConfig` (the code below reads
/// `ic.link.client` and `ic.max_batch_up` from it), and the receiving ends
/// of two mpsc channels: `web` carrying `WebRequest`s and `routed` carrying
/// `RoutedPacket`s.
///
/// NOTE(review): only part of the body is visible in this excerpt; the
/// comments below describe the visible lines and hedge anything that
/// depends on lines not shown.
14 #[allow(unused_variables)] // xxx
15 #[allow(unused_mut)] // xxx
16 pub async fn run(global: Arc<Global>,
17 ic: Arc<InstanceConfig>,
18 mut web: mpsc::Receiver<WebRequest>,
19 mut routed: mpsc::Receiver<RoutedPacket>)
// Field of a queued request (presumably of `Outstanding`, which is built
// below from `reply_to` and `oi`): the oneshot sender on which that
// request's `WebResponse` is eventually sent.
23 reply_to: oneshot::Sender<WebResponse>,
// Per-request values retained while the request is queued.
27 struct OutstandingInner {
// Filled in from the `target_requests_outstanding` metadata value checked
// in the metadata block further down.
28 target_requests_outstanding: u32,
// Requests that have been accepted (via `push_back` below) and not yet
// answered.
30 let mut outstanding: VecDeque<Outstanding> = default();
// Queue whose element type is still a placeholder `()` (marked xxx); it is
// not `mut`, so in the visible code it is only ever tested for emptiness.
31 let downbound: VecDeque<(/*xxx*/)> = default();
// Helper closure: send a response on the given oneshot sender.  If the send
// fails, the `WebResponse` is handed back as the error and simply dropped
// here; no error is propagated.
33 let try_send_response = |
34 reply_to: oneshot::Sender<WebResponse>,
37 reply_to.send(response)
38 .unwrap_or_else(|_: WebResponse| () /* oh dear */ /* xxx trace? */);
// Pick a queued request to answer, if any:
//  - when `downbound` is non-empty, take the front of `outstanding`
//    (the earliest pushed, since new entries are added with `push_back`);
//  - otherwise take the first entry for which the queue currently holds
//    more requests than that entry's `target_requests_outstanding`.
// `.sat()` is presumably a saturating integer conversion - TODO confirm.
43 if ! downbound.is_empty() {
44 outstanding.pop_front()
45 } else if let Some((i,_)) = outstanding.iter().enumerate().find({
46 |(_,o)| outstanding.len() > o.oi.target_requests_outstanding.sat()
// `i` was just obtained by enumerating this same deque, so `remove(i)`
// yields `Some` and the `unwrap` cannot fail.
48 Some(outstanding.remove(i).unwrap())
// Build the reply for the selected request.  The payload is still a
// placeholder: an empty vector (marked xxx).
53 let response = WebResponse {
54 data: Ok(vec![ /* xxx */ ]),
58 try_send_response(ret.reply_to, response);
// Destructure an incoming request.  If `req` is `None` the function returns
// the error "webservers all shut down!".
65 initial, initial_remaining, length_hint, mut body,
67 reply_to, conn, mut warnings,
68 } = req.ok_or_else(|| anyhow!("webservers all shut down!"))?;
// Offset into `initial` at which the not-yet-parsed part starts; used below
// to slice `whole_request`.  Presumably the bytes before it were consumed by
// earlier parsing - verify against the sender of `WebRequest`.
72 let initial_used = initial.len() - initial_remaining;
// Read the request body with `ic.max_batch_up` passed as the limit argument
// (the remaining arguments are not visible in this excerpt).
74 let whole_request = read_limited_bytes(
75 ic.max_batch_up.sat(),
79 ).await.context("read request body")?;
// Continue multipart parsing from `initial_used` onwards, yielding the
// metadata component and an iterator over the remaining components.
81 let (meta, mut comps) =
82 multipart::ComponentIterator::resume_mid_component(
83 &whole_request[initial_used..],
85 ).context("resume parsing body, after auth checks")?;
// Shadow `meta` with an iterator over its individual metadata fields.
87 let mut meta = MetadataFieldIterator::new(&meta);
// Macro rule for one metadata field.  Arguments: the field name `$v`, an
// optional "bad" comparison operator `$badcmp`, a result expression `$ret`,
// then `let <server>, <client> ...` naming the two values being compared
// and the code that produces the client's value.
// NOTE(review): how `$ret` and `$server` are bound is on lines not visible
// here - presumably `$server` comes from `ic.$v` and `$ret` becomes the
// local `$v`; confirm.
90 { $v:ident, ( $( $badcmp:tt )? ), $ret:expr,
91 let $server:ident, $client:ident $($code:tt)*
// Reject the request when `client <badcmp> server` holds, reporting both
// values and the operator in the error message.
97 if $client $badcmp $server {
98 throw!(anyhow!("mismatch: client={:?} {} server={:?}",
99 $client, stringify!($badcmp), $server));
// Any error from this field's check is annotated with the field's name.
103 })().context(stringify!($v))?;
// target_requests_outstanding: required (`need_parse`); it is an error if
// the client's value differs from the server's.
109 target_requests_outstanding, ( != ), client,
110 let server, client: u32 = meta.need_parse()?;
// http_timeout: required, parsed as whole seconds; it is an error if the
// client's value exceeds the server's.
114 http_timeout, ( > ), client,
115 let server, client = Duration::from_secs(meta.need_parse()?);
// (Field name not visible in this excerpt.)  Optional: falls back to the
// server's value when the client did not supply one.
120 let server, client: u32 = meta.parse()?.unwrap_or(server);
// max_batch_down: optional; no comparison is treated as an error, and the
// result expression is the smaller of the client's and server's values.
124 max_batch_down, (), min(client, server),
125 let server, client: u32 = meta.parse()?.unwrap_or(server);
// max_batch_up: optional, defaulting to the server's value; it is an error
// if the client's value exceeds the server's.
129 max_batch_up, ( > ), client,
130 let server, client = meta.parse()?.unwrap_or(server);
// Walk the remaining multipart components.
133 while let Some(comp) = comps.next(&mut warnings, PartName::d)? {
// A component whose name is not `PartName::d` produces a warning
// (what happens to it afterwards is on a line not visible here).
134 if comp.name != PartName::d {
135 warnings.add(&format_args!("unexpected part {:?}", comp.name))?;
// Process the component payload via `checkn`.  The first closure inspects a
// packet header: the address bound to `saddr` must equal `ic.link.client.0`,
// otherwise `PE::Src` is thrown.  The const generic on `ip_packet_addr`
// presumably selects source (`false`) vs destination (`true`) - confirm.
137 checkn(Mime2Slip, mtu, comp.payload, |header| {
138 let saddr = ip_packet_addr::<false>(header)?;
139 if saddr != ic.link.client.0 { throw!(PE::Src(saddr)) }
140 let daddr = ip_packet_addr::<true>(header)?;
// Second closure: hand each (destination, packet) pair to `route_packet`.
142 }, |(daddr,packet)| route_packet(
143 &global, &conn, &ic.link.client, daddr,packet
// Third closure: an error passed here is recorded as a warning; only a
// failure of `warnings.add` itself is propagated.
145 |e| Ok::<_,SlipFramesError<_>>({ warnings.add(&e)?; })
// Bundle the checked metadata into the per-request state.
149 let oi = OutstandingInner {
150 target_requests_outstanding,
// On success the request joins the back of the `outstanding` queue, where
// it waits to be answered by the selection logic above.
154 Ok(oi) => outstanding.push_back(Outstanding { reply_to, oi }),
// Otherwise a response is sent right away (the match arm and the response
// contents are not visible here; presumably this is the error case).
156 try_send_response(reply_to, WebResponse {
165 //Err(anyhow!("xxx"))