1 // Copyright 2021 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: GPL-3.0-or-later
3 // There is NO WARRANTY.
5 use hippotat::prelude::*;
7 #[derive(StructOpt,Debug)]
16 const METADATA_MAX_LEN: usize = MAX_OVERHEAD;
20 config: config::InstanceConfigGlobal,
21 all_clients: HashMap<ClientName, Client>,
26 ic: Arc<InstanceConfig>,
27 web: tokio::sync::mpsc::Sender<WebRequest>,
30 /// Sent from hyper worker pool task to client task
31 #[allow(dead_code)] // xxx
34 // initial part of body
35 // used up to and including first 2 lines of metadata
36 // end delimiter for the metadata not yet located, but in here somewhere
38 initial_remaining: usize,
40 body: hyper::body::Body,
41 boundary_finder: multipart::BoundaryFinder,
42 reply_to: tokio::sync::oneshot::Sender<WebResponse>,
47 /// Reply from client task to hyper worker pool task
48 #[allow(dead_code)] // xxx
52 data: Result<WebResponseData, AE>,
55 type WebResponseData = Vec<u8>;
57 #[throws(PacketError)]
58 pub fn route_packet(conn: &str, link: &dyn Display,
59 packet: Box<[u8]>, daddr: IpAddr)
62 trace!("{} {} discarding packet daddr={:?} len={}",
63 conn, link, daddr, packet.len());
69 req: hyper::Request<hyper::Body>
70 ) -> Result<hyper::Response<hyper::Body>, hyper::http::Error> {
71 if req.method() == Method::GET {
72 let mut resp = hyper::Response::new(hyper::Body::from("hippotat\r\n"));
73 resp.headers_mut().insert(
75 "text/plain; charset=US-ASCII".try_into().unwrap()
80 let mut warnings: Warnings = default();
84 let get_header = |hn: &str| {
85 let mut values = req.headers().get_all(hn).iter();
86 let v = values.next().ok_or_else(|| anyhow!("missing {}", hn))?;
87 if values.next().is_some() { throw!(anyhow!("multiple {}!", hn)); }
88 let v = v.to_str().context(anyhow!("interpret {} as UTF-8", hn))?;
92 let mkboundary = |b: &'_ _| format!("\n--{}", b).into_bytes();
93 let boundary = match (||{
94 let t = get_header("Content-Type")?;
95 let t: mime::Mime = t.parse().context("parse Content-Type")?;
96 if t.type_() != "multipart" { throw!(anyhow!("not multipart/")) }
97 let b = mime::BOUNDARY;
98 let b = t.get_param(b).ok_or_else(|| anyhow!("missing boundary=..."))?;
99 if t.subtype() != "form-data" {
100 warnings.add(&"Content-Type not /form-data")?;
102 let b = mkboundary(b.as_str());
107 warnings.add(&e.wrap_err("guessing boundary"))?;
112 let length_hint: usize = (||{
113 let clength = get_header("Content-Length")?;
114 let clength = clength.parse().context("parse Content-Length")?;
117 |e| { let _ = warnings.add(&e.wrap_err("parsing Content-Length")); 0 }
120 let mut body = req.into_body();
121 let initial = match read_limited_bytes(
122 METADATA_MAX_LEN, default(), length_hint, &mut body
125 Err(ReadLimitedError::Truncated { sofar,.. }) => sofar,
126 Err(ReadLimitedError::Hyper(e)) => throw!(e),
129 let boundary_finder = memmem::Finder::new(&boundary);
130 let mut boundary_iter = boundary_finder.find_iter(&initial);
132 let start = if initial.starts_with(&boundary[1..]) { boundary.len()-1 }
133 else if let Some(start) = boundary_iter.next() { start + boundary.len() }
134 else { throw!(anyhow!("initial boundary not found")) };
136 let comp = multipart::process_boundary
137 (&mut warnings, &initial[start..], PartName::m)?
138 .ok_or_else(|| anyhow!(r#"no "m" component"#))?;
140 if comp.name != PartName::m { throw!(anyhow!(
141 r#"first multipart component must be name="m""#
144 let mut meta = MetadataFieldIterator::new(comp.payload);
146 let client: ClientName = meta.need_parse().context("client addr")?;
148 let mut hmac_got = [0; HMAC_L];
149 let (client_time, hmac_got_l) = (||{
150 let token: &str = meta.need_next().context(r#"find in "m""#)?;
151 let (time_t, hmac_b64) = token.split_once(' ')
152 .ok_or_else(|| anyhow!("split"))?;
153 let time_t = u64::from_str_radix(time_t, 16).context("parse time_t")?;
155 &mut base64::read::DecoderReader::new(&mut hmac_b64.as_bytes(),
157 &mut &mut hmac_got[..]
158 ).context("parse b64 token")?;
159 let l = l.try_into()?;
160 Ok::<_,AE>((time_t, l))
161 })().context("token")?;
162 let hmac_got = &hmac_got[0..hmac_got_l];
164 let client_name = client;
165 let client = global.all_clients.get(&client_name);
167 // We attempt to hide whether the client exists we don't try to
168 // hide the hash lookup computationgs, but we do try to hide the
169 // HMAC computation by always doing it. We hope that the compiler
170 // doesn't produce a specialised implementation for the dummy
172 let client_exists = subtle::Choice::from(client.is_some() as u8);
173 let secret = client.map(|c| c.ic.secret.0.as_bytes());
174 let secret = secret.unwrap_or(&[0x55; HMAC_B][..]);
175 let client_time_s = format!("{:x}", client_time);
176 let hmac_exp = token_hmac(secret, client_time_s.as_bytes());
177 // We also definitely want a consttime memeq for the hmac value
178 let hmac_ok = hmac_got.ct_eq(&hmac_exp);
179 //dbg!(DumpHex(&hmac_exp), client.is_some());
180 //dbg!(DumpHex(hmac_got), hmac_ok, client_exists);
181 if ! bool::from(hmac_ok & client_exists) {
182 debug!("{} rejected client {}", &conn, &client_name);
183 let body = hyper::Body::from("Not authorised\r\n");
185 hyper::Response::builder()
186 .status(hyper::StatusCode::FORBIDDEN)
187 .header("Content-Type", r#"text/plain; charset="utf-8""#)
192 let client = client.unwrap();
193 let now = time_t_now();
194 let chk_skew = |a: u64, b: u64, c_ahead_behind| {
195 if let Some(a_ahead) = a.checked_sub(b) {
196 if a_ahead > client.ic.max_clock_skew.as_secs() {
197 throw!(anyhow!("too much clock skew (client {} by {})",
198 c_ahead_behind, a_ahead));
203 chk_skew(client_time, now, "ahead")?;
204 chk_skew(now, client_time, "behind")?;
206 let initial_remaining = meta.remaining_bytes_len();
208 //eprintln!("boundary={:?} start={} name={:?} client={}",
209 // boundary, start, &comp.name, &client.ic);
211 let (reply_to, reply_recv) = tokio::sync::oneshot::channel();
212 trace!("{} {} request, Content-Length={}",
213 &conn, &client_name, length_hint);
214 let wreq = WebRequest {
218 boundary_finder: boundary_finder.into_owned(),
220 warnings: mem::take(&mut warnings),
225 client.web.try_send(wreq)
226 .map_err(|_| anyhow!("client task shut down!"))?;
228 let reply: WebResponse = reply_recv.await?;
229 warnings = reply.warnings;
230 let data = reply.data?;
232 if warnings.warnings.is_empty() {
233 trace!("{} {} responding, {}",
234 &conn, &client_name, data.len());
236 debug!("{} {} responding, {} warnings={:?}",
237 &conn, &client_name, data.len(),
241 let data = hyper::Body::from(data);
243 hyper::Response::builder()
244 .header("Content-Type", r#"application/octet-stream"#)
247 }.await.unwrap_or_else(|e| {
248 debug!("{} error {}", &conn, &e);
249 let mut errmsg = format!("ERROR\n\n{:?}\n\n", &e);
250 for w in warnings.warnings {
251 write!(errmsg, "warning: {}\n", w).unwrap();
253 hyper::Response::builder()
254 .status(hyper::StatusCode::BAD_REQUEST)
255 .header("Content-Type", r#"text/plain; charset="utf-8""#)
260 #[allow(unused_variables)] // xxx
261 #[allow(unused_mut)] // xxx
262 async fn run_client(ic: Arc<InstanceConfig>,
263 mut web: mpsc::Receiver<WebRequest>)
267 reply_to: tokio::sync::oneshot::Sender<WebResponse>,
268 oi: OutstandingInner,
271 struct OutstandingInner {
272 target_requests_outstanding: u32,
274 let mut outstanding: VecDeque<Outstanding> = default();
275 let downbound: VecDeque<(/*xxx*/)> = default();
277 let try_send_response = |
278 reply_to: tokio::sync::oneshot::Sender<WebResponse>,
279 response: WebResponse
281 reply_to.send(response)
282 .unwrap_or_else(|_: WebResponse| () /* oh dear */ /* xxx trace? */);
287 if ! downbound.is_empty() {
288 outstanding.pop_front()
289 } else if let Some((i,_)) = outstanding.iter().enumerate().find({
290 |(_,o)| outstanding.len() > o.oi.target_requests_outstanding.sat()
292 Some(outstanding.remove(i).unwrap())
297 let response = WebResponse {
298 data: Ok(vec![ /* xxx */ ]),
302 try_send_response(ret.reply_to, response);
309 initial, initial_remaining, length_hint, mut body,
311 reply_to, conn, mut warnings,
312 } = req.ok_or_else(|| anyhow!("webservers all shut down!"))?;
316 let initial_used = initial.len() - initial_remaining;
318 let whole_request = read_limited_bytes(
319 ic.max_batch_up.sat(),
323 ).await.context("read request body")?;
325 let (meta, mut comps) =
326 multipart::ComponentIterator::resume_mid_component(
327 &whole_request[initial_used..],
329 ).context("resume parsing body, after auth checks")?;
331 let mut meta = MetadataFieldIterator::new(&meta);
334 { $v:ident, ( $( $badcmp:tt )? ), $ret:expr,
335 let $server:ident, $client:ident $($code:tt)*
339 let $client $($code)*
341 if $client $badcmp $server {
342 throw!(anyhow!("mismatch: client={:?} {} server={:?}",
343 $client, stringify!($badcmp), $server));
347 })().context(stringify!($v))?;
353 target_requests_outstanding, ( != ), client,
354 let server, client: u32 = meta.need_parse()?;
358 http_timeout, ( > ), client,
359 let server, client = Duration::from_secs(meta.need_parse()?);
364 let server, client: u32 = meta.parse()?.unwrap_or(server);
368 max_batch_down, (), min(client, server),
369 let server, client: u32 = meta.parse()?.unwrap_or(server);
373 max_batch_up, ( > ), client,
374 let server, client = meta.parse()?.unwrap_or(server);
377 while let Some(comp) = comps.next(&mut warnings, PartName::d)? {
378 if comp.name != PartName::d {
379 warnings.add(&format_args!("unexpected part {:?}", comp.name))?;
381 checkn(Mime2Slip, mtu, comp.payload, |header| {
382 let saddr = ip_packet_addr::<false>(header)?;
383 if saddr != ic.link.client.0 { throw!(PE::Src(saddr)) }
384 let daddr = ip_packet_addr::<true>(header)?;
386 }, |(daddr,packet)| route_packet(
387 &conn, &ic.link.client, daddr,packet
389 |e| Ok::<_,SlipFramesError<_>>({ warnings.add(&e)?; })
393 let oi = OutstandingInner {
394 target_requests_outstanding,
398 Ok(oi) => outstanding.push_back(Outstanding { reply_to, oi }),
400 try_send_response(reply_to, WebResponse {
409 //Err(anyhow!("xxx"))
414 let opts = Opts::from_args();
420 let (global, ipif) = config::startup(
421 "hippotatd", LinkEnd::Server,
422 &opts.config, &opts.log, |ics|
424 let global_config = config::InstanceConfigGlobal::from(&ics);
426 let ipif = Ipif::start(&global_config.ipif, None)?;
428 let ics = ics.into_iter().map(Arc::new).collect_vec();
429 let (client_handles_send, client_handles_recv) = ics.iter()
430 .map(|_ic| mpsc::channel(
431 5 // xxx should me max_requests_outstanding but that's
432 // marked client-only so needs rework
433 )).unzip::<_,_,Vec<_>,Vec<_>>();
435 let all_clients = izip!(
438 ).map(|(ic, web_send)| {
446 for (ic, web_recv) in izip!(
450 let ic_ = ic.clone();
451 tasks.push((tokio::spawn(async move {
452 run_client(ic_, web_recv).await.void_unwrap_err()
453 }), format!("client {}", &ic)));
456 let global = Arc::new(Global {
457 config: global_config,
461 for addr in &global.config.addrs {
462 let global_ = global.clone();
463 let make_service = hyper::service::make_service_fn(
464 move |conn: &hyper::server::conn::AddrStream| {
465 let global_ = global_.clone();
466 let conn = Arc::new(format!("[{}]", conn.remote_addr()));
467 async { Ok::<_, Void>( hyper::service::service_fn(move |req| {
468 handle(conn.clone(), global_.clone(), req)
473 let addr = SocketAddr::new(*addr, global.config.port);
474 let server = hyper::Server::try_bind(&addr)
476 .http1_preserve_header_case(true)
477 .serve(make_service);
478 info!("listening on {}", &addr);
479 let task = tokio::task::spawn(async move {
481 Ok(()) => anyhow!("shut down?!"),
485 tasks.push((task, format!("http server {}", addr)));
491 let died = future::select_all(
492 tasks.iter_mut().map(|e| &mut e.0)
494 error!("xxx {:?}", &died);
496 ipif.quitting(None).await;