1 // Copyright 2021 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: GPL-3.0-or-later
3 // There is NO WARRANTY.
9 pub ic: Arc<InstanceConfig>,
10 pub web: mpsc::Sender<WebRequest>,
11 pub route: mpsc::Sender<RoutedPacket>,
14 pub async fn run(global: Arc<Global>,
15 ic: Arc<InstanceConfig>,
16 mut web: mpsc::Receiver<WebRequest>,
17 mut routed: mpsc::Receiver<RoutedPacket>)
21 reply_to: oneshot::Sender<WebResponse>,
25 struct OutstandingInner {
27 target_requests_outstanding: u32,
30 let mut outstanding: VecDeque<Outstanding> = default();
31 let mut downbound: PacketQueue<RoutedPacketData> = default();
33 let try_send_response = |
34 reply_to: oneshot::Sender<WebResponse>,
37 reply_to.send(response)
38 .unwrap_or_else(|_: WebResponse| {
40 trace!("unable to send response back to webserver! user={}",
46 let eff_max_batch_down = outstanding
48 .map(|o| o.oi.max_batch_down)
50 .unwrap_or(ic.max_batch_down)
52 let earliest_deadline = outstanding
54 .map(|o| o.oi.deadline)
59 let now = Instant::now();
61 if ! downbound.is_empty() {
62 outstanding.pop_front()
63 } else if let Some((i,_)) = outstanding.iter().enumerate().find({
65 outstanding.len() > o.oi.target_requests_outstanding.sat()
70 Some(outstanding.remove(i).unwrap())
75 let mut build: FrameQueueBuf = default();
78 let next = if let Some(n) = downbound.peek_front() { n }
80 // Don't add 1 for the ESC since we will strip one
81 if build.len() + next.len() >= eff_max_batch_down { break }
82 build.esc_push(downbound.pop_front().unwrap());
84 if ! build.is_empty() {
89 let response = WebResponse {
94 try_send_response(req.reply_to, response);
97 let max = usize::saturating_mul(
98 ic.max_requests_outstanding.sat(),
100 ).saturating_add(1 /* one boundary SLIP_ESC which we'll trim */);
102 while downbound.total_len() > max {
103 let _ = downbound.pop_front();
104 trace!("{} discarding downbound-queue-full", &ic.link);
110 data = routed.recv() =>
112 let data = data.ok_or_else(|| anyhow!("routers shut down!"))?;
113 downbound.push_back(data.data);
119 initial, initial_remaining, length_hint, mut body,
121 reply_to, conn, mut warnings, may_route,
122 } = req.ok_or_else(|| anyhow!("webservers all shut down!"))?;
126 let initial_used = initial.len() - initial_remaining;
128 let whole_request = read_limited_bytes(
129 ic.max_batch_up.sat(),
133 ).await.context("read request body")?;
135 let (meta, mut comps) =
136 multipart::ComponentIterator::resume_mid_component(
137 &whole_request[initial_used..],
139 ).context("resume parsing body, after auth checks")?;
141 let mut meta = MetadataFieldIterator::new(&meta);
144 { $v:ident, ( $( $badcmp:tt )? ), $ret:expr,
145 let $server:ident, $client:ident $($code:tt)*
149 let $client $($code)*
151 if $client $badcmp $server {
152 throw!(anyhow!("mismatch: client={:?} {} server={:?}",
153 $client, stringify!($badcmp), $server));
157 })().context(stringify!($v))?;
162 target_requests_outstanding, ( != ), client,
163 let server, client: u32 = meta.need_parse()?;
166 http_timeout, ( > ), client,
167 let server, client = Duration::from_secs(meta.need_parse()?);
171 let server, client: u32 = meta.parse()?.unwrap_or(server);
174 max_batch_down, (), min(client, server),
175 let server, client: u32 = meta.parse()?.unwrap_or(server);
178 max_batch_up, ( > ), client,
179 let server, client = meta.parse()?.unwrap_or(server);
181 let _ = max_batch_up; // we don't use this further
183 while let Some(comp) = comps.next(&mut warnings, PartName::d)? {
184 if comp.name != PartName::d {
185 warnings.add(&format_args!("unexpected part {:?}", comp.name))?;
187 slip::processn(Mime2Slip, mtu, comp.payload, |header| {
188 let saddr = ip_packet_addr::<false>(header)?;
189 if saddr != ic.link.client.0 { throw!(PE::Src(saddr)) }
190 let daddr = ip_packet_addr::<true>(header)?;
192 }, |(daddr,packet)| route_packet(
193 &global, &conn, Some(&ic.link.client), daddr,
194 packet, may_route.clone(),
196 |e| Ok::<_,SlipFramesError<_>>({ warnings.add(&e)?; })
200 let deadline = Instant::now() + http_timeout;
202 let oi = OutstandingInner {
203 target_requests_outstanding,
209 Ok(oi) => outstanding.push_back(Outstanding { reply_to, oi }),
211 try_send_response(reply_to, WebResponse {
219 () = async {if let Some(deadline) = earliest_deadline {
220 tokio::time::sleep_until(deadline).await;
222 future::pending().await