chiark / gitweb /
Apply OpenSSL exception to Hippotat files
[hippotat.git] / server / suser.rs
1 // Copyright 2021-2022 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: GPL-3.0-or-later WITH LicenseRef-Hippotat-OpenSSL-Exception
3 // There is NO WARRANTY.
4
5 use super::*;
6
7 #[derive(Debug)]
8 pub struct User {
9   pub ic: Arc<InstanceConfig>,
10   pub web: mpsc::Sender<WebRequest>,
11   pub route: mpsc::Sender<RoutedPacket>,
12 }
13
14 pub async fn run(global: Arc<Global>,
15                  ic: Arc<InstanceConfig>,
16                  mut web: mpsc::Receiver<WebRequest>,
17                  mut routed: mpsc::Receiver<RoutedPacket>)
18                  -> Result<Void, AE>
19 {
20   struct Outstanding {
21     reply_to: oneshot::Sender<WebResponse>,
22     oi: OutstandingInner,
23   }
24   #[derive(Debug)]
25   struct OutstandingInner {
26     deadline: Instant,
27     target_requests_outstanding: u32,
28     max_batch_down: u32,
29   }
30   let mut outstanding: VecDeque<Outstanding> = default();
31   let mut downbound: PacketQueue<RoutedPacketData> = default();
32
33   let try_send_response = |
34     reply_to: oneshot::Sender<WebResponse>,
35     response: WebResponse
36   | {
37     reply_to.send(response)
38       .unwrap_or_else(|_: WebResponse| {
39         /* oh dear */
40         trace!("unable to send response back to webserver! user={}",
41                &ic.link.client);
42       });
43   };
44
45   loop {
46     let eff_max_batch_down = outstanding
47       .iter()
48       .map(|o| o.oi.max_batch_down)
49       .min()
50       .unwrap_or(ic.max_batch_down)
51       .sat();
52     let earliest_deadline = outstanding
53       .iter()
54       .map(|o| o.oi.deadline)
55       .min();
56
57
58     if let Some(req) = {
59       let now = Instant::now();
60
61       if ! downbound.is_empty() {
62         outstanding.pop_front()
63       } else if let Some((i,_)) = outstanding.iter().enumerate().find({
64         |(_,o)| {
65           outstanding.len() > o.oi.target_requests_outstanding.sat()
66             ||
67           o.oi.deadline < now
68         }
69       }) {
70         Some(outstanding.remove(i).unwrap())
71       } else {
72         None
73       }
74     } {
75       let mut build: FrameQueueBuf = default();
76
77       loop {
78         let next = if let Some(n) = downbound.peek_front() { n }
79                    else { break };
80         // Don't add 1 for the ESC since we will strip one
81         if build.len() + next.len() >= eff_max_batch_down { break }
82         build.esc_push(downbound.pop_front().unwrap());
83       }
84       if ! build.is_empty() {
85         // skip leading ESC
86         build.advance(1);
87       }
88
89       let response = WebResponse {
90         data: Ok(build),
91         warnings: default(),
92       };
93
94       try_send_response(req.reply_to, response);
95     }
96
97     let max = usize::saturating_mul(
98       ic.max_requests_outstanding.sat(),
99       eff_max_batch_down,
100     ).saturating_add(1 /* one boundary SLIP_ESC which we'll trim */);
101
102     while downbound.total_len() > max {
103       let _ = downbound.pop_front();
104       trace!("{} discarding downbound-queue-full", &ic.link);
105     }
106
107     select!{
108       biased;
109
110       data = routed.recv() =>
111       {
112         let data = data.ok_or_else(|| anyhow!("routers shut down!"))?;
113         downbound.push_back(data.data);
114       },
115
116       req = web.recv() =>
117       {
118         let WebRequest {
119           initial, initial_remaining, length_hint, mut body,
120           boundary_finder,
121           reply_to, conn, mut warnings, may_route,
122         } = req.ok_or_else(|| anyhow!("webservers all shut down!"))?;
123
124         match async {
125
126           let initial_used = initial.len() - initial_remaining;
127
128           let whole_request = read_limited_bytes(
129             ic.max_batch_up.sat(),
130             initial,
131             length_hint,
132             &mut body
133           ).await.context("read request body")?;
134
135           let (meta, mut comps) =
136             multipart::ComponentIterator::resume_mid_component(
137               &whole_request[initial_used..],
138               boundary_finder
139             ).context("resume parsing body, after auth checks")?;
140
141           let mut meta = MetadataFieldIterator::new(&meta);
142
143           macro_rules! meta {
144             { $v:ident, ( $( $badcmp:tt )? ), $ret:expr,
145               let $server:ident, $client:ident $($code:tt)*
146             } => {
147               let $v = (||{
148                 let $server = ic.$v;
149                 let $client $($code)*
150                 $(
151                   if $client $badcmp $server {
152                     throw!(anyhow!("mismatch: client={:?} {} server={:?}",
153                                    $client, stringify!($badcmp), $server));
154                   }
155                 )?
156                 Ok::<_,AE>($ret)
157               })().context(stringify!($v))?;
158               //dbg!(&$v);
159             }
160           }
161           meta!{
162             target_requests_outstanding, ( != ), client,
163             let server, client: u32 = meta.need_parse()?;
164           }
165           meta!{
166             http_timeout, ( > ), client,
167             let server, client = Duration::from_secs(meta.need_parse()?);
168           }
169           meta!{
170             mtu, ( != ), client,
171             let server, client: u32 = meta.parse()?.unwrap_or(server);
172           }
173           meta!{
174             max_batch_down, (), min(client, server),
175             let server, client: u32 = meta.parse()?.unwrap_or(server);
176           }
177           meta!{
178             max_batch_up, ( > ), client,
179             let server, client = meta.parse()?.unwrap_or(server);
180           }
181           let _ = max_batch_up; // we don't use this further
182
183           while let Some(comp) = comps.next(&mut warnings, PartName::d)? {
184             if comp.name != PartName::d {
185               warnings.add(&format_args!("unexpected part {:?}", comp.name))?;
186             }
187             slip::processn(Mime2Slip, mtu, comp.payload, |header| {
188               let saddr = ip_packet_addr::<false>(header)?;
189               if saddr != ic.link.client.0 { throw!(PE::Src(saddr)) }
190               let daddr = ip_packet_addr::<true>(header)?;
191               Ok(daddr)
192             }, |(daddr,packet)| route_packet(
193               &global, &conn, Some(&ic.link.client), daddr,
194               packet, may_route.clone(),
195             ).map(Ok),
196               |e| Ok::<_,SlipFramesError<_>>({ warnings.add(&e)?; })
197             ).await?;
198           }
199
200           let deadline = Instant::now() + http_timeout;
201
202           let oi = OutstandingInner {
203             target_requests_outstanding,
204             max_batch_down,
205             deadline,
206           };
207           Ok::<_,AE>(oi)
208         }.await {
209           Ok(oi) => outstanding.push_back(Outstanding { reply_to, oi }),
210           Err(e) => {
211             try_send_response(reply_to, WebResponse {
212               data: Err(e),
213               warnings,
214             });
215           },
216         }
217       }
218
219       () = async {if let Some(deadline) = earliest_deadline {
220         tokio::time::sleep_until(deadline).await;
221       } else {
222         future::pending().await
223       } } =>
224       {
225       }
226     }
227   }
228 }