chiark / gitweb /
read_limited_bytes: take a `capacity` argument
[hippotat.git] / src / bin / server.rs
1 // Copyright 2021 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: GPL-3.0-or-later
3 // There is NO WARRANTY.
4
5 use hippotat::prelude::*;
6
7 #[derive(StructOpt,Debug)]
8 pub struct Opts {
9   #[structopt(flatten)]
10   log: LogOpts,
11
12   #[structopt(flatten)]
13   config: config::Opts,
14 }
15
16 const METADATA_MAX_LEN: usize = MAX_OVERHEAD;
17
18 type AllClients = HashMap<ClientName, ClientHandles>;
19
20 struct ClientHandles {
21   ic: Arc<InstanceConfig>,
22   web: tokio::sync::mpsc::Sender<WebRequest>,
23 }
24
25 /// Sent from hyper worker pool task to client task
26 #[allow(dead_code)] // xxx
27 struct WebRequest {
28   // initial part of body
29   // used up to and including first 2 lines of metadata
30   // end delimiter for the metadata not yet located, but in here somewhere
31   initial: Box<[u8]>,
32   initial_remaining: usize,
33   body: hyper::body::Body,
34   reply_to: tokio::sync::oneshot::Sender<WebResponse>,
35   warnings: Warnings,
36 }
37
38 /// Reply from client task to hyper worker pool task
39 #[allow(dead_code)] // xxx
40 struct WebResponse {
41   warnings: Warnings,
42   data: Result<WebResponseData, AE>,
43 }
44
45 type WebResponseData = ();
46
47 async fn handle(
48   all_clients: Arc<AllClients>,
49   req: hyper::Request<hyper::Body>
50 ) -> Result<hyper::Response<hyper::Body>, Void> {
51   if req.method() == Method::GET {
52     let mut resp = hyper::Response::new(hyper::Body::from("hippotat\r\n"));
53     resp.headers_mut().insert(
54       "Content-Type",
55       "text/plain; charset=US-ASCII".try_into().unwrap()
56     );
57     return Ok(resp)
58   }
59
60   let mut warnings: Warnings = default();
61
62   match async {
63
64     let mkboundary = |b: &'_ _| format!("\n--{}", b).into_bytes();
65     let boundary = match (||{
66       let mut ctypes = req.headers().get_all("Content-Type").iter();
67       let t = ctypes.next().ok_or_else(|| anyhow!("missing Content-Type"))?;
68       if ctypes.next().is_some() { throw!(anyhow!("several Content-Type")) }
69       let t = t.to_str().context("interpret Content-Type as utf-8")?;
70       let t: mime::Mime = t.parse().context("parse Content-Type")?;
71       if t.type_() != "multipart" { throw!(anyhow!("not multipart/")) }
72       let b = mime::BOUNDARY;
73       let b = t.get_param(b).ok_or_else(|| anyhow!("missing boundary=..."))?;
74       if t.subtype() != "form-data" {
75         warnings.add(&"Content-Type not /form-data")?;
76       }
77       let b = mkboundary(b.as_str());
78       Ok::<_,AE>(b)
79     })() {
80       Ok(y) => y,
81       Err(e) => {
82         warnings.add(&e.wrap_err("guessing boundary"))?;
83         mkboundary("b")
84       },
85     };
86
87     let mut body = req.into_body();
88     let initial = match read_limited_bytes(
89       METADATA_MAX_LEN, default(), default(), &mut body
90     ).await {
91       Ok(all) => all,
92       Err(ReadLimitedError::Truncated { sofar,.. }) => sofar,
93       Err(ReadLimitedError::Hyper(e)) => throw!(e),
94     };
95
96     let finder = memmem::Finder::new(&boundary);
97     let mut find_iter = finder.find_iter(&initial);
98
99     let start = if initial.starts_with(&boundary[1..]) { boundary.len()-1 }
100     else if let Some(start) = find_iter.next() { start + boundary.len() }
101     else { throw!(anyhow!("initial boundary not found")) };
102
103     let comp = multipart::process_component
104       (&mut warnings, &initial[start..], PartName::m)?
105       .ok_or_else(|| anyhow!(r#"no "m" component"#))?;
106
107     if comp.name != PartName::m { throw!(anyhow!(
108       r#"first multipart component must be name="m""#
109     )) }
110
111     let mut meta = MetadataFieldIterator::new(comp.payload_start);
112
113     let client: ClientName = meta.need_parse().context("client addr")?;
114
115     let mut hmac_got = [0; HMAC_L];
116     let (client_time, hmac_got_l) = (||{
117       let token: &str = meta.need_next().context(r#"find in "m""#)?;
118       let (time_t, hmac_b64) = token.split_once(' ')
119         .ok_or_else(|| anyhow!("split"))?;
120       let time_t = u64::from_str_radix(time_t, 16).context("parse time_t")?;
121       let l = io::copy(
122         &mut base64::read::DecoderReader::new(&mut hmac_b64.as_bytes(),
123                                               BASE64_CONFIG),
124         &mut &mut hmac_got[..]
125       ).context("parse b64 token")?;
126       let l = l.try_into()?;
127       Ok::<_,AE>((time_t, l))
128     })().context("token")?;
129     let hmac_got = &hmac_got[0..hmac_got_l];
130
131     let client = all_clients.get(&client);
132
133     // We attempt to hide whether the client exists we don't try to
134     // hide the hash lookup computationgs, but we do try to hide the
135     // HMAC computation by always doing it.  We hope that the compiler
136     // doesn't produce a specialised implementation for the dummy
137     // secret value.
138     let client_exists = subtle::Choice::from(client.is_some() as u8);
139     let secret = client.map(|c| c.ic.secret.0.as_bytes());
140     let secret = secret.unwrap_or(&[0x55; HMAC_B][..]);
141     let client_time_s = format!("{:x}", client_time);
142     let hmac_exp = token_hmac(secret, client_time_s.as_bytes());
143     // We also definitely want a consttime memeq for the hmac value
144     let hmac_ok = hmac_got.ct_eq(&hmac_exp);
145     //dbg!(DumpHex(&hmac_exp), client.is_some());
146     //dbg!(DumpHex(hmac_got), hmac_ok, client_exists);
147     if ! bool::from(hmac_ok & client_exists) {
148       throw!(anyhow!("xxx should be a 403 error"));
149     }
150
151     let client = client.unwrap();
152     let now = time_t_now();
153     let chk_skew = |a: u64, b: u64, c_ahead_behind| {
154       if let Some(a_ahead) = a.checked_sub(b) {
155         if a_ahead > client.ic.max_clock_skew.as_secs() {
156           throw!(anyhow!("too much clock skew (client {} by {})",
157                          c_ahead_behind, a_ahead));
158         }
159       }
160       Ok::<_,AE>(())
161     };
162     chk_skew(client_time, now, "ahead")?;
163     chk_skew(now, client_time, "behind")?;
164
165     let initial_remaining = meta.remaining_bytes_len();
166
167     //eprintln!("boundary={:?} start={} name={:?} client={}",
168     // boundary, start, &comp.name, &client.ic);
169
170     let (reply_to, reply_recv) = tokio::sync::oneshot::channel();
171     let wreq = WebRequest {
172       initial,
173       initial_remaining,
174       body,
175       warnings: mem::take(&mut warnings),
176       reply_to
177     };
178     trace!("{} request", &client.ic);
179
180     client.web.try_send(wreq)
181       .map_err(|_| anyhow!("client task shut down!"))?;
182
183     let reply: WebResponse = reply_recv.await?;
184     warnings = reply.warnings;
185
186     reply.data
187   }.await {
188     Ok(()) => {
189     },
190     Err(e) => {
191       eprintln!("error={}", e);
192     }
193   }
194
195   eprintln!("warnings={:?}", &warnings);
196
197   Ok(hyper::Response::new(hyper::Body::from("Hello World")))
198 }
199
200 #[allow(unused_variables)] // xxx
201 async fn run_client(_ic: Arc<InstanceConfig>,
202                     mut web: mpsc::Receiver<WebRequest>)
203                     -> Result<Void, AE>
204 {
205   struct Outstanding {
206     reply_to: tokio::sync::oneshot::Sender<WebResponse>,
207     max_requests_outstanding: u32,
208   }
209   let mut outstanding: VecDeque<Outstanding> = default();
210   let  downbound: VecDeque<(/*xxx*/)> = default();
211
212   let try_send_response = |
213     reply_to: tokio::sync::oneshot::Sender<WebResponse>,
214     response: WebResponse
215   | {
216     reply_to.send(response)
217       .unwrap_or_else(|_: WebResponse| () /* oh dear */ /* xxx trace? */);
218   };
219
220   loop {
221     if let Some(ret) = {
222       if ! downbound.is_empty() {
223         outstanding.pop_front()
224       } else if let Some((i,_)) = outstanding.iter().enumerate().find({
225         |(_,o)| outstanding.len() > o.max_requests_outstanding.sat()
226       }) {
227         Some(outstanding.remove(i).unwrap())
228       } else {
229         None
230       }
231     } {
232       let response = WebResponse {
233         data: Ok(()),
234         warnings: default(),
235       };
236
237       try_send_response(ret.reply_to, response);
238     }
239
240     select!{
241       req = web.recv() =>
242       {
243         let WebRequest {
244           initial, initial_remaining, body,
245           reply_to, warnings,
246         } = req.ok_or_else(|| anyhow!("webservers all shut down!"))?;
247
248         match async {
249
250           // xxx size limit
251
252           let whole_request = body.try_fold(
253             initial.into_vec(),
254             |mut w, by| async move { w.extend_from_slice(&by); Ok(w) },
255           ).await.context("read request body")?;
256
257           dbg!(whole_request.len());
258
259 /*          
260
261           multipart::ComponentIterator::resume_mid_component(
262             &initial[initial_remaining..],
263   */          
264
265           Ok::<_,AE>(())
266         }.await {
267           Ok(()) => outstanding.push_back(Outstanding {
268             reply_to: reply_to,
269             max_requests_outstanding: 42, // xxx
270           }),
271           Err(e) => {
272             try_send_response(reply_to, WebResponse {
273               data: Err(e),
274               warnings,
275             });
276           },
277         }
278       }
279     }
280   }
281   //Err(anyhow!("xxx"))
282 }
283
284 #[tokio::main]
285 async fn main() {
286   let opts = Opts::from_args();
287   let mut tasks: Vec<(
288     JoinHandle<AE>,
289     String,
290   )> = vec![];
291
292   let (global, ipif) = config::startup(
293     "hippotatd", LinkEnd::Server,
294     &opts.config, &opts.log, |ics|
295   {
296     let global = config::InstanceConfigGlobal::from(&ics);
297     let ipif = Ipif::start(&global.ipif, None)?;
298
299     let all_clients: AllClients = ics.into_iter().map(|ic| {
300       let ic = Arc::new(ic);
301
302       let (web_send, web_recv) = mpsc::channel(
303         5 // xxx should me max_requests_outstanding but that's
304           // marked client-only so needs rework
305       );
306
307       let ic_ = ic.clone();
308       tasks.push((tokio::spawn(async move {
309         run_client(ic_, web_recv).await.void_unwrap_err()
310       }), format!("client {}", &ic)));
311
312       (ic.link.client,
313        ClientHandles {
314          ic,
315          web: web_send,
316        })
317     }).collect();
318     let all_clients = Arc::new(all_clients);
319
320     for addr in &global.addrs {
321       let all_clients_ = all_clients.clone();
322       let make_service = hyper::service::make_service_fn(move |_conn| {
323         let all_clients_ = all_clients_.clone();
324         async { Ok::<_, Void>( hyper::service::service_fn(move |req| {
325           handle(all_clients_.clone(), req)
326         }) ) } }
327       );
328
329       let addr = SocketAddr::new(*addr, global.port);
330       let server = hyper::Server::try_bind(&addr)
331         .context("bind")?
332         .http1_preserve_header_case(true)
333         .serve(make_service);
334       info!("listening on {}", &addr);
335       let task = tokio::task::spawn(async move {
336         match server.await {
337           Ok(()) => anyhow!("shut down?!"),
338           Err(e) => e.into(),
339         }
340       });
341       tasks.push((task, format!("http server {}", addr)));
342     }
343     
344     Ok((global, ipif))
345   });
346
347   let died = future::select_all(
348     tasks.iter_mut().map(|e| &mut e.0)
349   ).await;
350   error!("xxx {:?}", &died);
351
352   ipif.quitting(None).await;
353
354   dbg!(global);
355 }