chiark / gitweb /
server: wip recv
[hippotat.git] / src / bin / server.rs
1 // Copyright 2021 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: GPL-3.0-or-later
3 // There is NO WARRANTY.
4
5 use hippotat::prelude::*;
6
7 #[derive(StructOpt,Debug)]
8 pub struct Opts {
9   #[structopt(flatten)]
10   log: LogOpts,
11
12   #[structopt(flatten)]
13   config: config::Opts,
14 }
15
16 const METADATA_MAX_LEN: usize = MAX_OVERHEAD;
17
18 type AllClients = HashMap<ClientName, ClientHandles>;
19
20 struct ClientHandles {
21   ic: Arc<InstanceConfig>,
22   web: tokio::sync::mpsc::Sender<WebRequest>,
23 }
24
25 /// Sent from hyper worker pool task to client task
26 #[allow(dead_code)] // xxx
27 #[derive(Debug)]
28 struct WebRequest {
29   // initial part of body
30   // used up to and including first 2 lines of metadata
31   // end delimiter for the metadata not yet located, but in here somewhere
32   initial: Box<[u8]>,
33   initial_remaining: usize,
34   length_hint: usize,
35   body: hyper::body::Body,
36   boundary_finder: multipart::BoundaryFinder,
37   reply_to: tokio::sync::oneshot::Sender<WebResponse>,
38   warnings: Warnings,
39 }
40
41 /// Reply from client task to hyper worker pool task
42 #[allow(dead_code)] // xxx
43 #[derive(Debug)]
44 struct WebResponse {
45   warnings: Warnings,
46   data: Result<WebResponseData, AE>,
47 }
48
49 type WebResponseData = ();
50
51 #[throws(PacketError)]
52 pub fn route_packet(packet: Box<[u8]>, daddr: IpAddr) {
53   trace!("xxx discarding packet daddr={:?} len={}", daddr, packet.len());
54 }
55
56 async fn handle(
57   all_clients: Arc<AllClients>,
58   req: hyper::Request<hyper::Body>
59 ) -> Result<hyper::Response<hyper::Body>, Void> {
60   if req.method() == Method::GET {
61     let mut resp = hyper::Response::new(hyper::Body::from("hippotat\r\n"));
62     resp.headers_mut().insert(
63       "Content-Type",
64       "text/plain; charset=US-ASCII".try_into().unwrap()
65     );
66     return Ok(resp)
67   }
68
69   let mut warnings: Warnings = default();
70
71   match async {
72
73     let get_header = |hn: &str| {
74       let mut values = req.headers().get_all(hn).iter();
75       let v = values.next().ok_or_else(|| anyhow!("missing {}", hn))?;
76       if values.next().is_some() { throw!(anyhow!("multiple {}!", hn)); }
77       let v = v.to_str().context(anyhow!("interpret {} as UTF-8", hn))?;
78       Ok::<_,AE>(v)
79     };
80
81     let mkboundary = |b: &'_ _| format!("\n--{}", b).into_bytes();
82     let boundary = match (||{
83       let t = get_header("Content-Type")?;
84       let t: mime::Mime = t.parse().context("parse Content-Type")?;
85       if t.type_() != "multipart" { throw!(anyhow!("not multipart/")) }
86       let b = mime::BOUNDARY;
87       let b = t.get_param(b).ok_or_else(|| anyhow!("missing boundary=..."))?;
88       if t.subtype() != "form-data" {
89         warnings.add(&"Content-Type not /form-data")?;
90       }
91       let b = mkboundary(b.as_str());
92       Ok::<_,AE>(b)
93     })() {
94       Ok(y) => y,
95       Err(e) => {
96         warnings.add(&e.wrap_err("guessing boundary"))?;
97         mkboundary("b")
98       },
99     };
100
101     let length_hint: usize = (||{
102       let clength = get_header("Content-Length")?;
103       let clength = clength.parse().context("parse Content-Length")?;
104       Ok::<_,AE>(clength)
105     })().unwrap_or_else(
106       |e| { let _ = warnings.add(&e.wrap_err("parsing Content-Length")); 0 }
107     );
108
109     let mut body = req.into_body();
110     let initial = match read_limited_bytes(
111       METADATA_MAX_LEN, default(), length_hint, &mut body
112     ).await {
113       Ok(all) => all,
114       Err(ReadLimitedError::Truncated { sofar,.. }) => sofar,
115       Err(ReadLimitedError::Hyper(e)) => throw!(e),
116     };
117
118     let boundary_finder = memmem::Finder::new(&boundary);
119     let mut boundary_iter = boundary_finder.find_iter(&initial);
120
121     let start = if initial.starts_with(&boundary[1..]) { boundary.len()-1 }
122     else if let Some(start) = boundary_iter.next() { start + boundary.len() }
123     else { throw!(anyhow!("initial boundary not found")) };
124
125     let comp = multipart::process_boundary
126       (&mut warnings, &initial[start..], PartName::m)?
127       .ok_or_else(|| anyhow!(r#"no "m" component"#))?;
128
129     if comp.name != PartName::m { throw!(anyhow!(
130       r#"first multipart component must be name="m""#
131     )) }
132
133     let mut meta = MetadataFieldIterator::new(comp.payload);
134
135     let client: ClientName = meta.need_parse().context("client addr")?;
136
137     let mut hmac_got = [0; HMAC_L];
138     let (client_time, hmac_got_l) = (||{
139       let token: &str = meta.need_next().context(r#"find in "m""#)?;
140       let (time_t, hmac_b64) = token.split_once(' ')
141         .ok_or_else(|| anyhow!("split"))?;
142       let time_t = u64::from_str_radix(time_t, 16).context("parse time_t")?;
143       let l = io::copy(
144         &mut base64::read::DecoderReader::new(&mut hmac_b64.as_bytes(),
145                                               BASE64_CONFIG),
146         &mut &mut hmac_got[..]
147       ).context("parse b64 token")?;
148       let l = l.try_into()?;
149       Ok::<_,AE>((time_t, l))
150     })().context("token")?;
151     let hmac_got = &hmac_got[0..hmac_got_l];
152
153     let client = all_clients.get(&client);
154
155     // We attempt to hide whether the client exists we don't try to
156     // hide the hash lookup computationgs, but we do try to hide the
157     // HMAC computation by always doing it.  We hope that the compiler
158     // doesn't produce a specialised implementation for the dummy
159     // secret value.
160     let client_exists = subtle::Choice::from(client.is_some() as u8);
161     let secret = client.map(|c| c.ic.secret.0.as_bytes());
162     let secret = secret.unwrap_or(&[0x55; HMAC_B][..]);
163     let client_time_s = format!("{:x}", client_time);
164     let hmac_exp = token_hmac(secret, client_time_s.as_bytes());
165     // We also definitely want a consttime memeq for the hmac value
166     let hmac_ok = hmac_got.ct_eq(&hmac_exp);
167     //dbg!(DumpHex(&hmac_exp), client.is_some());
168     //dbg!(DumpHex(hmac_got), hmac_ok, client_exists);
169     if ! bool::from(hmac_ok & client_exists) {
170       throw!(anyhow!("xxx should be a 403 error"));
171     }
172
173     let client = client.unwrap();
174     let now = time_t_now();
175     let chk_skew = |a: u64, b: u64, c_ahead_behind| {
176       if let Some(a_ahead) = a.checked_sub(b) {
177         if a_ahead > client.ic.max_clock_skew.as_secs() {
178           throw!(anyhow!("too much clock skew (client {} by {})",
179                          c_ahead_behind, a_ahead));
180         }
181       }
182       Ok::<_,AE>(())
183     };
184     chk_skew(client_time, now, "ahead")?;
185     chk_skew(now, client_time, "behind")?;
186
187     let initial_remaining = meta.remaining_bytes_len();
188
189     //eprintln!("boundary={:?} start={} name={:?} client={}",
190     // boundary, start, &comp.name, &client.ic);
191
192     let (reply_to, reply_recv) = tokio::sync::oneshot::channel();
193     trace!("{} request xxx={}", &client.ic, initial.len());
194     let wreq = WebRequest {
195       initial,
196       initial_remaining,
197       length_hint,
198       boundary_finder: boundary_finder.into_owned(),
199       body,
200       warnings: mem::take(&mut warnings),
201       reply_to
202     };
203
204     client.web.try_send(wreq)
205       .map_err(|_| anyhow!("client task shut down!"))?;
206
207     let reply: WebResponse = reply_recv.await?;
208     warnings = reply.warnings;
209
210     reply.data
211   }.await {
212     Ok(()) => {
213     },
214     Err(e) => {
215       eprintln!("error={}", e);
216     }
217   }
218
219   eprintln!("warnings={:?}", &warnings);
220
221   Ok(hyper::Response::new(hyper::Body::from("Hello World")))
222 }
223
224 #[allow(unused_variables)] // xxx
225 #[allow(unused_mut)] // xxx
226 async fn run_client(ic: Arc<InstanceConfig>,
227                     mut web: mpsc::Receiver<WebRequest>)
228                     -> Result<Void, AE>
229 {
230   struct Outstanding {
231     reply_to: tokio::sync::oneshot::Sender<WebResponse>,
232     oi: OutstandingInner,
233   }
234   #[derive(Debug)]
235   struct OutstandingInner {
236     target_requests_outstanding: u32,
237   }
238   let mut outstanding: VecDeque<Outstanding> = default();
239   let  downbound: VecDeque<(/*xxx*/)> = default();
240
241   let try_send_response = |
242     reply_to: tokio::sync::oneshot::Sender<WebResponse>,
243     response: WebResponse
244   | {
245     reply_to.send(response)
246       .unwrap_or_else(|_: WebResponse| () /* oh dear */ /* xxx trace? */);
247   };
248
249   loop {
250     if let Some(ret) = {
251       if ! downbound.is_empty() {
252         outstanding.pop_front()
253       } else if let Some((i,_)) = outstanding.iter().enumerate().find({
254         |(_,o)| outstanding.len() > o.oi.target_requests_outstanding.sat()
255       }) {
256         Some(outstanding.remove(i).unwrap())
257       } else {
258         None
259       }
260     } {
261       let response = WebResponse {
262         data: Ok(()),
263         warnings: default(),
264       };
265
266       dbg!(&response);
267       try_send_response(ret.reply_to, response);
268     }
269
270     select!{
271       req = web.recv() =>
272       {
273         let WebRequest {
274           initial, initial_remaining, length_hint, mut body,
275           boundary_finder,
276           reply_to, mut warnings,
277         } = req.ok_or_else(|| anyhow!("webservers all shut down!"))?;
278
279         match async {
280
281           let initial_used = initial.len() - initial_remaining;
282
283           let whole_request = read_limited_bytes(
284             ic.max_batch_up.sat(),
285             initial,
286             length_hint,
287             &mut body
288           ).await.context("read request body")?;
289
290           let (meta, mut comps) =
291             multipart::ComponentIterator::resume_mid_component(
292               &whole_request[initial_used..],
293               boundary_finder
294             ).context("resume parsing body, after auth checks")?;
295
296           let mut meta = MetadataFieldIterator::new(&meta);
297
298           macro_rules! meta {
299             { $v:ident, ( $( $badcmp:tt )? ), $ret:expr,
300               let $server:ident, $client:ident $($code:tt)*
301             } => {
302               let $v = (||{
303                 let $server = ic.$v;
304                 let $client $($code)*
305                 $(
306                   if $client $badcmp $server {
307                     throw!(anyhow!("mismatch: client={:?} {} server={:?}",
308                                    $client, stringify!($badcmp), $server));
309                   }
310                 )?
311                 Ok::<_,AE>($ret)
312               })().context(stringify!($v))?;
313               dbg!(&$v);
314             }
315           }
316
317           meta!{
318             target_requests_outstanding, ( != ), client,
319             let server, client: u32 = meta.need_parse()?;
320           }
321
322           meta!{
323             http_timeout, ( > ), client,
324             let server, client = Duration::from_secs(meta.need_parse()?);
325           }
326
327           meta!{
328             mtu, ( != ), client,
329             let server, client: u32 = meta.parse()?.unwrap_or(server);
330           }
331
332           meta!{
333             max_batch_down, (), min(client, server),
334             let server, client: u32 = meta.parse()?.unwrap_or(server);
335           }
336
337           meta!{
338             max_batch_up, ( > ), client,
339             let server, client = meta.parse()?.unwrap_or(server);
340           }
341
342           while let Some(comp) = comps.next(&mut warnings, PartName::d)? {
343             if comp.name != PartName::d {
344               warnings.add(&format_args!("unexpected part {:?}", comp.name))?;
345             }
346             checkn(Mime2Slip, mtu, comp.payload, |header| {
347               let saddr = ip_packet_addr::<false>(header)?;
348               if saddr != ic.link.client.0 { throw!(PE::Src(saddr)) }
349               let daddr = ip_packet_addr::<true>(header)?;
350               Ok(daddr)
351             }, |(daddr,packet)| route_packet(daddr,packet),
352                |e| { let _xxx = warnings.add(&e); }
353             )?;
354           }
355
356           let oi = OutstandingInner {
357             target_requests_outstanding,
358           };
359           Ok::<_,AE>(oi)
360         }.await {
361           Ok(oi) => outstanding.push_back(Outstanding { reply_to, oi }),
362           Err(e) => {
363             try_send_response(reply_to, WebResponse {
364               data: Err(e),
365               warnings,
366             });
367           },
368         }
369       }
370     }
371   }
372   //Err(anyhow!("xxx"))
373 }
374
375 #[tokio::main]
376 async fn main() {
377   let opts = Opts::from_args();
378   let mut tasks: Vec<(
379     JoinHandle<AE>,
380     String,
381   )> = vec![];
382
383   let (global, ipif) = config::startup(
384     "hippotatd", LinkEnd::Server,
385     &opts.config, &opts.log, |ics|
386   {
387     let global = config::InstanceConfigGlobal::from(&ics);
388     let ipif = Ipif::start(&global.ipif, None)?;
389
390     let all_clients: AllClients = ics.into_iter().map(|ic| {
391       let ic = Arc::new(ic);
392
393       let (web_send, web_recv) = mpsc::channel(
394         5 // xxx should me max_requests_outstanding but that's
395           // marked client-only so needs rework
396       );
397
398       let ic_ = ic.clone();
399       tasks.push((tokio::spawn(async move {
400         run_client(ic_, web_recv).await.void_unwrap_err()
401       }), format!("client {}", &ic)));
402
403       (ic.link.client,
404        ClientHandles {
405          ic,
406          web: web_send,
407        })
408     }).collect();
409     let all_clients = Arc::new(all_clients);
410
411     for addr in &global.addrs {
412       let all_clients_ = all_clients.clone();
413       let make_service = hyper::service::make_service_fn(move |_conn| {
414         let all_clients_ = all_clients_.clone();
415         async { Ok::<_, Void>( hyper::service::service_fn(move |req| {
416           handle(all_clients_.clone(), req)
417         }) ) } }
418       );
419
420       let addr = SocketAddr::new(*addr, global.port);
421       let server = hyper::Server::try_bind(&addr)
422         .context("bind")?
423         .http1_preserve_header_case(true)
424         .serve(make_service);
425       info!("listening on {}", &addr);
426       let task = tokio::task::spawn(async move {
427         match server.await {
428           Ok(()) => anyhow!("shut down?!"),
429           Err(e) => e.into(),
430         }
431       });
432       tasks.push((task, format!("http server {}", addr)));
433     }
434     
435     Ok((global, ipif))
436   });
437
438   let died = future::select_all(
439     tasks.iter_mut().map(|e| &mut e.0)
440   ).await;
441   error!("xxx {:?}", &died);
442
443   ipif.quitting(None).await;
444
445   dbg!(global);
446 }