chiark / gitweb /
wip server slip
[hippotat.git] / server / server.rs
index df34045d7b61d44f5cc0a0f33f5fb212aff48974..102e5e761504a37d9b6a44d636135f9ea69e2d70 100644 (file)
@@ -4,17 +4,24 @@
 
 use hippotat::prelude::*;
 
+mod slink;
+mod slocal;
+mod sweb;
+
+pub use sweb::{WebRequest, WebResponse};
+pub use slink::Client;
+
 #[derive(StructOpt,Debug)]
 pub struct Opts {
   #[structopt(flatten)]
-  log: LogOpts,
+  pub log: LogOpts,
 
   #[structopt(flatten)]
-  config: config::Opts,
+  pub config: config::Opts,
 }
 
-const METADATA_MAX_LEN: usize = MAX_OVERHEAD;
-const INTERNAL_QUEUE: usize = 15; // xxx: config
+pub const METADATA_MAX_LEN: usize = MAX_OVERHEAD;
+pub const INTERNAL_QUEUE: usize = 15; // xxx: config
 
 #[derive(Debug)]
 pub struct Global {
@@ -22,42 +29,8 @@ pub struct Global {
   all_clients: HashMap<ClientName, Client>,
 }
 
-#[derive(Debug)]
-pub struct Client {
-  ic: Arc<InstanceConfig>,
-  web: mpsc::Sender<WebRequest>,
-  route: mpsc::Sender<RoutedPacket>,
-}
-
 pub type RoutedPacket = Box<[u8]>; // not MIME data
 
-/// Sent from hyper worker pool task to client task
-#[allow(dead_code)] // xxx
-#[derive(Debug)]
-struct WebRequest {
-  // initial part of body
-  // used up to and including first 2 lines of metadata
-  // end delimiter for the metadata not yet located, but in here somewhere
-  initial: Box<[u8]>,
-  initial_remaining: usize,
-  length_hint: usize,
-  body: hyper::body::Body,
-  boundary_finder: multipart::BoundaryFinder,
-  reply_to: oneshot::Sender<WebResponse>,
-  warnings: Warnings,
-  conn: Arc<String>,
-}
-
-/// Reply from client task to hyper worker pool task
-#[allow(dead_code)] // xxx
-#[derive(Debug)]
-struct WebResponse {
-  warnings: Warnings,
-  data: Result<WebResponseData, AE>,
-}
-
-type WebResponseData = Vec<u8>;
-
 #[throws(PacketError)]
 pub async fn route_packet(global: &Global,
                           conn: &str, link: &(dyn Display + Sync),
@@ -91,354 +64,6 @@ pub async fn route_packet(global: &Global,
   }
 }
 
-async fn handle(
-  conn: Arc<String>,
-  global: Arc<Global>,
-  req: hyper::Request<hyper::Body>
-) -> Result<hyper::Response<hyper::Body>, hyper::http::Error> {
-  if req.method() == Method::GET {
-    let mut resp = hyper::Response::new(hyper::Body::from("hippotat\r\n"));
-    resp.headers_mut().insert(
-      "Content-Type",
-      "text/plain; charset=US-ASCII".try_into().unwrap()
-    );
-    return Ok(resp)
-  }
-
-  let mut warnings: Warnings = default();
-
-  async {
-
-    let get_header = |hn: &str| {
-      let mut values = req.headers().get_all(hn).iter();
-      let v = values.next().ok_or_else(|| anyhow!("missing {}", hn))?;
-      if values.next().is_some() { throw!(anyhow!("multiple {}!", hn)); }
-      let v = v.to_str().context(anyhow!("interpret {} as UTF-8", hn))?;
-      Ok::<_,AE>(v)
-    };
-
-    let mkboundary = |b: &'_ _| format!("\n--{}", b).into_bytes();
-    let boundary = match (||{
-      let t = get_header("Content-Type")?;
-      let t: mime::Mime = t.parse().context("parse Content-Type")?;
-      if t.type_() != "multipart" { throw!(anyhow!("not multipart/")) }
-      let b = mime::BOUNDARY;
-      let b = t.get_param(b).ok_or_else(|| anyhow!("missing boundary=..."))?;
-      if t.subtype() != "form-data" {
-        warnings.add(&"Content-Type not /form-data")?;
-      }
-      let b = mkboundary(b.as_str());
-      Ok::<_,AE>(b)
-    })() {
-      Ok(y) => y,
-      Err(e) => {
-        warnings.add(&e.wrap_err("guessing boundary"))?;
-        mkboundary("b")
-      },
-    };
-
-    let length_hint: usize = (||{
-      let clength = get_header("Content-Length")?;
-      let clength = clength.parse().context("parse Content-Length")?;
-      Ok::<_,AE>(clength)
-    })().unwrap_or_else(
-      |e| { let _ = warnings.add(&e.wrap_err("parsing Content-Length")); 0 }
-    );
-
-    let mut body = req.into_body();
-    let initial = match read_limited_bytes(
-      METADATA_MAX_LEN, default(), length_hint, &mut body
-    ).await {
-      Ok(all) => all,
-      Err(ReadLimitedError::Truncated { sofar,.. }) => sofar,
-      Err(ReadLimitedError::Hyper(e)) => throw!(e),
-    };
-
-    let boundary_finder = memmem::Finder::new(&boundary);
-    let mut boundary_iter = boundary_finder.find_iter(&initial);
-
-    let start = if initial.starts_with(&boundary[1..]) { boundary.len()-1 }
-    else if let Some(start) = boundary_iter.next() { start + boundary.len() }
-    else { throw!(anyhow!("initial boundary not found")) };
-
-    let comp = multipart::process_boundary
-      (&mut warnings, &initial[start..], PartName::m)?
-      .ok_or_else(|| anyhow!(r#"no "m" component"#))?;
-
-    if comp.name != PartName::m { throw!(anyhow!(
-      r#"first multipart component must be name="m""#
-    )) }
-
-    let mut meta = MetadataFieldIterator::new(comp.payload);
-
-    let client: ClientName = meta.need_parse().context("client addr")?;
-
-    let mut hmac_got = [0; HMAC_L];
-    let (client_time, hmac_got_l) = (||{
-      let token: &str = meta.need_next().context(r#"find in "m""#)?;
-      let (time_t, hmac_b64) = token.split_once(' ')
-        .ok_or_else(|| anyhow!("split"))?;
-      let time_t = u64::from_str_radix(time_t, 16).context("parse time_t")?;
-      let l = io::copy(
-        &mut base64::read::DecoderReader::new(&mut hmac_b64.as_bytes(),
-                                              BASE64_CONFIG),
-        &mut &mut hmac_got[..]
-      ).context("parse b64 token")?;
-      let l = l.try_into()?;
-      Ok::<_,AE>((time_t, l))
-    })().context("token")?;
-    let hmac_got = &hmac_got[0..hmac_got_l];
-
-    let client_name = client;
-    let client = global.all_clients.get(&client_name);
-
-    // We attempt to hide whether the client exists we don't try to
-    // hide the hash lookup computationgs, but we do try to hide the
-    // HMAC computation by always doing it.  We hope that the compiler
-    // doesn't produce a specialised implementation for the dummy
-    // secret value.
-    let client_exists = subtle::Choice::from(client.is_some() as u8);
-    let secret = client.map(|c| c.ic.secret.0.as_bytes());
-    let secret = secret.unwrap_or(&[0x55; HMAC_B][..]);
-    let client_time_s = format!("{:x}", client_time);
-    let hmac_exp = token_hmac(secret, client_time_s.as_bytes());
-    // We also definitely want a consttime memeq for the hmac value
-    let hmac_ok = hmac_got.ct_eq(&hmac_exp);
-    //dbg!(DumpHex(&hmac_exp), client.is_some());
-    //dbg!(DumpHex(hmac_got), hmac_ok, client_exists);
-    if ! bool::from(hmac_ok & client_exists) {
-      debug!("{} rejected client {}", &conn, &client_name);
-      let body = hyper::Body::from("Not authorised\r\n");
-      return Ok(
-        hyper::Response::builder()
-          .status(hyper::StatusCode::FORBIDDEN)
-          .header("Content-Type", r#"text/plain; charset="utf-8""#)
-          .body(body)
-      )
-    }
-
-    let client = client.unwrap();
-    let now = time_t_now();
-    let chk_skew = |a: u64, b: u64, c_ahead_behind| {
-      if let Some(a_ahead) = a.checked_sub(b) {
-        if a_ahead > client.ic.max_clock_skew.as_secs() {
-          throw!(anyhow!("too much clock skew (client {} by {})",
-                         c_ahead_behind, a_ahead));
-        }
-      }
-      Ok::<_,AE>(())
-    };
-    chk_skew(client_time, now, "ahead")?;
-    chk_skew(now, client_time, "behind")?;
-
-    let initial_remaining = meta.remaining_bytes_len();
-
-    //eprintln!("boundary={:?} start={} name={:?} client={}",
-    // boundary, start, &comp.name, &client.ic);
-
-    let (reply_to, reply_recv) = oneshot::channel();
-    trace!("{} {} request, Content-Length={}",
-           &conn, &client_name, length_hint);
-    let wreq = WebRequest {
-      initial,
-      initial_remaining,
-      length_hint,
-      boundary_finder: boundary_finder.into_owned(),
-      body,
-      warnings: mem::take(&mut warnings),
-      reply_to,
-      conn: conn.clone(),
-    };
-
-    client.web.try_send(wreq)
-      .map_err(|_| anyhow!("client task shut down!"))?;
-
-    let reply: WebResponse = reply_recv.await?;
-    warnings = reply.warnings;
-    let data = reply.data?;
-
-    if warnings.warnings.is_empty() {
-      trace!("{} {} responding, {}",
-             &conn, &client_name, data.len());
-    } else {
-      debug!("{} {} responding, {} warnings={:?}",
-             &conn, &client_name, data.len(),
-             &warnings.warnings);
-    }
-
-    let data = hyper::Body::from(data);
-    Ok::<_,AE>(
-      hyper::Response::builder()
-        .header("Content-Type", r#"application/octet-stream"#)
-        .body(data)
-    )
-  }.await.unwrap_or_else(|e| {
-    debug!("{} error {}", &conn, &e);
-    let mut errmsg = format!("ERROR\n\n{:?}\n\n", &e);
-    for w in warnings.warnings {
-      write!(errmsg, "warning: {}\n", w).unwrap();
-    }
-    hyper::Response::builder()
-      .status(hyper::StatusCode::BAD_REQUEST)
-      .header("Content-Type", r#"text/plain; charset="utf-8""#)
-      .body(errmsg.into())
-  })
-}
-
-#[allow(unused_variables)] // xxx
-#[allow(unused_mut)] // xxx
-async fn run_client(global: Arc<Global>,
-                    ic: Arc<InstanceConfig>,
-                    mut web: mpsc::Receiver<WebRequest>,
-                    mut routed: mpsc::Receiver<RoutedPacket>)
-                    -> Result<Void, AE>
-{
-  struct Outstanding {
-    reply_to: oneshot::Sender<WebResponse>,
-    oi: OutstandingInner,
-  }
-  #[derive(Debug)]
-  struct OutstandingInner {
-    target_requests_outstanding: u32,
-  }
-  let mut outstanding: VecDeque<Outstanding> = default();
-  let  downbound: VecDeque<(/*xxx*/)> = default();
-
-  let try_send_response = |
-    reply_to: oneshot::Sender<WebResponse>,
-    response: WebResponse
-  | {
-    reply_to.send(response)
-      .unwrap_or_else(|_: WebResponse| () /* oh dear */ /* xxx trace? */);
-  };
-
-  loop {
-    if let Some(ret) = {
-      if ! downbound.is_empty() {
-        outstanding.pop_front()
-      } else if let Some((i,_)) = outstanding.iter().enumerate().find({
-        |(_,o)| outstanding.len() > o.oi.target_requests_outstanding.sat()
-      }) {
-        Some(outstanding.remove(i).unwrap())
-      } else {
-        None
-      }
-    } {
-      let response = WebResponse {
-        data: Ok(vec![ /* xxx */ ]),
-        warnings: default(),
-      };
-
-      try_send_response(ret.reply_to, response);
-    }
-
-    select!{
-      req = web.recv() =>
-      {
-        let WebRequest {
-          initial, initial_remaining, length_hint, mut body,
-          boundary_finder,
-          reply_to, conn, mut warnings,
-        } = req.ok_or_else(|| anyhow!("webservers all shut down!"))?;
-
-        match async {
-
-          let initial_used = initial.len() - initial_remaining;
-
-          let whole_request = read_limited_bytes(
-            ic.max_batch_up.sat(),
-            initial,
-            length_hint,
-            &mut body
-          ).await.context("read request body")?;
-
-          let (meta, mut comps) =
-            multipart::ComponentIterator::resume_mid_component(
-              &whole_request[initial_used..],
-              boundary_finder
-            ).context("resume parsing body, after auth checks")?;
-
-          let mut meta = MetadataFieldIterator::new(&meta);
-
-          macro_rules! meta {
-            { $v:ident, ( $( $badcmp:tt )? ), $ret:expr,
-              let $server:ident, $client:ident $($code:tt)*
-            } => {
-              let $v = (||{
-                let $server = ic.$v;
-                let $client $($code)*
-                $(
-                  if $client $badcmp $server {
-                    throw!(anyhow!("mismatch: client={:?} {} server={:?}",
-                                   $client, stringify!($badcmp), $server));
-                  }
-                )?
-                Ok::<_,AE>($ret)
-              })().context(stringify!($v))?;
-              //dbg!(&$v);
-            }
-          }
-
-          meta!{
-            target_requests_outstanding, ( != ), client,
-            let server, client: u32 = meta.need_parse()?;
-          }
-
-          meta!{
-            http_timeout, ( > ), client,
-            let server, client = Duration::from_secs(meta.need_parse()?);
-          }
-
-          meta!{
-            mtu, ( != ), client,
-            let server, client: u32 = meta.parse()?.unwrap_or(server);
-          }
-
-          meta!{
-            max_batch_down, (), min(client, server),
-            let server, client: u32 = meta.parse()?.unwrap_or(server);
-          }
-
-          meta!{
-            max_batch_up, ( > ), client,
-            let server, client = meta.parse()?.unwrap_or(server);
-          }
-
-          while let Some(comp) = comps.next(&mut warnings, PartName::d)? {
-            if comp.name != PartName::d {
-              warnings.add(&format_args!("unexpected part {:?}", comp.name))?;
-            }
-            checkn(Mime2Slip, mtu, comp.payload, |header| {
-              let saddr = ip_packet_addr::<false>(header)?;
-              if saddr != ic.link.client.0 { throw!(PE::Src(saddr)) }
-              let daddr = ip_packet_addr::<true>(header)?;
-              Ok(daddr)
-            }, |(daddr,packet)| route_packet(
-              &global, &conn, &ic.link.client, daddr,packet
-            ),
-              |e| Ok::<_,SlipFramesError<_>>({ warnings.add(&e)?; })
-            ).await?;
-          }
-
-          let oi = OutstandingInner {
-            target_requests_outstanding,
-          };
-          Ok::<_,AE>(oi)
-        }.await {
-          Ok(oi) => outstanding.push_back(Outstanding { reply_to, oi }),
-          Err(e) => {
-            try_send_response(reply_to, WebResponse {
-              data: Err(e),
-              warnings,
-            });
-          },
-        }
-      }
-    }
-  }
-  //Err(anyhow!("xxx"))
-}
-
 #[tokio::main]
 async fn main() {
   let opts = Opts::from_args();
@@ -492,7 +117,7 @@ async fn main() {
       let global_ = global.clone();
       let ic_ = ic.clone();
       tasks.push((tokio::spawn(async move {
-        run_client(global_, ic_, web_recv, route_recv)
+        slink::run(global_, ic_, web_recv, route_recv)
           .await.void_unwrap_err()
       }), format!("client {}", &ic)));
     }
@@ -504,7 +129,7 @@ async fn main() {
           let global_ = global_.clone();
           let conn = Arc::new(format!("[{}]", conn.remote_addr()));
           async { Ok::<_, Void>( hyper::service::service_fn(move |req| {
-            handle(conn.clone(), global_.clone(), req)
+            sweb::handle(conn.clone(), global_.clone(), req)
           }) ) }
         }
       );