chiark / gitweb /
65b7e489e7182da08b514e3e6961f231b2ee4312
[hippotat.git] / src / bin / server.rs
1 // Copyright 2021 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: GPL-3.0-or-later
3 // There is NO WARRANTY.
4
5 use hippotat::prelude::*;
6
7 #[derive(StructOpt,Debug)]
8 pub struct Opts {
9   #[structopt(flatten)]
10   log: LogOpts,
11
12   #[structopt(flatten)]
13   config: config::Opts,
14 }
15
16 const METADATA_MAX_LEN: usize = MAX_OVERHEAD;
17
18 type AllClients = HashMap<ClientName, ClientHandles>;
19
20 struct ClientHandles {
21   ic: Arc<InstanceConfig>,
22   web: tokio::sync::mpsc::Sender<WebRequest>,
23 }
24
25 /// Sent from hyper worker pool task to client task
26 #[allow(dead_code)] // xxx
27 #[derive(Debug)]
28 struct WebRequest {
29   // initial part of body
30   // used up to and including first 2 lines of metadata
31   // end delimiter for the metadata not yet located, but in here somewhere
32   initial: Box<[u8]>,
33   initial_remaining: usize,
34   length_hint: usize,
35   body: hyper::body::Body,
36   boundary_finder: multipart::BoundaryFinder,
37   reply_to: tokio::sync::oneshot::Sender<WebResponse>,
38   warnings: Warnings,
39 }
40
41 /// Reply from client task to hyper worker pool task
42 #[allow(dead_code)] // xxx
43 #[derive(Debug)]
44 struct WebResponse {
45   warnings: Warnings,
46   data: Result<WebResponseData, AE>,
47 }
48
49 type WebResponseData = ();
50
51 async fn handle(
52   all_clients: Arc<AllClients>,
53   req: hyper::Request<hyper::Body>
54 ) -> Result<hyper::Response<hyper::Body>, Void> {
55   if req.method() == Method::GET {
56     let mut resp = hyper::Response::new(hyper::Body::from("hippotat\r\n"));
57     resp.headers_mut().insert(
58       "Content-Type",
59       "text/plain; charset=US-ASCII".try_into().unwrap()
60     );
61     return Ok(resp)
62   }
63
64   let mut warnings: Warnings = default();
65
66   match async {
67
68     let get_header = |hn: &str| {
69       let mut values = req.headers().get_all(hn).iter();
70       let v = values.next().ok_or_else(|| anyhow!("missing {}", hn))?;
71       if values.next().is_some() { throw!(anyhow!("multiple {}!", hn)); }
72       let v = v.to_str().context(anyhow!("interpret {} as UTF-8", hn))?;
73       Ok::<_,AE>(v)
74     };
75
76     let mkboundary = |b: &'_ _| format!("\n--{}", b).into_bytes();
77     let boundary = match (||{
78       let t = get_header("Content-Type")?;
79       let t: mime::Mime = t.parse().context("parse Content-Type")?;
80       if t.type_() != "multipart" { throw!(anyhow!("not multipart/")) }
81       let b = mime::BOUNDARY;
82       let b = t.get_param(b).ok_or_else(|| anyhow!("missing boundary=..."))?;
83       if t.subtype() != "form-data" {
84         warnings.add(&"Content-Type not /form-data")?;
85       }
86       let b = mkboundary(b.as_str());
87       Ok::<_,AE>(b)
88     })() {
89       Ok(y) => y,
90       Err(e) => {
91         warnings.add(&e.wrap_err("guessing boundary"))?;
92         mkboundary("b")
93       },
94     };
95
96     let length_hint: usize = (||{
97       let clength = get_header("Content-Length")?;
98       let clength = clength.parse().context("parse Content-Length")?;
99       Ok::<_,AE>(clength)
100     })().unwrap_or_else(
101       |e| { let _ = warnings.add(&e.wrap_err("parsing Content-Length")); 0 }
102     );
103
104     let mut body = req.into_body();
105     let initial = match read_limited_bytes(
106       METADATA_MAX_LEN, default(), length_hint, &mut body
107     ).await {
108       Ok(all) => all,
109       Err(ReadLimitedError::Truncated { sofar,.. }) => sofar,
110       Err(ReadLimitedError::Hyper(e)) => throw!(e),
111     };
112
113     let boundary_finder = memmem::Finder::new(&boundary);
114     let mut boundary_iter = boundary_finder.find_iter(&initial);
115
116     let start = if initial.starts_with(&boundary[1..]) { boundary.len()-1 }
117     else if let Some(start) = boundary_iter.next() { start + boundary.len() }
118     else { throw!(anyhow!("initial boundary not found")) };
119
120     let comp = multipart::process_boundary
121       (&mut warnings, &initial[start..], PartName::m)?
122       .ok_or_else(|| anyhow!(r#"no "m" component"#))?;
123
124     if comp.name != PartName::m { throw!(anyhow!(
125       r#"first multipart component must be name="m""#
126     )) }
127
128     let mut meta = MetadataFieldIterator::new(comp.payload);
129
130     let client: ClientName = meta.need_parse().context("client addr")?;
131
132     let mut hmac_got = [0; HMAC_L];
133     let (client_time, hmac_got_l) = (||{
134       let token: &str = meta.need_next().context(r#"find in "m""#)?;
135       let (time_t, hmac_b64) = token.split_once(' ')
136         .ok_or_else(|| anyhow!("split"))?;
137       let time_t = u64::from_str_radix(time_t, 16).context("parse time_t")?;
138       let l = io::copy(
139         &mut base64::read::DecoderReader::new(&mut hmac_b64.as_bytes(),
140                                               BASE64_CONFIG),
141         &mut &mut hmac_got[..]
142       ).context("parse b64 token")?;
143       let l = l.try_into()?;
144       Ok::<_,AE>((time_t, l))
145     })().context("token")?;
146     let hmac_got = &hmac_got[0..hmac_got_l];
147
148     let client = all_clients.get(&client);
149
150     // We attempt to hide whether the client exists we don't try to
151     // hide the hash lookup computationgs, but we do try to hide the
152     // HMAC computation by always doing it.  We hope that the compiler
153     // doesn't produce a specialised implementation for the dummy
154     // secret value.
155     let client_exists = subtle::Choice::from(client.is_some() as u8);
156     let secret = client.map(|c| c.ic.secret.0.as_bytes());
157     let secret = secret.unwrap_or(&[0x55; HMAC_B][..]);
158     let client_time_s = format!("{:x}", client_time);
159     let hmac_exp = token_hmac(secret, client_time_s.as_bytes());
160     // We also definitely want a consttime memeq for the hmac value
161     let hmac_ok = hmac_got.ct_eq(&hmac_exp);
162     //dbg!(DumpHex(&hmac_exp), client.is_some());
163     //dbg!(DumpHex(hmac_got), hmac_ok, client_exists);
164     if ! bool::from(hmac_ok & client_exists) {
165       throw!(anyhow!("xxx should be a 403 error"));
166     }
167
168     let client = client.unwrap();
169     let now = time_t_now();
170     let chk_skew = |a: u64, b: u64, c_ahead_behind| {
171       if let Some(a_ahead) = a.checked_sub(b) {
172         if a_ahead > client.ic.max_clock_skew.as_secs() {
173           throw!(anyhow!("too much clock skew (client {} by {})",
174                          c_ahead_behind, a_ahead));
175         }
176       }
177       Ok::<_,AE>(())
178     };
179     chk_skew(client_time, now, "ahead")?;
180     chk_skew(now, client_time, "behind")?;
181
182     let initial_remaining = meta.remaining_bytes_len();
183
184     //eprintln!("boundary={:?} start={} name={:?} client={}",
185     // boundary, start, &comp.name, &client.ic);
186
187     let (reply_to, reply_recv) = tokio::sync::oneshot::channel();
188     trace!("{} request xxx={}", &client.ic, initial.len());
189     let wreq = WebRequest {
190       initial,
191       initial_remaining,
192       length_hint,
193       boundary_finder: boundary_finder.into_owned(),
194       body,
195       warnings: mem::take(&mut warnings),
196       reply_to
197     };
198
199     client.web.try_send(wreq)
200       .map_err(|_| anyhow!("client task shut down!"))?;
201
202     let reply: WebResponse = reply_recv.await?;
203     warnings = reply.warnings;
204
205     reply.data
206   }.await {
207     Ok(()) => {
208     },
209     Err(e) => {
210       eprintln!("error={}", e);
211     }
212   }
213
214   eprintln!("warnings={:?}", &warnings);
215
216   Ok(hyper::Response::new(hyper::Body::from("Hello World")))
217 }
218
219 #[allow(unused_variables)] // xxx
220 #[allow(unused_mut)] // xxx
221 async fn run_client(ic: Arc<InstanceConfig>,
222                     mut web: mpsc::Receiver<WebRequest>)
223                     -> Result<Void, AE>
224 {
225   struct Outstanding {
226     reply_to: tokio::sync::oneshot::Sender<WebResponse>,
227     target_requests_outstanding: u32,
228   }
229   let mut outstanding: VecDeque<Outstanding> = default();
230   let  downbound: VecDeque<(/*xxx*/)> = default();
231
232   let try_send_response = |
233     reply_to: tokio::sync::oneshot::Sender<WebResponse>,
234     response: WebResponse
235   | {
236     reply_to.send(response)
237       .unwrap_or_else(|_: WebResponse| () /* oh dear */ /* xxx trace? */);
238   };
239
240   loop {
241     if let Some(ret) = {
242       if ! downbound.is_empty() {
243         outstanding.pop_front()
244       } else if let Some((i,_)) = outstanding.iter().enumerate().find({
245         |(_,o)| outstanding.len() > o.target_requests_outstanding.sat()
246       }) {
247         Some(outstanding.remove(i).unwrap())
248       } else {
249         None
250       }
251     } {
252       let response = WebResponse {
253         data: Ok(()),
254         warnings: default(),
255       };
256
257       dbg!(&response);
258       try_send_response(ret.reply_to, response);
259     }
260
261     select!{
262       req = web.recv() =>
263       {
264         let WebRequest {
265           initial, initial_remaining, length_hint, mut body,
266           boundary_finder,
267           reply_to, mut warnings,
268         } = req.ok_or_else(|| anyhow!("webservers all shut down!"))?;
269
270         match async {
271
272           let initial_used = initial.len() - initial_remaining;
273
274           let whole_request = read_limited_bytes(
275             ic.max_batch_up.sat(),
276             initial,
277             length_hint,
278             &mut body
279           ).await.context("read request body")?;
280
281           let (meta, mut comps) =
282             multipart::ComponentIterator::resume_mid_component(
283               &whole_request[initial_used..],
284               boundary_finder
285             ).context("resume parsing body, after auth checks")?;
286
287           let mut meta = MetadataFieldIterator::new(&meta);
288
289           macro_rules! meta {
290             { $v:ident, ( $( $badcmp:tt )? ), $ret:expr,
291               let $server:ident, $client:ident $($code:tt)*
292             } => {
293               let $v = (||{
294                 let $server = ic.$v;
295                 let $client $($code)*
296                 $(
297                   if $client $badcmp $server {
298                     throw!(anyhow!("mismatch: client={:?} {} server={:?}",
299                                    $client, stringify!($badcmp), $server));
300                   }
301                 )?
302                 Ok::<_,AE>($ret)
303               })().context(stringify!($v))?;
304               dbg!(&$v);
305             }
306           }
307
308           meta!{
309             target_requests_outstanding, ( != ), client,
310             let server, client: u32 = meta.need_parse()?;
311           }
312
313           meta!{
314             http_timeout, ( > ), client,
315             let server, client = Duration::from_secs(meta.need_parse()?);
316           }
317
318           meta!{
319             max_batch_down, (), min(client, server),
320             let server, client: u32 = meta.parse()?.unwrap_or(server);
321           }
322
323           meta!{
324             max_batch_up, ( > ), client,
325             let server, client = meta.parse()?.unwrap_or(server);
326           }
327
328           while let Some(comp) = comps.next(&mut warnings, PartName::d)? {
329             if comp.name != PartName::d {
330               warnings.add(&format_args!("unexpected part {:?}", comp.name))?;
331             }
332             dbg!(comp.name, DumpHex(comp.payload));
333           }
334
335           Ok::<_,AE>(target_requests_outstanding)
336         }.await {
337           Ok(target_requests_outstanding) => {
338             outstanding.push_back(Outstanding {
339               reply_to,
340               target_requests_outstanding,
341             });
342           },
343           Err(e) => {
344             try_send_response(reply_to, WebResponse {
345               data: Err(e),
346               warnings,
347             });
348           },
349         }
350       }
351     }
352   }
353   //Err(anyhow!("xxx"))
354 }
355
356 #[tokio::main]
357 async fn main() {
358   let opts = Opts::from_args();
359   let mut tasks: Vec<(
360     JoinHandle<AE>,
361     String,
362   )> = vec![];
363
364   let (global, ipif) = config::startup(
365     "hippotatd", LinkEnd::Server,
366     &opts.config, &opts.log, |ics|
367   {
368     let global = config::InstanceConfigGlobal::from(&ics);
369     let ipif = Ipif::start(&global.ipif, None)?;
370
371     let all_clients: AllClients = ics.into_iter().map(|ic| {
372       let ic = Arc::new(ic);
373
374       let (web_send, web_recv) = mpsc::channel(
375         5 // xxx should me max_requests_outstanding but that's
376           // marked client-only so needs rework
377       );
378
379       let ic_ = ic.clone();
380       tasks.push((tokio::spawn(async move {
381         run_client(ic_, web_recv).await.void_unwrap_err()
382       }), format!("client {}", &ic)));
383
384       (ic.link.client,
385        ClientHandles {
386          ic,
387          web: web_send,
388        })
389     }).collect();
390     let all_clients = Arc::new(all_clients);
391
392     for addr in &global.addrs {
393       let all_clients_ = all_clients.clone();
394       let make_service = hyper::service::make_service_fn(move |_conn| {
395         let all_clients_ = all_clients_.clone();
396         async { Ok::<_, Void>( hyper::service::service_fn(move |req| {
397           handle(all_clients_.clone(), req)
398         }) ) } }
399       );
400
401       let addr = SocketAddr::new(*addr, global.port);
402       let server = hyper::Server::try_bind(&addr)
403         .context("bind")?
404         .http1_preserve_header_case(true)
405         .serve(make_service);
406       info!("listening on {}", &addr);
407       let task = tokio::task::spawn(async move {
408         match server.await {
409           Ok(()) => anyhow!("shut down?!"),
410           Err(e) => e.into(),
411         }
412       });
413       tasks.push((task, format!("http server {}", addr)));
414     }
415     
416     Ok((global, ipif))
417   });
418
419   let died = future::select_all(
420     tasks.iter_mut().map(|e| &mut e.0)
421   ).await;
422   error!("xxx {:?}", &died);
423
424   ipif.quitting(None).await;
425
426   dbg!(global);
427 }