chiark / gitweb /
client: rename saddr variable
[hippotat.git] / src / bin / client.rs
index 095cd556a58ad2bb4d21c71d427298455fdda167..14d81ce2a79e4b567a0988c8d1a01d96d43375b9 100644 (file)
@@ -5,8 +5,6 @@
 use hippotat::prelude::*;
 use hippotat_macros::into_crlfs;
 
-const MAX_BATCH_DOWN_RESP_OVERHEAD: usize = 10_000;
-
 #[derive(StructOpt,Debug)]
 pub struct Opts {
   #[structopt(flatten)]
@@ -47,12 +45,10 @@ fn submit_request<'r, 'c:'r, C:HCC>(
     .saturating_add(Duration::from_nanos(999_999_999))
     .as_secs();
 
-  let time_t = SystemTime::now()
-    .duration_since(UNIX_EPOCH)
-    .unwrap_or_else(|_| Duration::default()) // clock is being weird
-    .as_secs();
+  let time_t = time_t_now();
   let time_t = format!("{:x}", time_t);
   let hmac = token_hmac(c.ic.secret.0.as_bytes(), time_t.as_bytes());
+  //dbg!(DumpHex(&hmac));
   let mut token = time_t;
   write!(token, " ").unwrap();
   base64::encode_config_buf(&hmac, BASE64_CONFIG, &mut token);
@@ -68,12 +64,16 @@ fn submit_request<'r, 'c:'r, C:HCC>(
        {}
        {}
        {}
+       {}
+       {}
        {}"#),
                        &c.ic.link.client,
                        token,
                        c.ic.target_requests_outstanding,
                        show_timeout,
+                       c.ic.mtu,
                        c.ic.max_batch_down,
+                       c.ic.max_batch_up,
   );
 
   let prefix2 = format!(into_crlfs!(
@@ -116,7 +116,7 @@ fn submit_request<'r, 'c:'r, C:HCC>(
     as_ref,
   ).map(|b| b.len()).sum();
 
-  trace!("{} #{}: req; tx bytes={} frames={}",
+  trace!("{} #{}: req; tx body_len={} frames={}",
          &c.ic, req_num, body_len, upbound.len());
 
   let body = hyper::body::Body::wrap_stream(
@@ -140,9 +140,12 @@ fn submit_request<'r, 'c:'r, C:HCC>(
     let r = async { tokio::time::timeout( c.ic.effective_http_timeout, async {
       let resp = resp.await.context("make request")?;
       let status = resp.status();
-      let resp = resp.into_body();
-      let max_body = c.ic.max_batch_down.sat() + MAX_BATCH_DOWN_RESP_OVERHEAD;
-      let resp = read_limited_body(max_body, resp).await?;
+      let mut resp = resp.into_body();
+      let max_body = c.ic.max_batch_down.sat() + MAX_OVERHEAD;
+      let resp = read_limited_bytes(
+        max_body, default(), default(), &mut resp
+      ).await
+        .discard_data().context("fetching response body")?;
 
       if ! status.is_success() {
         throw!(anyhow!("HTTP error status={} body={:?}",
@@ -179,30 +182,10 @@ async fn run_client<C:HCC>(
     ic: &ic,
   };
 
-  let mut ipif = tokio::process::Command::new("sh")
-    .args(&["-c", &ic.ipif])
-    .stdin (process::Stdio::piped())
-    .stdout(process::Stdio::piped())
-    .stderr(process::Stdio::piped())
-    .kill_on_drop(true)
-    .spawn().context("spawn ipif")?;
-  
-  let stderr = ipif.stderr.take().unwrap();
-  let ic_name = ic.to_string();
-  let stderr_task = task::spawn(async move {
-    let mut stderr = tokio::io::BufReader::new(stderr).lines();
-    while let Some(l) = stderr.next_line().await? {
-      error!("{}: ipif stderr: {}", &ic_name, l.trim_end());
-    }
-    Ok::<_,io::Error>(())
-  });
+  let mut ipif = Ipif::start(&ic.ipif, Some(ic.to_string()))?;
 
   let mut req_num: ReqNum = 0;
 
-  let tx_stream = ipif.stdout.take().unwrap();
-  let mut rx_stream = ipif.stdin .take().unwrap();
-
-  let mut tx_stream = tokio::io::BufReader::new(tx_stream).split(SLIP_END);
   let mut tx_queue: VecDeque<TxQueued> = default();
   let mut upbound = Frames::default();
 
@@ -223,7 +206,7 @@ async fn run_client<C:HCC>(
       select! {
         biased;
 
-        y = rx_stream.write_all_buf(&mut rx_queue),
+        y = ipif.rx.write_all_buf(&mut rx_queue),
         if ! rx_queue.is_empty() =>
         {
           let () = y.context("write rx data to ipif")?;
@@ -238,17 +221,17 @@ async fn run_client<C:HCC>(
           let _ = tx_queue.pop_front();
         },
 
-        data = tx_stream.next_segment(),
+        data = ipif.tx.next_segment(),
         if tx_queue.is_empty() =>
         {
-          let data =
-            data.context("read from ipif")?
-            .ok_or_else(|| io::Error::from(io::ErrorKind::UnexpectedEof))?;
+          let data = (||{
+            data?.ok_or_else(|| io::Error::from(io::ErrorKind::UnexpectedEof))
+          })().context("read from ipif")?;
           //eprintln!("data={:?}", DumpHex(&data));
 
           match check1(Slip2Mime, ic.mtu, &data, |header| {
-            let addr = ip_packet_addr::<false>(header)?;
-            if addr != ic.link.client.0 { throw!(PE::Src(addr)) }
+            let saddr = ip_packet_addr::<false>(header)?;
+            if saddr != ic.link.client.0 { throw!(PE::Src(saddr)) }
             Ok(())
           }) {
             Ok(data) => tx_queue.push_back(TxQueued {
@@ -267,7 +250,10 @@ async fn run_client<C:HCC>(
         {
           while let Some(TxQueued { data, expires }) = tx_queue.pop_front() {
             match upbound.add(ic.max_batch_up, data.into()/*todo:504*/) {
-              Err(data) => { tx_queue.push_front(TxQueued { data: data.into(), expires }); break; }
+              Err(data) => {
+                tx_queue.push_front(TxQueued { data: data.into(), expires });
+                break;
+              }
               Ok(()) => { },
             }
           }
@@ -290,14 +276,23 @@ async fn run_client<C:HCC>(
           reqs.swap_remove(goti);
 
           if let Some(got) = got {
-            reporter.lock().success();
+            
             //eprintln!("got={:?}", DumpHex(&got));
-            checkn(SlipNoConv,ic.mtu, &got, &mut rx_queue, |header| {
+            match checkn(SlipNoConv,ic.mtu, &got, |header| {
               let addr = ip_packet_addr::<true>(header)?;
               if addr != ic.link.client.0 { throw!(PE::Dst(addr)) }
               Ok(())
-            }, |e| error!("{} #{}: rx discarding: {}", &ic, req_num, e));
-          
+            }, |o| rx_queue.push(o),
+               |e| error!("{} #{}: rx discarding: {}", &ic, req_num, e))
+            {
+              Ok(()) => reporter.lock().success(),
+              Err(ErrorOnlyBad) => {
+                reqs.push(Box::pin(async {
+                  tokio::time::sleep(ic.http_retry).await;
+                  None
+                }));
+              },
+            }
           }
         },
 
@@ -312,39 +307,20 @@ async fn run_client<C:HCC>(
     }
   }.await;
 
-  drop(tx_stream);
-
-  match ipif.wait().await {
-    Err(e) => error!("{}: also, failed to await ipif child: {}", &ic, e),
-    Ok(st) => {
-      let stderr_timeout = Duration::from_millis(1000);
-      match tokio::time::timeout(stderr_timeout, stderr_task).await {
-        Err::<_,tokio::time::error::Elapsed>(_)
-          => warn!("{}: ipif stderr task continues!", &ic),
-        Ok(Err(e)) => error!("{}: ipif stderr task crashed: {}", &ic, e),
-        Ok(Ok(Err(e))) => error!("{}: ipif stderr read failed: {}", &ic, e),
-        Ok(Ok(Ok(()))) => { },
-      }
-      if ! st.success() {
-        error!("{}: ipif process failed: {}", &ic, st);
-      }
-    }
-  }
-
+  ipif.quitting(Some(&ic)).await;
   trouble
 }
 
 #[tokio::main]
-async fn main() -> Result<(), AE> {
+async fn main() {
   let opts = Opts::from_args();
-
-  let ics = config::read(&opts.config, LinkEnd::Client)?;
-  if ics.is_empty() { throw!(anyhow!("no associations with server(s)")); }
-
-  opts.log.log_init()?;
+  let (ics,) = config::startup("hippotat", LinkEnd::Client,
+                               &opts.config, &opts.log, |ics|Ok((ics,)));
 
   let https = HttpsConnector::new();
-  let hclient = hyper::Client::builder().build::<_, hyper::Body>(https);
+  let hclient = hyper::Client::builder()
+    .http1_preserve_header_case(true)
+    .build::<_, hyper::Body>(https);
   let hclient = Arc::new(hclient);
 
   info!("starting");
@@ -358,7 +334,7 @@ async fn main() -> Result<(), AE> {
       });
       match join.await {
         Ok(e) => {
-          error!("{} failed: {:?}", &assocname, e);
+          error!("{} failed: {}", &assocname, e);
         },
         Err(je) => {
           error!("{} panicked!", &assocname);