chiark / gitweb /
c2f11ff09cbc73a5d4d7eb9b421972d375f3a9a4
[hippotat.git] / server / suser.rs
1 // Copyright 2021 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: GPL-3.0-or-later
3 // There is NO WARRANTY.
4
5 use super::*;
6
/// Handle bundle for one user: its configuration plus the sending ends
/// of the two channels over which work is passed to it.
///
/// NOTE(review): presumably the matching receivers are the `web` and
/// `routed` parameters of `run` in this file -- confirm at the place
/// where `User` is constructed.
#[derive(Debug)]
pub struct User {
  /// Shared configuration for this user's instance.
  pub ic: Arc<InstanceConfig>,
  /// Sending end of the channel carrying `WebRequest`s.
  pub web: mpsc::Sender<WebRequest>,
  /// Sending end of the channel carrying `RoutedPacket`s.
  pub route: mpsc::Sender<RoutedPacket>,
}
13
/// Service loop for one client instance.
///
/// Each pass of the loop:
///
///  1. possibly answers one parked web request with packets queued in
///     `downbound` (or with an empty body);
///  2. trims `downbound` so that it does not grow without bound;
///  3. waits for either a packet on `routed` (which is appended to
///     `downbound`) or a request on `web` (whose body is read, checked
///     against `ic`, and whose packets are handed to `route_packet`,
///     after which the request is parked in `outstanding`).
///
/// Parameters:
///  * `global`: passed through to `route_packet`.
///  * `ic`: this instance's configuration; supplies the server-side
///    limits and the expected client address.
///  * `web`: incoming web requests.
///  * `routed`: incoming packets to be sent down to the client.
///
/// The loop never finishes normally.  The function returns `Err` when
/// `routed.recv()` or `web.recv()` yields `None`.  Errors that occur
/// while processing an individual web request do not end the loop; they
/// are sent back to that request's `reply_to` instead.
pub async fn run(global: Arc<Global>,
                 ic: Arc<InstanceConfig>,
                 mut web: mpsc::Receiver<WebRequest>,
                 mut routed: mpsc::Receiver<RoutedPacket>)
                 -> Result<Void, AE>
{
  // A web request whose upbound body has been processed, and which is
  // being held until we decide to answer it.
  struct Outstanding {
    // Where the eventual `WebResponse` is sent.
    reply_to: oneshot::Sender<WebResponse>,
    oi: OutstandingInner,
  }
  // Per-request parameters, filled in at the end of request processing
  // (see the `OutstandingInner { .. }` construction below).
  #[derive(Debug)]
  struct OutstandingInner {
    // `Instant::now() + http_timeout` at the time the request was parked.
    // NOTE(review): stored but not read anywhere in this function; see
    // the "need timeout-based return" xxx below.
    deadline: Instant,
    // Taken from the request metadata (must equal the server's value).
    target_requests_outstanding: u32,
    // min(client's value, server's value) from the request metadata.
    max_batch_down: u32,
  }
  // Parked requests, oldest at the front.
  let mut outstanding: VecDeque<Outstanding> = default();
  // Packets received from `routed`, waiting to be sent to the client.
  let mut downbound: PacketQueue<RoutedPacketData> = default();

  // Sends `response` on `reply_to`.  A failed send (which hands the
  // `WebResponse` back as the error value) is deliberately ignored.
  let try_send_response = |
    reply_to: oneshot::Sender<WebResponse>,
    response: WebResponse
  | {
    reply_to.send(response)
      .unwrap_or_else(|_: WebResponse| () /* oh dear */ /* xxx trace? */);
  };

  loop {
    // Smallest `max_batch_down` over all parked requests, falling back
    // to the configured value when nothing is parked.
    let eff_max_batch_down = outstanding
      .iter()
      .map(|o| o.oi.max_batch_down)
      .min()
      .unwrap_or(ic.max_batch_down)
      .sat();

    // Choose at most one parked request to answer on this pass:
    //  * if there is queued downbound data, the oldest parked request
    //    (if any);
    //  * otherwise the first parked request for which we are holding
    //    more requests than its `target_requests_outstanding`; since
    //    `downbound` is empty in that case, it gets an empty body.
    // NOTE(review): only one request is answered per pass, and the next
    // pass only starts after the `select!` below has received something.
    if let Some(req) = {
      if ! downbound.is_empty() {
        outstanding.pop_front()
      } else if let Some((i,_)) = outstanding.iter().enumerate().find({
        |(_,o)| outstanding.len() > o.oi.target_requests_outstanding.sat()
        // xxx need timeout-based return too
      }) {
        // `i` came from iterating `outstanding` just above, so it is
        // in range and `remove` returns `Some`.
        Some(outstanding.remove(i).unwrap())
      } else {
        None
      }
    } {
      let mut build: FrameQueueBuf = default();

      // Move packets from the front of `downbound` into `build` until
      // the queue is empty or the next packet would reach the limit.
      loop {
        let next = if let Some(n) = downbound.peek_front() { n }
                   else { break };
        // Don't add 1 for the ESC since we will strip one
        // NOTE(review): the comparison is `>=`, so a batch of exactly
        // `eff_max_batch_down` is not produced -- confirm intended.
        if build.len() + next.len() >= eff_max_batch_down { break }
        // `peek_front` returned `Some` just above, so this cannot fail.
        build.esc_push(downbound.pop_front().unwrap());
      }
      if ! build.is_empty() {
        // skip leading ESC
        build.advance(1);
      }

      // Responses built here always carry an empty set of warnings.
      let response = WebResponse {
        data: Ok(build),
        warnings: default(),
      };

      try_send_response(req.reply_to, response);
    }

    // Upper bound on the number of bytes we keep queued:
    // max_requests_outstanding * eff_max_batch_down, plus one.
    let max = usize::saturating_mul(
      ic.max_requests_outstanding.sat(),
      eff_max_batch_down,
    ).saturating_add(1 /* one boundary SLIP_ESC which we'll trim */);

    // Discard packets from the front of the queue until we are back
    // within the bound.
    while downbound.total_len() > max {
      let _ = downbound.pop_front();
    }

    select!{
      // NOTE(review): assuming this is tokio's `select!`, `biased`
      // makes it poll the branches in the order written, so a ready
      // routed packet is taken before a ready web request.
      biased;

      data = routed.recv() =>
      {
        // `None` from the channel ends `run` with an error.
        let data = data.ok_or_else(|| anyhow!("routers shut down!"))?;
        downbound.push_back(data.data);
      },

      req = web.recv() =>
      {
        // `None` from the channel ends `run` with an error.  This `?`
        // is outside the `async` block below, so it returns from `run`.
        let WebRequest {
          initial, initial_remaining, length_hint, mut body,
          boundary_finder,
          reply_to, conn, mut warnings, may_route,
        } = req.ok_or_else(|| anyhow!("webservers all shut down!"))?;

        // Everything fallible about this one request happens inside the
        // `async` block: a `?` in there leaves only the block, and the
        // resulting `Err` is reported to the requester by the `match`.
        match async {

          // Number of bytes of `initial` which are not "remaining".
          // NOTE(review): presumably the part already parsed by the
          // sender of the `WebRequest`; relies on
          // `initial_remaining <= initial.len()` -- confirm at caller.
          let initial_used = initial.len() - initial_remaining;

          // Collect the request body, limited by `ic.max_batch_up`.
          let whole_request = read_limited_bytes(
            ic.max_batch_up.sat(),
            initial,
            length_hint,
            &mut body
          ).await.context("read request body")?;

          // Continue multipart parsing from offset `initial_used`.
          // Yields `meta` and an iterator over the remaining components.
          let (meta, mut comps) =
            multipart::ComponentIterator::resume_mid_component(
              &whole_request[initial_used..],
              boundary_finder
            ).context("resume parsing body, after auth checks")?;

          let mut meta = MetadataFieldIterator::new(&meta);

          // meta!{ NAME, ( OP ), RET, let SERVER, CLIENT <rest of let> }
          //
          // Defines a local variable NAME, as follows:
          //  * SERVER is bound to `ic.NAME`;
          //  * CLIENT is bound by `let CLIENT <rest of let>`, which in
          //    the uses below takes the next field from `meta`;
          //  * if OP is given and `CLIENT OP SERVER` holds, that is an
          //    error ("mismatch: ...");
          //  * otherwise NAME gets the value of RET.
          // Any error gets NAME added as context and leaves the
          // enclosing `async` block.
          macro_rules! meta {
            { $v:ident, ( $( $badcmp:tt )? ), $ret:expr,
              let $server:ident, $client:ident $($code:tt)*
            } => {
              let $v = (||{
                let $server = ic.$v;
                let $client $($code)*
                $(
                  if $client $badcmp $server {
                    throw!(anyhow!("mismatch: client={:?} {} server={:?}",
                                   $client, stringify!($badcmp), $server));
                  }
                )?
                Ok::<_,AE>($ret)
              })().context(stringify!($v))?;
              //dbg!(&$v);
            }
          }
          // The invocations below each take one field from the `meta`
          // iterator, in this order.  Fields read with `parse()` fall
          // back to the server's value when `parse()` yields `None`.
          // NOTE(review): `need_parse()` presumably fails when the
          // field is absent -- confirm in MetadataFieldIterator.

          // Must be equal to the server's value.
          meta!{
            target_requests_outstanding, ( != ), client,
            let server, client: u32 = meta.need_parse()?;
          }
          // Given in seconds; must not exceed the server's value.
          meta!{
            http_timeout, ( > ), client,
            let server, client = Duration::from_secs(meta.need_parse()?);
          }
          // Must be equal to the server's value.
          meta!{
            mtu, ( != ), client,
            let server, client: u32 = meta.parse()?.unwrap_or(server);
          }
          // No mismatch check; the smaller of the two values is used.
          meta!{
            max_batch_down, (), min(client, server),
            let server, client: u32 = meta.parse()?.unwrap_or(server);
          }
          // Must not exceed the server's value.
          meta!{
            max_batch_up, ( > ), client,
            let server, client = meta.parse()?.unwrap_or(server);
          }
          let _ = max_batch_up; // we don't use this further

          // Process each remaining multipart component as packet data.
          while let Some(comp) = comps.next(&mut warnings, PartName::d)? {
            // NOTE(review): a component with an unexpected name is
            // warned about but is still processed below.
            if comp.name != PartName::d {
              warnings.add(&format_args!("unexpected part {:?}", comp.name))?;
            }
            // Three callbacks are passed to `slip::processn`:
            //  1. header check: the packet's source address must be
            //     `ic.link.client.0`, otherwise `PE::Src`; yields the
            //     destination address;
            //  2. delivery: hands (daddr, packet) to `route_packet`;
            //  3. error handler: records the error in `warnings`.
            slip::processn(Mime2Slip, mtu, comp.payload, |header| {
              let saddr = ip_packet_addr::<false>(header)?;
              if saddr != ic.link.client.0 { throw!(PE::Src(saddr)) }
              let daddr = ip_packet_addr::<true>(header)?;
              Ok(daddr)
            }, |(daddr,packet)| route_packet(
              &global, &conn, Some(&ic.link.client), daddr,
              packet, may_route.clone(),
            ).map(Ok),
              |e| Ok::<_,SlipFramesError<_>>({ warnings.add(&e)?; })
            ).await?;
          }

          let deadline = Instant::now() + http_timeout;

          let oi = OutstandingInner {
            target_requests_outstanding,
            max_batch_down,
            deadline,
          };
          Ok::<_,AE>(oi)
        }.await {
          // Success: park the request; it is answered by a later pass
          // of the outer loop.
          // NOTE(review): `warnings` is not used on this path, so
          // warnings collected above are only reported on error.
          Ok(oi) => outstanding.push_back(Outstanding { reply_to, oi }),
          // Failure: answer immediately with the error and whatever
          // warnings were collected.
          Err(e) => {
            try_send_response(reply_to, WebResponse {
              data: Err(e),
              warnings,
            });
          },
        }
      }
    }
  }
  //Err(anyhow!("xxx"))
}