From 314e555f03793b650fac5142e63758a8c6c55302 Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Sat, 21 Aug 2021 21:29:05 +0100 Subject: [PATCH] reorg source Signed-off-by: Ian Jackson --- server/sclient.rs | 166 +++++++++++++++++++ server/server.rs | 400 ++-------------------------------------------- server/sweb.rs | 226 ++++++++++++++++++++++++++ 3 files changed, 404 insertions(+), 388 deletions(-) create mode 100644 server/sclient.rs create mode 100644 server/sweb.rs diff --git a/server/sclient.rs b/server/sclient.rs new file mode 100644 index 0000000..d246384 --- /dev/null +++ b/server/sclient.rs @@ -0,0 +1,166 @@ +// Copyright 2021 Ian Jackson and contributors to Hippotat +// SPDX-License-Identifier: GPL-3.0-or-later +// There is NO WARRANTY. + +use super::*; + +#[derive(Debug)] +pub struct Client { + pub ic: Arc, + pub web: mpsc::Sender, + pub route: mpsc::Sender, +} + +#[allow(unused_variables)] // xxx +#[allow(unused_mut)] // xxx +pub async fn run(global: Arc, + ic: Arc, + mut web: mpsc::Receiver, + mut routed: mpsc::Receiver) + -> Result +{ + struct Outstanding { + reply_to: oneshot::Sender, + oi: OutstandingInner, + } + #[derive(Debug)] + struct OutstandingInner { + target_requests_outstanding: u32, + } + let mut outstanding: VecDeque = default(); + let downbound: VecDeque<(/*xxx*/)> = default(); + + let try_send_response = | + reply_to: oneshot::Sender, + response: WebResponse + | { + reply_to.send(response) + .unwrap_or_else(|_: WebResponse| () /* oh dear */ /* xxx trace? */); + }; + + loop { + if let Some(ret) = { + if ! downbound.is_empty() { + outstanding.pop_front() + } else if let Some((i,_)) = outstanding.iter().enumerate().find({ + |(_,o)| outstanding.len() > o.oi.target_requests_outstanding.sat() + }) { + Some(outstanding.remove(i).unwrap()) + } else { + None + } + } { + let response = WebResponse { + data: Ok(vec![ /* xxx */ ]), + warnings: default(), + }; + + try_send_response(ret.reply_to, response); + } + + select!{ + req = web.recv() => + { + let WebRequest { + initial, initial_remaining, length_hint, mut body, + boundary_finder, + reply_to, conn, mut warnings, + } = req.ok_or_else(|| anyhow!("webservers all shut down!"))?; + + match async { + + let initial_used = initial.len() - initial_remaining; + + let whole_request = read_limited_bytes( + ic.max_batch_up.sat(), + initial, + length_hint, + &mut body + ).await.context("read request body")?; + + let (meta, mut comps) = + multipart::ComponentIterator::resume_mid_component( + &whole_request[initial_used..], + boundary_finder + ).context("resume parsing body, after auth checks")?; + + let mut meta = MetadataFieldIterator::new(&meta); + + macro_rules! meta { + { $v:ident, ( $( $badcmp:tt )? ), $ret:expr, + let $server:ident, $client:ident $($code:tt)* + } => { + let $v = (||{ + let $server = ic.$v; + let $client $($code)* + $( + if $client $badcmp $server { + throw!(anyhow!("mismatch: client={:?} {} server={:?}", + $client, stringify!($badcmp), $server)); + } + )? + Ok::<_,AE>($ret) + })().context(stringify!($v))?; + //dbg!(&$v); + } + } + + meta!{ + target_requests_outstanding, ( != ), client, + let server, client: u32 = meta.need_parse()?; + } + + meta!{ + http_timeout, ( > ), client, + let server, client = Duration::from_secs(meta.need_parse()?); + } + + meta!{ + mtu, ( != ), client, + let server, client: u32 = meta.parse()?.unwrap_or(server); + } + + meta!{ + max_batch_down, (), min(client, server), + let server, client: u32 = meta.parse()?.unwrap_or(server); + } + + meta!{ + max_batch_up, ( > ), client, + let server, client = meta.parse()?.unwrap_or(server); + } + + while let Some(comp) = comps.next(&mut warnings, PartName::d)? { + if comp.name != PartName::d { + warnings.add(&format_args!("unexpected part {:?}", comp.name))?; + } + checkn(Mime2Slip, mtu, comp.payload, |header| { + let saddr = ip_packet_addr::(header)?; + if saddr != ic.link.client.0 { throw!(PE::Src(saddr)) } + let daddr = ip_packet_addr::(header)?; + Ok(daddr) + }, |(daddr,packet)| route_packet( + &global, &conn, &ic.link.client, daddr,packet + ), + |e| Ok::<_,SlipFramesError<_>>({ warnings.add(&e)?; }) + ).await?; + } + + let oi = OutstandingInner { + target_requests_outstanding, + }; + Ok::<_,AE>(oi) + }.await { + Ok(oi) => outstanding.push_back(Outstanding { reply_to, oi }), + Err(e) => { + try_send_response(reply_to, WebResponse { + data: Err(e), + warnings, + }); + }, + } + } + } + } + //Err(anyhow!("xxx")) +} diff --git a/server/server.rs b/server/server.rs index df34045..022d00b 100644 --- a/server/server.rs +++ b/server/server.rs @@ -4,17 +4,23 @@ use hippotat::prelude::*; +mod sclient; +mod sweb; + +pub use sweb::{WebRequest, WebResponse}; +pub use sclient::Client; + #[derive(StructOpt,Debug)] pub struct Opts { #[structopt(flatten)] - log: LogOpts, + pub log: LogOpts, #[structopt(flatten)] - config: config::Opts, + pub config: config::Opts, } -const METADATA_MAX_LEN: usize = MAX_OVERHEAD; -const INTERNAL_QUEUE: usize = 15; // xxx: config +pub const METADATA_MAX_LEN: usize = MAX_OVERHEAD; +pub const INTERNAL_QUEUE: usize = 15; // xxx: config #[derive(Debug)] pub struct Global { @@ -22,42 +28,8 @@ pub struct Global { all_clients: HashMap, } -#[derive(Debug)] -pub struct Client { - ic: Arc, - web: mpsc::Sender, - route: mpsc::Sender, -} - pub type RoutedPacket = Box<[u8]>; // not MIME data -/// Sent from hyper worker pool task to client task -#[allow(dead_code)] // xxx -#[derive(Debug)] -struct WebRequest { - // initial part of body - // used up to and including first 2 lines of metadata - // end delimiter for the metadata not yet located, but in here somewhere - initial: Box<[u8]>, - initial_remaining: usize, - length_hint: usize, - body: hyper::body::Body, - boundary_finder: multipart::BoundaryFinder, - reply_to: oneshot::Sender, - warnings: Warnings, - conn: Arc, -} - -/// Reply from client task to hyper worker pool task -#[allow(dead_code)] // xxx -#[derive(Debug)] -struct WebResponse { - warnings: Warnings, - data: Result, -} - -type WebResponseData = Vec; - #[throws(PacketError)] pub async fn route_packet(global: &Global, conn: &str, link: &(dyn Display + Sync), @@ -91,354 +63,6 @@ pub async fn route_packet(global: &Global, } } -async fn handle( - conn: Arc, - global: Arc, - req: hyper::Request -) -> Result, hyper::http::Error> { - if req.method() == Method::GET { - let mut resp = hyper::Response::new(hyper::Body::from("hippotat\r\n")); - resp.headers_mut().insert( - "Content-Type", - "text/plain; charset=US-ASCII".try_into().unwrap() - ); - return Ok(resp) - } - - let mut warnings: Warnings = default(); - - async { - - let get_header = |hn: &str| { - let mut values = req.headers().get_all(hn).iter(); - let v = values.next().ok_or_else(|| anyhow!("missing {}", hn))?; - if values.next().is_some() { throw!(anyhow!("multiple {}!", hn)); } - let v = v.to_str().context(anyhow!("interpret {} as UTF-8", hn))?; - Ok::<_,AE>(v) - }; - - let mkboundary = |b: &'_ _| format!("\n--{}", b).into_bytes(); - let boundary = match (||{ - let t = get_header("Content-Type")?; - let t: mime::Mime = t.parse().context("parse Content-Type")?; - if t.type_() != "multipart" { throw!(anyhow!("not multipart/")) } - let b = mime::BOUNDARY; - let b = t.get_param(b).ok_or_else(|| anyhow!("missing boundary=..."))?; - if t.subtype() != "form-data" { - warnings.add(&"Content-Type not /form-data")?; - } - let b = mkboundary(b.as_str()); - Ok::<_,AE>(b) - })() { - Ok(y) => y, - Err(e) => { - warnings.add(&e.wrap_err("guessing boundary"))?; - mkboundary("b") - }, - }; - - let length_hint: usize = (||{ - let clength = get_header("Content-Length")?; - let clength = clength.parse().context("parse Content-Length")?; - Ok::<_,AE>(clength) - })().unwrap_or_else( - |e| { let _ = warnings.add(&e.wrap_err("parsing Content-Length")); 0 } - ); - - let mut body = req.into_body(); - let initial = match read_limited_bytes( - METADATA_MAX_LEN, default(), length_hint, &mut body - ).await { - Ok(all) => all, - Err(ReadLimitedError::Truncated { sofar,.. }) => sofar, - Err(ReadLimitedError::Hyper(e)) => throw!(e), - }; - - let boundary_finder = memmem::Finder::new(&boundary); - let mut boundary_iter = boundary_finder.find_iter(&initial); - - let start = if initial.starts_with(&boundary[1..]) { boundary.len()-1 } - else if let Some(start) = boundary_iter.next() { start + boundary.len() } - else { throw!(anyhow!("initial boundary not found")) }; - - let comp = multipart::process_boundary - (&mut warnings, &initial[start..], PartName::m)? - .ok_or_else(|| anyhow!(r#"no "m" component"#))?; - - if comp.name != PartName::m { throw!(anyhow!( - r#"first multipart component must be name="m""# - )) } - - let mut meta = MetadataFieldIterator::new(comp.payload); - - let client: ClientName = meta.need_parse().context("client addr")?; - - let mut hmac_got = [0; HMAC_L]; - let (client_time, hmac_got_l) = (||{ - let token: &str = meta.need_next().context(r#"find in "m""#)?; - let (time_t, hmac_b64) = token.split_once(' ') - .ok_or_else(|| anyhow!("split"))?; - let time_t = u64::from_str_radix(time_t, 16).context("parse time_t")?; - let l = io::copy( - &mut base64::read::DecoderReader::new(&mut hmac_b64.as_bytes(), - BASE64_CONFIG), - &mut &mut hmac_got[..] - ).context("parse b64 token")?; - let l = l.try_into()?; - Ok::<_,AE>((time_t, l)) - })().context("token")?; - let hmac_got = &hmac_got[0..hmac_got_l]; - - let client_name = client; - let client = global.all_clients.get(&client_name); - - // We attempt to hide whether the client exists we don't try to - // hide the hash lookup computationgs, but we do try to hide the - // HMAC computation by always doing it. We hope that the compiler - // doesn't produce a specialised implementation for the dummy - // secret value. - let client_exists = subtle::Choice::from(client.is_some() as u8); - let secret = client.map(|c| c.ic.secret.0.as_bytes()); - let secret = secret.unwrap_or(&[0x55; HMAC_B][..]); - let client_time_s = format!("{:x}", client_time); - let hmac_exp = token_hmac(secret, client_time_s.as_bytes()); - // We also definitely want a consttime memeq for the hmac value - let hmac_ok = hmac_got.ct_eq(&hmac_exp); - //dbg!(DumpHex(&hmac_exp), client.is_some()); - //dbg!(DumpHex(hmac_got), hmac_ok, client_exists); - if ! bool::from(hmac_ok & client_exists) { - debug!("{} rejected client {}", &conn, &client_name); - let body = hyper::Body::from("Not authorised\r\n"); - return Ok( - hyper::Response::builder() - .status(hyper::StatusCode::FORBIDDEN) - .header("Content-Type", r#"text/plain; charset="utf-8""#) - .body(body) - ) - } - - let client = client.unwrap(); - let now = time_t_now(); - let chk_skew = |a: u64, b: u64, c_ahead_behind| { - if let Some(a_ahead) = a.checked_sub(b) { - if a_ahead > client.ic.max_clock_skew.as_secs() { - throw!(anyhow!("too much clock skew (client {} by {})", - c_ahead_behind, a_ahead)); - } - } - Ok::<_,AE>(()) - }; - chk_skew(client_time, now, "ahead")?; - chk_skew(now, client_time, "behind")?; - - let initial_remaining = meta.remaining_bytes_len(); - - //eprintln!("boundary={:?} start={} name={:?} client={}", - // boundary, start, &comp.name, &client.ic); - - let (reply_to, reply_recv) = oneshot::channel(); - trace!("{} {} request, Content-Length={}", - &conn, &client_name, length_hint); - let wreq = WebRequest { - initial, - initial_remaining, - length_hint, - boundary_finder: boundary_finder.into_owned(), - body, - warnings: mem::take(&mut warnings), - reply_to, - conn: conn.clone(), - }; - - client.web.try_send(wreq) - .map_err(|_| anyhow!("client task shut down!"))?; - - let reply: WebResponse = reply_recv.await?; - warnings = reply.warnings; - let data = reply.data?; - - if warnings.warnings.is_empty() { - trace!("{} {} responding, {}", - &conn, &client_name, data.len()); - } else { - debug!("{} {} responding, {} warnings={:?}", - &conn, &client_name, data.len(), - &warnings.warnings); - } - - let data = hyper::Body::from(data); - Ok::<_,AE>( - hyper::Response::builder() - .header("Content-Type", r#"application/octet-stream"#) - .body(data) - ) - }.await.unwrap_or_else(|e| { - debug!("{} error {}", &conn, &e); - let mut errmsg = format!("ERROR\n\n{:?}\n\n", &e); - for w in warnings.warnings { - write!(errmsg, "warning: {}\n", w).unwrap(); - } - hyper::Response::builder() - .status(hyper::StatusCode::BAD_REQUEST) - .header("Content-Type", r#"text/plain; charset="utf-8""#) - .body(errmsg.into()) - }) -} - -#[allow(unused_variables)] // xxx -#[allow(unused_mut)] // xxx -async fn run_client(global: Arc, - ic: Arc, - mut web: mpsc::Receiver, - mut routed: mpsc::Receiver) - -> Result -{ - struct Outstanding { - reply_to: oneshot::Sender, - oi: OutstandingInner, - } - #[derive(Debug)] - struct OutstandingInner { - target_requests_outstanding: u32, - } - let mut outstanding: VecDeque = default(); - let downbound: VecDeque<(/*xxx*/)> = default(); - - let try_send_response = | - reply_to: oneshot::Sender, - response: WebResponse - | { - reply_to.send(response) - .unwrap_or_else(|_: WebResponse| () /* oh dear */ /* xxx trace? */); - }; - - loop { - if let Some(ret) = { - if ! downbound.is_empty() { - outstanding.pop_front() - } else if let Some((i,_)) = outstanding.iter().enumerate().find({ - |(_,o)| outstanding.len() > o.oi.target_requests_outstanding.sat() - }) { - Some(outstanding.remove(i).unwrap()) - } else { - None - } - } { - let response = WebResponse { - data: Ok(vec![ /* xxx */ ]), - warnings: default(), - }; - - try_send_response(ret.reply_to, response); - } - - select!{ - req = web.recv() => - { - let WebRequest { - initial, initial_remaining, length_hint, mut body, - boundary_finder, - reply_to, conn, mut warnings, - } = req.ok_or_else(|| anyhow!("webservers all shut down!"))?; - - match async { - - let initial_used = initial.len() - initial_remaining; - - let whole_request = read_limited_bytes( - ic.max_batch_up.sat(), - initial, - length_hint, - &mut body - ).await.context("read request body")?; - - let (meta, mut comps) = - multipart::ComponentIterator::resume_mid_component( - &whole_request[initial_used..], - boundary_finder - ).context("resume parsing body, after auth checks")?; - - let mut meta = MetadataFieldIterator::new(&meta); - - macro_rules! meta { - { $v:ident, ( $( $badcmp:tt )? ), $ret:expr, - let $server:ident, $client:ident $($code:tt)* - } => { - let $v = (||{ - let $server = ic.$v; - let $client $($code)* - $( - if $client $badcmp $server { - throw!(anyhow!("mismatch: client={:?} {} server={:?}", - $client, stringify!($badcmp), $server)); - } - )? - Ok::<_,AE>($ret) - })().context(stringify!($v))?; - //dbg!(&$v); - } - } - - meta!{ - target_requests_outstanding, ( != ), client, - let server, client: u32 = meta.need_parse()?; - } - - meta!{ - http_timeout, ( > ), client, - let server, client = Duration::from_secs(meta.need_parse()?); - } - - meta!{ - mtu, ( != ), client, - let server, client: u32 = meta.parse()?.unwrap_or(server); - } - - meta!{ - max_batch_down, (), min(client, server), - let server, client: u32 = meta.parse()?.unwrap_or(server); - } - - meta!{ - max_batch_up, ( > ), client, - let server, client = meta.parse()?.unwrap_or(server); - } - - while let Some(comp) = comps.next(&mut warnings, PartName::d)? { - if comp.name != PartName::d { - warnings.add(&format_args!("unexpected part {:?}", comp.name))?; - } - checkn(Mime2Slip, mtu, comp.payload, |header| { - let saddr = ip_packet_addr::(header)?; - if saddr != ic.link.client.0 { throw!(PE::Src(saddr)) } - let daddr = ip_packet_addr::(header)?; - Ok(daddr) - }, |(daddr,packet)| route_packet( - &global, &conn, &ic.link.client, daddr,packet - ), - |e| Ok::<_,SlipFramesError<_>>({ warnings.add(&e)?; }) - ).await?; - } - - let oi = OutstandingInner { - target_requests_outstanding, - }; - Ok::<_,AE>(oi) - }.await { - Ok(oi) => outstanding.push_back(Outstanding { reply_to, oi }), - Err(e) => { - try_send_response(reply_to, WebResponse { - data: Err(e), - warnings, - }); - }, - } - } - } - } - //Err(anyhow!("xxx")) -} - #[tokio::main] async fn main() { let opts = Opts::from_args(); @@ -492,7 +116,7 @@ async fn main() { let global_ = global.clone(); let ic_ = ic.clone(); tasks.push((tokio::spawn(async move { - run_client(global_, ic_, web_recv, route_recv) + sclient::run(global_, ic_, web_recv, route_recv) .await.void_unwrap_err() }), format!("client {}", &ic))); } @@ -504,7 +128,7 @@ async fn main() { let global_ = global_.clone(); let conn = Arc::new(format!("[{}]", conn.remote_addr())); async { Ok::<_, Void>( hyper::service::service_fn(move |req| { - handle(conn.clone(), global_.clone(), req) + sweb::handle(conn.clone(), global_.clone(), req) }) ) } } ); diff --git a/server/sweb.rs b/server/sweb.rs new file mode 100644 index 0000000..157261a --- /dev/null +++ b/server/sweb.rs @@ -0,0 +1,226 @@ +// Copyright 2021 Ian Jackson and contributors to Hippotat +// SPDX-License-Identifier: GPL-3.0-or-later +// There is NO WARRANTY. + +use super::*; + +/// Sent from hyper worker pool task to client task +#[allow(dead_code)] // xxx +#[derive(Debug)] +pub struct WebRequest { + // initial part of body + // used up to and including first 2 lines of metadata + // end delimiter for the metadata not yet located, but in here somewhere + pub initial: Box<[u8]>, + pub initial_remaining: usize, + pub length_hint: usize, + pub body: hyper::body::Body, + pub boundary_finder: multipart::BoundaryFinder, + pub reply_to: oneshot::Sender, + pub warnings: Warnings, + pub conn: Arc, +} + +/// Reply from client task to hyper worker pool task +#[allow(dead_code)] // xxx +#[derive(Debug)] +pub struct WebResponse { + pub warnings: Warnings, + pub data: Result, +} + +pub type WebResponseData = Vec; + +pub async fn handle( + conn: Arc, + global: Arc, + req: hyper::Request +) -> Result, hyper::http::Error> { + if req.method() == Method::GET { + let mut resp = hyper::Response::new(hyper::Body::from("hippotat\r\n")); + resp.headers_mut().insert( + "Content-Type", + "text/plain; charset=US-ASCII".try_into().unwrap() + ); + return Ok(resp) + } + + let mut warnings: Warnings = default(); + + async { + + let get_header = |hn: &str| { + let mut values = req.headers().get_all(hn).iter(); + let v = values.next().ok_or_else(|| anyhow!("missing {}", hn))?; + if values.next().is_some() { throw!(anyhow!("multiple {}!", hn)); } + let v = v.to_str().context(anyhow!("interpret {} as UTF-8", hn))?; + Ok::<_,AE>(v) + }; + + let mkboundary = |b: &'_ _| format!("\n--{}", b).into_bytes(); + let boundary = match (||{ + let t = get_header("Content-Type")?; + let t: mime::Mime = t.parse().context("parse Content-Type")?; + if t.type_() != "multipart" { throw!(anyhow!("not multipart/")) } + let b = mime::BOUNDARY; + let b = t.get_param(b).ok_or_else(|| anyhow!("missing boundary=..."))?; + if t.subtype() != "form-data" { + warnings.add(&"Content-Type not /form-data")?; + } + let b = mkboundary(b.as_str()); + Ok::<_,AE>(b) + })() { + Ok(y) => y, + Err(e) => { + warnings.add(&e.wrap_err("guessing boundary"))?; + mkboundary("b") + }, + }; + + let length_hint: usize = (||{ + let clength = get_header("Content-Length")?; + let clength = clength.parse().context("parse Content-Length")?; + Ok::<_,AE>(clength) + })().unwrap_or_else( + |e| { let _ = warnings.add(&e.wrap_err("parsing Content-Length")); 0 } + ); + + let mut body = req.into_body(); + let initial = match read_limited_bytes( + METADATA_MAX_LEN, default(), length_hint, &mut body + ).await { + Ok(all) => all, + Err(ReadLimitedError::Truncated { sofar,.. }) => sofar, + Err(ReadLimitedError::Hyper(e)) => throw!(e), + }; + + let boundary_finder = memmem::Finder::new(&boundary); + let mut boundary_iter = boundary_finder.find_iter(&initial); + + let start = if initial.starts_with(&boundary[1..]) { boundary.len()-1 } + else if let Some(start) = boundary_iter.next() { start + boundary.len() } + else { throw!(anyhow!("initial boundary not found")) }; + + let comp = multipart::process_boundary + (&mut warnings, &initial[start..], PartName::m)? + .ok_or_else(|| anyhow!(r#"no "m" component"#))?; + + if comp.name != PartName::m { throw!(anyhow!( + r#"first multipart component must be name="m""# + )) } + + let mut meta = MetadataFieldIterator::new(comp.payload); + + let client: ClientName = meta.need_parse().context("client addr")?; + + let mut hmac_got = [0; HMAC_L]; + let (client_time, hmac_got_l) = (||{ + let token: &str = meta.need_next().context(r#"find in "m""#)?; + let (time_t, hmac_b64) = token.split_once(' ') + .ok_or_else(|| anyhow!("split"))?; + let time_t = u64::from_str_radix(time_t, 16).context("parse time_t")?; + let l = io::copy( + &mut base64::read::DecoderReader::new(&mut hmac_b64.as_bytes(), + BASE64_CONFIG), + &mut &mut hmac_got[..] + ).context("parse b64 token")?; + let l = l.try_into()?; + Ok::<_,AE>((time_t, l)) + })().context("token")?; + let hmac_got = &hmac_got[0..hmac_got_l]; + + let client_name = client; + let client = global.all_clients.get(&client_name); + + // We attempt to hide whether the client exists we don't try to + // hide the hash lookup computationgs, but we do try to hide the + // HMAC computation by always doing it. We hope that the compiler + // doesn't produce a specialised implementation for the dummy + // secret value. + let client_exists = subtle::Choice::from(client.is_some() as u8); + let secret = client.map(|c| c.ic.secret.0.as_bytes()); + let secret = secret.unwrap_or(&[0x55; HMAC_B][..]); + let client_time_s = format!("{:x}", client_time); + let hmac_exp = token_hmac(secret, client_time_s.as_bytes()); + // We also definitely want a consttime memeq for the hmac value + let hmac_ok = hmac_got.ct_eq(&hmac_exp); + //dbg!(DumpHex(&hmac_exp), client.is_some()); + //dbg!(DumpHex(hmac_got), hmac_ok, client_exists); + if ! bool::from(hmac_ok & client_exists) { + debug!("{} rejected client {}", &conn, &client_name); + let body = hyper::Body::from("Not authorised\r\n"); + return Ok( + hyper::Response::builder() + .status(hyper::StatusCode::FORBIDDEN) + .header("Content-Type", r#"text/plain; charset="utf-8""#) + .body(body) + ) + } + + let client = client.unwrap(); + let now = time_t_now(); + let chk_skew = |a: u64, b: u64, c_ahead_behind| { + if let Some(a_ahead) = a.checked_sub(b) { + if a_ahead > client.ic.max_clock_skew.as_secs() { + throw!(anyhow!("too much clock skew (client {} by {})", + c_ahead_behind, a_ahead)); + } + } + Ok::<_,AE>(()) + }; + chk_skew(client_time, now, "ahead")?; + chk_skew(now, client_time, "behind")?; + + let initial_remaining = meta.remaining_bytes_len(); + + //eprintln!("boundary={:?} start={} name={:?} client={}", + // boundary, start, &comp.name, &client.ic); + + let (reply_to, reply_recv) = oneshot::channel(); + trace!("{} {} request, Content-Length={}", + &conn, &client_name, length_hint); + let wreq = WebRequest { + initial, + initial_remaining, + length_hint, + boundary_finder: boundary_finder.into_owned(), + body, + warnings: mem::take(&mut warnings), + reply_to, + conn: conn.clone(), + }; + + client.web.try_send(wreq) + .map_err(|_| anyhow!("client task shut down!"))?; + + let reply: WebResponse = reply_recv.await?; + warnings = reply.warnings; + let data = reply.data?; + + if warnings.warnings.is_empty() { + trace!("{} {} responding, {}", + &conn, &client_name, data.len()); + } else { + debug!("{} {} responding, {} warnings={:?}", + &conn, &client_name, data.len(), + &warnings.warnings); + } + + let data = hyper::Body::from(data); + Ok::<_,AE>( + hyper::Response::builder() + .header("Content-Type", r#"application/octet-stream"#) + .body(data) + ) + }.await.unwrap_or_else(|e| { + debug!("{} error {}", &conn, &e); + let mut errmsg = format!("ERROR\n\n{:?}\n\n", &e); + for w in warnings.warnings { + write!(errmsg, "warning: {}\n", w).unwrap(); + } + hyper::Response::builder() + .status(hyper::StatusCode::BAD_REQUEST) + .header("Content-Type", r#"text/plain; charset="utf-8""#) + .body(errmsg.into()) + }) +} -- 2.30.2