chiark / gitweb /
response plumbing
[hippotat.git] / server / suser.rs
1 // Copyright 2021 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: GPL-3.0-or-later
3 // There is NO WARRANTY.
4
5 use super::*;
6
/// Bundle of an instance's configuration and two channel senders.
///
/// NOTE(review): presumably `web` and `route` are the sending halves of
/// the channels whose receiving halves are passed to `run` (its `web`
/// and `routed` parameters have the matching message types) -- confirm
/// at the construction site, which is not visible in this file.
7 #[derive(Debug)]
8 pub struct User {
  /// Shared configuration for this instance.
9   pub ic: Arc<InstanceConfig>,
  /// Sender for `WebRequest` messages.
10   pub web: mpsc::Sender<WebRequest>,
  /// Sender for `RoutedPacket` messages.
11   pub route: mpsc::Sender<RoutedPacket>,
12 }
13
/// Main loop for one `InstanceConfig`: queues downbound packets,
/// services incoming web requests, and answers held requests.
///
/// Each iteration of the loop:
///
///  1. answers at most one held request -- the oldest one if any
///     downbound data is queued, otherwise the first one whose
///     `target_requests_outstanding` is smaller than the number of
///     requests currently held;
///  2. discards the oldest queued downbound packets while the queue's
///     total length exceeds
///     `max_requests_outstanding * max_batch_down + 1`;
///  3. waits for either a routed packet or a web request.  The
///     `select!` is `biased`, so the routed-packet arm is listed (and,
///     per tokio's documentation, polled) first.
///
/// For a web request, the body is read and parsed, its metadata fields
/// are checked against the configuration, and its packets are handed
/// to `route_packet`.  On success the request is held for a later
/// reply; on failure the error is sent back to the requester at once.
///
/// # Errors
///
/// Returns an error when `routed.recv()` or `web.recv()` yields `None`.
/// No successful return is visible in this function: the loop contains
/// no `break`.
14 #[allow(unused_variables)] // xxx
15 #[allow(unused_mut)] // xxx
16 pub async fn run(global: Arc<Global>,
17                  ic: Arc<InstanceConfig>,
18                  mut web: mpsc::Receiver<WebRequest>,
19                  mut routed: mpsc::Receiver<RoutedPacket>)
20                  -> Result<Void, AE>
21 {
  // A web request whose reply is being held back: where to send the
  // reply, plus the values parsed from the request.
22   struct Outstanding {
23     reply_to: oneshot::Sender<WebResponse>,
24     oi: OutstandingInner,
25   }
  // The part of `Outstanding` that is produced by the request-parsing
  // async block further down.
26   #[derive(Debug)]
27   struct OutstandingInner {
28     target_requests_outstanding: u32,
29   }
  // Held requests: pushed at the back, normally taken from the front.
30   let mut outstanding: VecDeque<Outstanding> = default();
  // Packets received from `routed`, waiting to be put into a reply.
31   let mut downbound: PacketQueue<RoutedPacketData> = default();
32
  // Sends a reply.  A failed send (which hands the response back as
  // the error value) is deliberately ignored.
33   let try_send_response = |
34     reply_to: oneshot::Sender<WebResponse>,
35     response: WebResponse
36   | {
37     reply_to.send(response)
38       .unwrap_or_else(|_: WebResponse| () /* oh dear */ /* xxx trace? */);
39   };
40
41   loop {
    // Phase 1: choose at most one held request to answer now.
    //  - If there is downbound data, take the oldest held request
    //    (`None` if nothing is held).
    //  - Otherwise take the first request for which more requests are
    //    held than its own `target_requests_outstanding`; it will get
    //    a reply with an empty `build`.
42     if let Some(req) = {
43       if ! downbound.is_empty() {
44         outstanding.pop_front()
45       } else if let Some((i,_)) = outstanding.iter().enumerate().find({
46         |(_,o)| outstanding.len() > o.oi.target_requests_outstanding.sat()
47       }) {
        // `i` came from iterating `outstanding` just above, so it is
        // in range and `remove` returns `Some`.
48         Some(outstanding.remove(i).unwrap())
49       } else {
50         None
51       }
52     } {
53       let mut build: FrameQueueBuf = default();
54
      // Move packets from the front of `downbound` into `build`,
      // stopping when the queue is empty or when adding the next
      // packet would bring the length up to `ic.max_batch_down`.
      // NOTE(review): a head packet that can never fit stays queued
      // until the trimming loop below drops it -- confirm intended.
55       loop {
56         let next = if let Some(n) = downbound.peek_front() { n }
57                    else { break };
58         // Don't add 1 for the ESC since we will strip one
59         if build.len() + next.len() >= ic.max_batch_down.sat() { break }
        // `peek_front` just returned `Some`, so `pop_front` cannot
        // fail here.
60         build.esc_push(downbound.pop_front().unwrap());
61       }
62       if ! build.is_empty() {
63         // skip leading ESC
64         build.advance(1);
65       }
66
67       let response = WebResponse {
68         data: Ok(build),
69         warnings: default(),
70       };
71
72       try_send_response(req.reply_to, response);
73     }
74
    // Phase 2: bound the amount of queued downbound data.  All the
    // arithmetic saturates, so a large configuration cannot overflow.
75     let max = usize::saturating_mul(
76       ic.max_requests_outstanding.sat(),
77       ic.max_batch_down.sat(),
78     ).saturating_add(1 /* one boundary SLIP_ESC which we'll trim */);
79
    // Drop packets from the front (the oldest end) until within `max`.
80     while downbound.total_len() > max {
81       let _ = downbound.pop_front();
82     }
83
    // Phase 3: wait for the next event.
84     select!{
85       biased;
86
      // A packet routed to this instance: queue it for a later reply.
87       data = routed.recv() =>
88       {
89         let data = data.ok_or_else(|| anyhow!("routers shut down!"))?;
90         downbound.push_back(data.data);
91       },
92
      // An incoming web request.
93       req = web.recv() =>
94       {
95         let WebRequest {
96           initial, initial_remaining, length_hint, mut body,
97           boundary_finder,
98           reply_to, conn, mut warnings, may_route,
99         } = req.ok_or_else(|| anyhow!("webservers all shut down!"))?;
100
        // The processing runs in an inner async block so that any
        // error raised with `?` ends up in the `match` below (and is
        // reported to the requester) instead of returning from `run`.
101         match async {
102
          // Offset in the request at which parsing resumes.
          // NOTE(review): this subtraction assumes
          // `initial_remaining <= initial.len()` -- confirm that the
          // sender of the `WebRequest` guarantees it.
103           let initial_used = initial.len() - initial_remaining;
104
          // Read the request body, with `ic.max_batch_up` as the limit
          // argument.
105           let whole_request = read_limited_bytes(
106             ic.max_batch_up.sat(),
107             initial,
108             length_hint,
109             &mut body
110           ).await.context("read request body")?;
111
          // Continue multipart parsing from `initial_used` onwards;
          // this yields the metadata and an iterator over the
          // remaining components.
112           let (meta, mut comps) =
113             multipart::ComponentIterator::resume_mid_component(
114               &whole_request[initial_used..],
115               boundary_finder
116             ).context("resume parsing body, after auth checks")?;
117
118           let mut meta = MetadataFieldIterator::new(&meta);
119
          // meta!{ FIELD, ( BADCMP? ), RET, let SERVER, CLIENT <code> }
          //
          // Expands to `let FIELD = ...`, evaluated inside a closure
          // that:
          //  - binds SERVER to `ic.FIELD`;
          //  - runs `let CLIENT <code>`, which in the uses below
          //    obtains the client's value from `meta`;
          //  - if BADCMP is given, fails with a "mismatch" error when
          //    `CLIENT BADCMP SERVER` holds;
          //  - yields RET.
          // Any error gets the field name attached as context.
120           macro_rules! meta {
121             { $v:ident, ( $( $badcmp:tt )? ), $ret:expr,
122               let $server:ident, $client:ident $($code:tt)*
123             } => {
124               let $v = (||{
125                 let $server = ic.$v;
126                 let $client $($code)*
127                 $(
128                   if $client $badcmp $server {
129                     throw!(anyhow!("mismatch: client={:?} {} server={:?}",
130                                    $client, stringify!($badcmp), $server));
131                   }
132                 )?
133                 Ok::<_,AE>($ret)
134               })().context(stringify!($v))?;
135               //dbg!(&$v);
136             }
137           }
          // NOTE(review): `meta` is an iterator, so presumably the
          // fields are consumed in order and the sequence of
          // invocations below is significant -- confirm against
          // `MetadataFieldIterator`.
          //
          // Must equal the server's value.
138           meta!{
139             target_requests_outstanding, ( != ), client,
140             let server, client: u32 = meta.need_parse()?;
141           }
          // Must not exceed the server's value.
142           meta!{
143             http_timeout, ( > ), client,
144             let server, client = Duration::from_secs(meta.need_parse()?);
145           }
          // From here on a field for which `parse` yields `None`
          // takes the server's value.
          // Must equal the server's value.
146           meta!{
147             mtu, ( != ), client,
148             let server, client: u32 = meta.parse()?.unwrap_or(server);
149           }
          // No comparison; the smaller of the two values is used.
150           meta!{
151             max_batch_down, (), min(client, server),
152             let server, client: u32 = meta.parse()?.unwrap_or(server);
153           }
          // Must not exceed the server's value.
154           meta!{
155             max_batch_up, ( > ), client,
156             let server, client = meta.parse()?.unwrap_or(server);
157           }
158
          // Process each remaining component.  A component with a
          // name other than `d` produces a warning, but its payload
          // is still processed (there is no `continue`).
159           while let Some(comp) = comps.next(&mut warnings, PartName::d)? {
160             if comp.name != PartName::d {
161               warnings.add(&format_args!("unexpected part {:?}", comp.name))?;
162             }
            // The three closures passed to `slip::processn`:
            //  1. header check: fail with `PE::Src` unless the source
            //     address is `ic.link.client.0`; yield the
            //     destination address;
            //  2. hand the packet to `route_packet`;
            //  3. record an error as a warning.
163             slip::processn(Mime2Slip, mtu, comp.payload, |header| {
164               let saddr = ip_packet_addr::<false>(header)?;
165               if saddr != ic.link.client.0 { throw!(PE::Src(saddr)) }
166               let daddr = ip_packet_addr::<true>(header)?;
167               Ok(daddr)
168             }, |(daddr,packet)| route_packet(
169               &global, &conn, Some(&ic.link.client), daddr,
170               packet, may_route.clone(),
171             ).map(Ok),
172               |e| Ok::<_,SlipFramesError<_>>({ warnings.add(&e)?; })
173             ).await?;
174           }
175
176           let oi = OutstandingInner {
177             target_requests_outstanding,
178           };
179           Ok::<_,AE>(oi)
180         }.await {
          // Success: hold the request; phase 1 of a later iteration
          // will send its reply.
181           Ok(oi) => outstanding.push_back(Outstanding { reply_to, oi }),
          // Failure: reply immediately with the error and the
          // warnings collected so far.
182           Err(e) => {
183             try_send_response(reply_to, WebResponse {
184               data: Err(e),
185               warnings,
186             });
187           },
188         }
189       }
190     }
191   }
192   //Err(anyhow!("xxx"))
193 }