chiark / gitweb /
cf0eec1d34425d054a6ea298db5facd97b202836
[hippotat.git] / src / bin / server.rs
1 // Copyright 2021 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: GPL-3.0-or-later
3 // There is NO WARRANTY.
4
5 use hippotat::prelude::*;
6
7 #[derive(StructOpt,Debug)]
8 pub struct Opts {
9   #[structopt(flatten)]
10   log: LogOpts,
11
12   #[structopt(flatten)]
13   config: config::Opts,
14 }
15
16 const METADATA_MAX_LEN: usize = MAX_OVERHEAD;
17
18 type AllClients = HashMap<ClientName, ClientHandles>;
19
20 struct ClientHandles {
21   ic: Arc<InstanceConfig>,
22   web: tokio::sync::mpsc::Sender<WebRequest>,
23 }
24
25 /// Sent from hyper worker pool task to client task
26 #[allow(dead_code)] // xxx
27 #[derive(Debug)]
28 struct WebRequest {
29   // initial part of body
30   // used up to and including first 2 lines of metadata
31   // end delimiter for the metadata not yet located, but in here somewhere
32   initial: Box<[u8]>,
33   initial_remaining: usize,
34   length_hint: usize,
35   body: hyper::body::Body,
36   boundary_finder: multipart::BoundaryFinder,
37   reply_to: tokio::sync::oneshot::Sender<WebResponse>,
38   warnings: Warnings,
39 }
40
41 /// Reply from client task to hyper worker pool task
42 #[allow(dead_code)] // xxx
43 #[derive(Debug)]
44 struct WebResponse {
45   warnings: Warnings,
46   data: Result<WebResponseData, AE>,
47 }
48
49 type WebResponseData = ();
50
51 async fn handle(
52   all_clients: Arc<AllClients>,
53   req: hyper::Request<hyper::Body>
54 ) -> Result<hyper::Response<hyper::Body>, Void> {
55   if req.method() == Method::GET {
56     let mut resp = hyper::Response::new(hyper::Body::from("hippotat\r\n"));
57     resp.headers_mut().insert(
58       "Content-Type",
59       "text/plain; charset=US-ASCII".try_into().unwrap()
60     );
61     return Ok(resp)
62   }
63
64   let mut warnings: Warnings = default();
65
66   match async {
67
68     let get_header = |hn: &str| {
69       let mut values = req.headers().get_all(hn).iter();
70       let v = values.next().ok_or_else(|| anyhow!("missing {}", hn))?;
71       if values.next().is_some() { throw!(anyhow!("multiple {}!", hn)); }
72       let v = v.to_str().context(anyhow!("interpret {} as UTF-8", hn))?;
73       Ok::<_,AE>(v)
74     };
75
76     let mkboundary = |b: &'_ _| format!("\n--{}", b).into_bytes();
77     let boundary = match (||{
78       let t = get_header("Content-Type")?;
79       let t: mime::Mime = t.parse().context("parse Content-Type")?;
80       if t.type_() != "multipart" { throw!(anyhow!("not multipart/")) }
81       let b = mime::BOUNDARY;
82       let b = t.get_param(b).ok_or_else(|| anyhow!("missing boundary=..."))?;
83       if t.subtype() != "form-data" {
84         warnings.add(&"Content-Type not /form-data")?;
85       }
86       let b = mkboundary(b.as_str());
87       Ok::<_,AE>(b)
88     })() {
89       Ok(y) => y,
90       Err(e) => {
91         warnings.add(&e.wrap_err("guessing boundary"))?;
92         mkboundary("b")
93       },
94     };
95
96     let length_hint: usize = (||{
97       let clength = get_header("Content-Length")?;
98       let clength = clength.parse().context("parse Content-Length")?;
99       Ok::<_,AE>(clength)
100     })().unwrap_or_else(
101       |e| { let _ = warnings.add(&e.wrap_err("parsing Content-Length")); 0 }
102     );
103
104     let mut body = req.into_body();
105     let initial = match read_limited_bytes(
106       METADATA_MAX_LEN, default(), length_hint, &mut body
107     ).await {
108       Ok(all) => all,
109       Err(ReadLimitedError::Truncated { sofar,.. }) => sofar,
110       Err(ReadLimitedError::Hyper(e)) => throw!(e),
111     };
112
113     let boundary_finder = memmem::Finder::new(&boundary);
114     let mut boundary_iter = boundary_finder.find_iter(&initial);
115
116     let start = if initial.starts_with(&boundary[1..]) { boundary.len()-1 }
117     else if let Some(start) = boundary_iter.next() { start + boundary.len() }
118     else { throw!(anyhow!("initial boundary not found")) };
119
120     let comp = multipart::process_boundary
121       (&mut warnings, &initial[start..], PartName::m)?
122       .ok_or_else(|| anyhow!(r#"no "m" component"#))?;
123
124     if comp.name != PartName::m { throw!(anyhow!(
125       r#"first multipart component must be name="m""#
126     )) }
127
128     let mut meta = MetadataFieldIterator::new(comp.payload);
129
130     let client: ClientName = meta.need_parse().context("client addr")?;
131
132     let mut hmac_got = [0; HMAC_L];
133     let (client_time, hmac_got_l) = (||{
134       let token: &str = meta.need_next().context(r#"find in "m""#)?;
135       let (time_t, hmac_b64) = token.split_once(' ')
136         .ok_or_else(|| anyhow!("split"))?;
137       let time_t = u64::from_str_radix(time_t, 16).context("parse time_t")?;
138       let l = io::copy(
139         &mut base64::read::DecoderReader::new(&mut hmac_b64.as_bytes(),
140                                               BASE64_CONFIG),
141         &mut &mut hmac_got[..]
142       ).context("parse b64 token")?;
143       let l = l.try_into()?;
144       Ok::<_,AE>((time_t, l))
145     })().context("token")?;
146     let hmac_got = &hmac_got[0..hmac_got_l];
147
148     let client = all_clients.get(&client);
149
150     // We attempt to hide whether the client exists we don't try to
151     // hide the hash lookup computationgs, but we do try to hide the
152     // HMAC computation by always doing it.  We hope that the compiler
153     // doesn't produce a specialised implementation for the dummy
154     // secret value.
155     let client_exists = subtle::Choice::from(client.is_some() as u8);
156     let secret = client.map(|c| c.ic.secret.0.as_bytes());
157     let secret = secret.unwrap_or(&[0x55; HMAC_B][..]);
158     let client_time_s = format!("{:x}", client_time);
159     let hmac_exp = token_hmac(secret, client_time_s.as_bytes());
160     // We also definitely want a consttime memeq for the hmac value
161     let hmac_ok = hmac_got.ct_eq(&hmac_exp);
162     //dbg!(DumpHex(&hmac_exp), client.is_some());
163     //dbg!(DumpHex(hmac_got), hmac_ok, client_exists);
164     if ! bool::from(hmac_ok & client_exists) {
165       throw!(anyhow!("xxx should be a 403 error"));
166     }
167
168     let client = client.unwrap();
169     let now = time_t_now();
170     let chk_skew = |a: u64, b: u64, c_ahead_behind| {
171       if let Some(a_ahead) = a.checked_sub(b) {
172         if a_ahead > client.ic.max_clock_skew.as_secs() {
173           throw!(anyhow!("too much clock skew (client {} by {})",
174                          c_ahead_behind, a_ahead));
175         }
176       }
177       Ok::<_,AE>(())
178     };
179     chk_skew(client_time, now, "ahead")?;
180     chk_skew(now, client_time, "behind")?;
181
182     let initial_remaining = meta.remaining_bytes_len();
183
184     //eprintln!("boundary={:?} start={} name={:?} client={}",
185     // boundary, start, &comp.name, &client.ic);
186
187     let (reply_to, reply_recv) = tokio::sync::oneshot::channel();
188     trace!("{} request xxx={}", &client.ic, initial.len());
189     let wreq = WebRequest {
190       initial,
191       initial_remaining,
192       length_hint,
193       boundary_finder: boundary_finder.into_owned(),
194       body,
195       warnings: mem::take(&mut warnings),
196       reply_to
197     };
198
199     client.web.try_send(wreq)
200       .map_err(|_| anyhow!("client task shut down!"))?;
201
202     let reply: WebResponse = reply_recv.await?;
203     warnings = reply.warnings;
204
205     reply.data
206   }.await {
207     Ok(()) => {
208     },
209     Err(e) => {
210       eprintln!("error={}", e);
211     }
212   }
213
214   eprintln!("warnings={:?}", &warnings);
215
216   Ok(hyper::Response::new(hyper::Body::from("Hello World")))
217 }
218
219 #[allow(unused_variables)] // xxx
220 #[allow(unused_mut)] // xxx
221 async fn run_client(ic: Arc<InstanceConfig>,
222                     mut web: mpsc::Receiver<WebRequest>)
223                     -> Result<Void, AE>
224 {
225   struct Outstanding {
226     reply_to: tokio::sync::oneshot::Sender<WebResponse>,
227     oi: OutstandingInner,
228   }
229   #[derive(Debug)]
230   struct OutstandingInner {
231     target_requests_outstanding: u32,
232   }
233   let mut outstanding: VecDeque<Outstanding> = default();
234   let  downbound: VecDeque<(/*xxx*/)> = default();
235
236   let try_send_response = |
237     reply_to: tokio::sync::oneshot::Sender<WebResponse>,
238     response: WebResponse
239   | {
240     reply_to.send(response)
241       .unwrap_or_else(|_: WebResponse| () /* oh dear */ /* xxx trace? */);
242   };
243
244   loop {
245     if let Some(ret) = {
246       if ! downbound.is_empty() {
247         outstanding.pop_front()
248       } else if let Some((i,_)) = outstanding.iter().enumerate().find({
249         |(_,o)| outstanding.len() > o.oi.target_requests_outstanding.sat()
250       }) {
251         Some(outstanding.remove(i).unwrap())
252       } else {
253         None
254       }
255     } {
256       let response = WebResponse {
257         data: Ok(()),
258         warnings: default(),
259       };
260
261       dbg!(&response);
262       try_send_response(ret.reply_to, response);
263     }
264
265     select!{
266       req = web.recv() =>
267       {
268         let WebRequest {
269           initial, initial_remaining, length_hint, mut body,
270           boundary_finder,
271           reply_to, mut warnings,
272         } = req.ok_or_else(|| anyhow!("webservers all shut down!"))?;
273
274         match async {
275
276           let initial_used = initial.len() - initial_remaining;
277
278           let whole_request = read_limited_bytes(
279             ic.max_batch_up.sat(),
280             initial,
281             length_hint,
282             &mut body
283           ).await.context("read request body")?;
284
285           let (meta, mut comps) =
286             multipart::ComponentIterator::resume_mid_component(
287               &whole_request[initial_used..],
288               boundary_finder
289             ).context("resume parsing body, after auth checks")?;
290
291           let mut meta = MetadataFieldIterator::new(&meta);
292
293           macro_rules! meta {
294             { $v:ident, ( $( $badcmp:tt )? ), $ret:expr,
295               let $server:ident, $client:ident $($code:tt)*
296             } => {
297               let $v = (||{
298                 let $server = ic.$v;
299                 let $client $($code)*
300                 $(
301                   if $client $badcmp $server {
302                     throw!(anyhow!("mismatch: client={:?} {} server={:?}",
303                                    $client, stringify!($badcmp), $server));
304                   }
305                 )?
306                 Ok::<_,AE>($ret)
307               })().context(stringify!($v))?;
308               dbg!(&$v);
309             }
310           }
311
312           meta!{
313             target_requests_outstanding, ( != ), client,
314             let server, client: u32 = meta.need_parse()?;
315           }
316
317           meta!{
318             http_timeout, ( > ), client,
319             let server, client = Duration::from_secs(meta.need_parse()?);
320           }
321
322           meta!{
323             max_batch_down, (), min(client, server),
324             let server, client: u32 = meta.parse()?.unwrap_or(server);
325           }
326
327           meta!{
328             max_batch_up, ( > ), client,
329             let server, client = meta.parse()?.unwrap_or(server);
330           }
331
332           while let Some(comp) = comps.next(&mut warnings, PartName::d)? {
333             if comp.name != PartName::d {
334               warnings.add(&format_args!("unexpected part {:?}", comp.name))?;
335             }
336             dbg!(comp.name, DumpHex(comp.payload));
337           }
338
339           let oi = OutstandingInner {
340             target_requests_outstanding,
341           };
342           Ok::<_,AE>(oi)
343         }.await {
344           Ok(oi) => outstanding.push_back(Outstanding { reply_to, oi }),
345           Err(e) => {
346             try_send_response(reply_to, WebResponse {
347               data: Err(e),
348               warnings,
349             });
350           },
351         }
352       }
353     }
354   }
355   //Err(anyhow!("xxx"))
356 }
357
358 #[tokio::main]
359 async fn main() {
360   let opts = Opts::from_args();
361   let mut tasks: Vec<(
362     JoinHandle<AE>,
363     String,
364   )> = vec![];
365
366   let (global, ipif) = config::startup(
367     "hippotatd", LinkEnd::Server,
368     &opts.config, &opts.log, |ics|
369   {
370     let global = config::InstanceConfigGlobal::from(&ics);
371     let ipif = Ipif::start(&global.ipif, None)?;
372
373     let all_clients: AllClients = ics.into_iter().map(|ic| {
374       let ic = Arc::new(ic);
375
376       let (web_send, web_recv) = mpsc::channel(
377         5 // xxx should me max_requests_outstanding but that's
378           // marked client-only so needs rework
379       );
380
381       let ic_ = ic.clone();
382       tasks.push((tokio::spawn(async move {
383         run_client(ic_, web_recv).await.void_unwrap_err()
384       }), format!("client {}", &ic)));
385
386       (ic.link.client,
387        ClientHandles {
388          ic,
389          web: web_send,
390        })
391     }).collect();
392     let all_clients = Arc::new(all_clients);
393
394     for addr in &global.addrs {
395       let all_clients_ = all_clients.clone();
396       let make_service = hyper::service::make_service_fn(move |_conn| {
397         let all_clients_ = all_clients_.clone();
398         async { Ok::<_, Void>( hyper::service::service_fn(move |req| {
399           handle(all_clients_.clone(), req)
400         }) ) } }
401       );
402
403       let addr = SocketAddr::new(*addr, global.port);
404       let server = hyper::Server::try_bind(&addr)
405         .context("bind")?
406         .http1_preserve_header_case(true)
407         .serve(make_service);
408       info!("listening on {}", &addr);
409       let task = tokio::task::spawn(async move {
410         match server.await {
411           Ok(()) => anyhow!("shut down?!"),
412           Err(e) => e.into(),
413         }
414       });
415       tasks.push((task, format!("http server {}", addr)));
416     }
417     
418     Ok((global, ipif))
419   });
420
421   let died = future::select_all(
422     tasks.iter_mut().map(|e| &mut e.0)
423   ).await;
424   error!("xxx {:?}", &died);
425
426   ipif.quitting(None).await;
427
428   dbg!(global);
429 }